main.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. package main
import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/natefinch/lumberjack"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"watch-daemon/util"
)
  20. var logger *logrus.Logger
  21. var statLogger *logrus.Logger
  22. var gAppConfig *AppConfig
  23. func main() {
  24. fmt.Println("Starting watch-daemon...")
  25. // 初始化日志框架、读取配置
  26. logger = initLog()
  27. initLoadConfig()
  28. notifyChan := make(chan string)
  29. // // Start a goroutine to monitor dir and notify
  30. // go monitorDirAndNotify(notifyChan)
  31. // Start a goroutine to do full scan and upload
  32. go func() {
  33. for {
  34. notify, ok := <-notifyChan
  35. if !ok {
  36. return
  37. }
  38. logger.Debugf("received notify: %s", notify)
  39. // Drain the channel (debounce)
  40. _ = util.DrainChannelBuffer(notifyChan)
  41. doFullScanAndUpload()
  42. }
  43. }()
  44. // Initiate the first full scan and upload
  45. notifyChan <- "<start>"
  46. // Set a timer for notifying full scan and upload
  47. go func() {
  48. for {
  49. time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
  50. notifyChan <- "<timer>"
  51. }
  52. }()
  53. // Block main goroutine forever.
  54. <-make(chan struct{})
  55. }
  56. func doFullScanAndUpload() {
  57. logger.Debugln("begin to do full scan and upload")
  58. var minioCreds *credentials.Credentials
  59. if gAppConfig.Minio.AccessKeyID != "" && gAppConfig.Minio.SecretAccessKey != "" {
  60. minioCreds = credentials.NewStaticV4(gAppConfig.Minio.AccessKeyID, gAppConfig.Minio.SecretAccessKey, "")
  61. } else {
  62. // Make anonymous client
  63. minioCreds = credentials.NewStaticV4("", "", "")
  64. }
  65. minioClient, err := minio.New(gAppConfig.Minio.Addr, &minio.Options{
  66. Creds: minioCreds,
  67. Secure: gAppConfig.Minio.UseSSL,
  68. })
  69. if err != nil {
  70. logger.Errorf("MinIO Client init error: %s", err)
  71. return
  72. }
  73. logger.Debugln("MinIO Client初始化成功")
  74. // Make sure `.trash` folder exists in local
  75. trashFolder := filepath.Join(gAppConfig.Watching.Dir, ".trash")
  76. if !util.FileExists(trashFolder) {
  77. err := os.Mkdir(trashFolder, 0777)
  78. if err != nil {
  79. logger.Errorf("create trash folder error: %v", err)
  80. // return
  81. }
  82. }
  83. // Enumerate all files in the folder
  84. entries, err := os.ReadDir(gAppConfig.Watching.Dir)
  85. if err != nil {
  86. logger.Error(err)
  87. return
  88. }
  89. // Upload files
  90. for _, entry := range entries {
  91. if strings.HasPrefix(entry.Name(), ".") {
  92. // Skip hidden files
  93. continue
  94. }
  95. if entry.IsDir() {
  96. // Upload the folder stream first
  97. localPath := filepath.Join(gAppConfig.Watching.Dir, entry.Name())
  98. objPath := entry.Name()
  99. fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
  100. if err != nil {
  101. logger.Errorf("upload folder `%s` error: %v", localPath, err)
  102. } else {
  103. if fullyUploaded {
  104. // Remove the uploaded folder
  105. logger.Infof("fully uploaded, removing folder: %s", localPath)
  106. err := os.RemoveAll(localPath)
  107. if err != nil {
  108. logger.Errorf("remove folder `%s` error: %v", localPath, err)
  109. }
  110. } else {
  111. logger.Infof("not fully uploaded, keeping folder: %s", localPath)
  112. }
  113. }
  114. // Then remove folder if it is too old
  115. if gAppConfig.Watching.DeleteFileAfterMinutes > 0 {
  116. info, err := entry.Info()
  117. if err != nil {
  118. logger.Errorf("get info for folder `%s` error: %v", localPath, err)
  119. continue
  120. }
  121. entryModTime := info.ModTime()
  122. entryAgeMinutes := time.Since(entryModTime).Minutes()
  123. if entryAgeMinutes > float64(gAppConfig.Watching.DeleteFileAfterMinutes) {
  124. logger.Infof("removing folder `%s` because it is too old (last modified: %v, %v minutes ago)",
  125. localPath, util.FmtMyTime(entryModTime), entryAgeMinutes)
  126. err := os.RemoveAll(localPath)
  127. if err != nil {
  128. logger.Errorf("remove folder `%s` error: %v", localPath, err)
  129. }
  130. }
  131. }
  132. }
  133. }
  134. logger.Debugln("full scan and upload done")
  135. }
  136. func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
  137. type StreamObjectUploadStatistics struct {
  138. StartTime time.Time
  139. EndTime time.Time
  140. Name string
  141. Succeeded bool
  142. Msg string
  143. }
  144. type StreamUploadStatistics struct {
  145. StartTime time.Time
  146. EndTime time.Time
  147. MetaPartsCount int
  148. MetaPointsCount int
  149. Objects map[string]StreamObjectUploadStatistics
  150. Msg string
  151. }
  152. streamUploadStatistics := StreamUploadStatistics{
  153. Objects: make(map[string]StreamObjectUploadStatistics),
  154. }
  155. streamUploadStatistics.StartTime = time.Now()
  156. defer func() {
  157. streamUploadStatistics.EndTime = time.Now()
  158. okCount := 0
  159. failCount := 0
  160. totalCount := 0
  161. objDetailsMsg := ""
  162. for _, obj := range streamUploadStatistics.Objects {
  163. totalCount++
  164. if obj.Succeeded {
  165. okCount++
  166. objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
  167. obj.Name, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  168. } else {
  169. failCount++
  170. objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
  171. obj.Name, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  172. }
  173. }
  174. statLogger.Infof("upload folder `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported parts & points count: %d & %d;"+
  175. " ok / fail / total: %d / %d / %d; details:%s",
  176. localPath, streamUploadStatistics.EndTime.Sub(streamUploadStatistics.StartTime),
  177. util.FmtMyTime(streamUploadStatistics.StartTime), util.FmtMyTime(streamUploadStatistics.EndTime),
  178. fullyUploaded, streamUploadStatistics.MetaPartsCount, streamUploadStatistics.MetaPointsCount,
  179. okCount, failCount, totalCount, objDetailsMsg)
  180. }()
  181. bucket := gAppConfig.Minio.Bucket
  182. fullyUploaded = true
  183. streamDateStr := util.FmtMyDate(time.Now())
  184. checkMetadataFile := func(localPath string) error {
  185. // Check for JSON metadata file. If its `total_points` is 0,
  186. // then the producer has not yet finished writing the stream, so we
  187. // should not delete the folder.
  188. metaFilePath := filepath.Join(localPath, "metadata.json")
  189. // Read metadata file
  190. metaFile, err := os.Open(metaFilePath)
  191. if err != nil {
  192. return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
  193. }
  194. defer metaFile.Close()
  195. var metadata struct {
  196. TotalPoints int `json:"total_points"`
  197. PartsCount int `json:"parts_count"`
  198. TimestampOffset int64 `json:"timestamp_offset"` // In milliseconds
  199. }
  200. if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
  201. return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
  202. }
  203. if metadata.TotalPoints == 0 {
  204. logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
  205. fullyUploaded = false
  206. }
  207. streamUploadStatistics.MetaPartsCount = metadata.PartsCount
  208. streamUploadStatistics.MetaPointsCount = metadata.TotalPoints
  209. streamDateStr = util.FmtMyDate(time.Unix(metadata.TimestampOffset/1000, 0))
  210. return nil
  211. }
  212. // NOTE: We need to check metadata file before obtaining the list of files
  213. // to prevent TOCTOU issues.
  214. if fullyUploaded {
  215. if !util.FileExists(filepath.Join(localPath, "metadata.json")) {
  216. logger.Infof("metadata file not found in folder `%s`, skipping", localPath)
  217. return false, nil
  218. }
  219. err := checkMetadataFile(localPath)
  220. if err != nil {
  221. return false, fmt.Errorf("check metadata file error: %w", err)
  222. }
  223. }
  224. // Enumerate all files in the folder
  225. entries, err := os.ReadDir(localPath)
  226. if err != nil {
  227. return false, fmt.Errorf("read dir `%s` error: %w", localPath, err)
  228. }
  229. // Create folder in minio
  230. // objFolderPath := streamDateStr + "/" + objPath + "/"
  231. streamDateStr = streamDateStr
  232. objFolderPath := objPath + "/"
  233. err = util.MinioCreateFolderIfNotExists(minioClient, bucket, objFolderPath)
  234. if err != nil {
  235. return false, fmt.Errorf("create minio folder `%s` error: %w", objFolderPath, err)
  236. }
  237. // Upload metadata file
  238. _, err = util.MinioUploadFileIfChanged(minioClient, bucket,
  239. objFolderPath+"metadata.json", filepath.Join(localPath, "metadata.json"))
  240. if err != nil {
  241. return false, fmt.Errorf("upload metadata file error: %w", err)
  242. }
  243. // Upload files
  244. for _, entry := range entries {
  245. // Skip hidden files
  246. if strings.HasPrefix(entry.Name(), ".") {
  247. continue
  248. }
  249. // Skip metadata file
  250. if entry.Name() == "metadata.json" {
  251. continue
  252. }
  253. // Skip partial files
  254. if strings.HasSuffix(entry.Name(), ".part") {
  255. logger.Infof("skipping partial file `%s`", entry.Name())
  256. fullyUploaded = false
  257. continue
  258. }
  259. innerLocalPath := filepath.Join(localPath, entry.Name())
  260. if entry.IsDir() {
  261. // Upload files in the folder
  262. // logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
  263. innerObjPath := objPath + "/" + entry.Name()
  264. fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
  265. if err != nil {
  266. return false, err
  267. }
  268. if !fullyUploadedInner {
  269. fullyUploaded = false
  270. }
  271. } else {
  272. // Upload the file
  273. objStat := StreamObjectUploadStatistics{
  274. StartTime: time.Now(),
  275. Name: entry.Name(),
  276. }
  277. _, err := util.MinioUploadFile(minioClient, bucket,
  278. objFolderPath+entry.Name(), innerLocalPath, true)
  279. objStat.EndTime = time.Now()
  280. if err != nil {
  281. objStat.Succeeded = false
  282. objStat.Msg = err.Error()
  283. // return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
  284. logger.Errorf("upload file `%s` of stream `%s` error: %v, took %v",
  285. entry.Name(), objPath, err, objStat.EndTime.Sub(objStat.StartTime))
  286. } else {
  287. objStat.Succeeded = true
  288. logger.Infof("uploaded file `%s`, took %v", entry.Name(), objStat.EndTime.Sub(objStat.StartTime))
  289. // Remove *.zst files early
  290. if strings.HasSuffix(entry.Name(), ".zst") {
  291. err := os.Remove(innerLocalPath)
  292. if err != nil {
  293. logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
  294. }
  295. }
  296. }
  297. streamUploadStatistics.Objects[entry.Name()] = objStat
  298. }
  299. }
  300. return fullyUploaded, nil
  301. }
  302. func monitorDirAndNotify(notifyChan chan string) {
  303. scanFn := func() {
  304. // Create new watcher.
  305. watcher, err := fsnotify.NewWatcher()
  306. if err != nil {
  307. logger.Errorf("unable to create watcher: %s", err)
  308. }
  309. defer watcher.Close()
  310. // Add a path.
  311. dir := gAppConfig.Watching.Dir
  312. if err := watcher.Add(dir); err != nil {
  313. logger.Errorf("unable to watch dir `%s`: %s", dir, err)
  314. return
  315. }
  316. var wg sync.WaitGroup
  317. wg.Add(2)
  318. // Start listening for events.
  319. go func() {
  320. defer wg.Done()
  321. for event := range watcher.Events {
  322. logger.Debugf("received event: %v", event)
  323. // Notify
  324. notifyChan <- "<FSNOTIFY: " + event.Name + ">"
  325. }
  326. }()
  327. // Start listening for errors.
  328. go func() {
  329. defer wg.Done()
  330. for err := range watcher.Errors {
  331. logger.Errorf("fsnotify error: %s", err)
  332. }
  333. }()
  334. wg.Wait()
  335. }
  336. for {
  337. // scan
  338. scanFn()
  339. logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", gAppConfig.Watching.UploadFileIntervalSeconds)
  340. // sleep
  341. time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
  342. }
  343. }
  344. func initLog() *logrus.Logger {
  345. // 主日志文件
  346. log := &lumberjack.Logger{
  347. Filename: "./log/watch-daemon.log", // 日志文件的位置
  348. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  349. MaxBackups: 5, // 保留的最大旧文件数量
  350. MaxAge: 28, // 保留旧文件的最大天数
  351. Compress: true, // 是否压缩/归档旧文件
  352. LocalTime: true, // 使用本地时间创建时间戳
  353. }
  354. // 错误日志文件
  355. errorLog := &lumberjack.Logger{
  356. Filename: "./log/watch-daemon.error.log", // 错误日志文件的位置
  357. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  358. MaxBackups: 5, // 保留的最大旧文件数量
  359. MaxAge: 28, // 保留旧文件的最大天数
  360. Compress: true, // 是否压缩/归档旧文件
  361. LocalTime: true, // 使用本地时间创建时间戳
  362. }
  363. // 统计日志文件
  364. statLog := &lumberjack.Logger{
  365. Filename: "./log/watch-daemon.stat.log", // 统计日志文件的位置
  366. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  367. MaxBackups: 5, // 保留的最大旧文件数量
  368. MaxAge: 28, // 保留旧文件的最大天数
  369. Compress: true, // 是否压缩/归档旧文件
  370. LocalTime: true, // 使用本地时间创建时间戳
  371. }
  372. logger := logrus.New()
  373. if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
  374. logger.SetLevel(logrus.TraceLevel)
  375. } else {
  376. logger.SetLevel(logrus.DebugLevel)
  377. }
  378. logger.Out = log
  379. // logger.Out = io.MultiWriter(os.Stdout, log)
  380. // 设置错误级别日志输出到额外的文件
  381. logger.AddHook(&ErrorHook{
  382. Writer: errorLog,
  383. LogLevels: []logrus.Level{
  384. logrus.ErrorLevel,
  385. logrus.FatalLevel,
  386. logrus.PanicLevel,
  387. },
  388. })
  389. statLogger = logrus.New()
  390. statLogger.SetLevel(logrus.InfoLevel)
  391. statLogger.Out = statLog
  392. return logger
  393. }
  394. // ErrorHook 用于将错误级别的日志输出到额外的文件
  395. type ErrorHook struct {
  396. Writer io.Writer
  397. LogLevels []logrus.Level
  398. }
  399. func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
  400. line, err := entry.String()
  401. if err != nil {
  402. return err
  403. }
  404. _, err = hook.Writer.Write([]byte(line))
  405. return err
  406. }
  407. func (hook *ErrorHook) Levels() []logrus.Level {
  408. return hook.LogLevels
  409. }
  410. func initLoadConfig() {
  411. gAppConfig = &AppConfig{}
  412. configFilePath := "./config/application.yaml"
  413. viper.SetDefault("watching.uploadFileIntervalSeconds", 3)
  414. viper.SetDefault("watching.deleteFileAfterMinutes", 60)
  415. viper.SetConfigFile(configFilePath)
  416. // 热更新
  417. viper.WatchConfig()
  418. viper.OnConfigChange(func(e fsnotify.Event) {
  419. fmt.Printf("配置文件 %s 发生了更改!!!\n", e.Name)
  420. // 重新加载配置
  421. err := loadConfig()
  422. if err != nil {
  423. fmt.Printf("load config `%s` failed, err: %s\n", configFilePath, err)
  424. }
  425. })
  426. err := loadConfig()
  427. if err != nil {
  428. logger.Error(err)
  429. logger.Errorf("无法加载配置,程序退出")
  430. fmt.Println("无法加载配置,程序退出")
  431. os.Exit(1)
  432. }
  433. }
  434. func loadConfig() error {
  435. err := viper.ReadInConfig()
  436. if err != nil {
  437. return err
  438. }
  439. //把读取到的配置信息反序列化到 config 变量中
  440. var config AppConfig
  441. if err := viper.Unmarshal(&config); err != nil {
  442. fmt.Printf("viper Unmarshal config failed, err: %s\n", err)
  443. return err
  444. }
  445. if strings.Contains(config.Minio.Addr, "$") {
  446. // Start a shell to expand the environment variables
  447. cmd := exec.Command("sh", "-c", "echo "+config.Minio.Addr)
  448. out, err := cmd.Output()
  449. if err == nil {
  450. config.Minio.Addr = strings.TrimSpace(string(out))
  451. }
  452. }
  453. gAppConfig = &config
  454. return nil
  455. }