// main.go: entry point of the watch-daemon.

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "strings"
  10. "sync"
  11. "time"
  12. "watch-daemon/util"
  13. "github.com/fsnotify/fsnotify"
  14. "github.com/minio/minio-go/v7"
  15. "github.com/minio/minio-go/v7/pkg/credentials"
  16. "github.com/natefinch/lumberjack"
  17. "github.com/sirupsen/logrus"
  18. "github.com/spf13/viper"
  19. )
  20. var logger *logrus.Logger
  21. var statLogger *logrus.Logger
  22. var gAppConfig *AppConfig
  23. func main() {
  24. fmt.Println("Starting watch-daemon...")
  25. // 初始化日志框架、读取配置
  26. logger = initLog()
  27. initLoadConfig()
  28. notifyChan := make(chan string)
  29. // // Start a goroutine to monitor dir and notify
  30. // go monitorDirAndNotify(notifyChan)
  31. // Start a goroutine to do full scan and upload
  32. go func() {
  33. for {
  34. notify, ok := <-notifyChan
  35. if !ok {
  36. return
  37. }
  38. logger.Debugf("received notify: %s", notify)
  39. // Drain the channel (debounce)
  40. _ = util.DrainChannelBuffer(notifyChan)
  41. doFullScanAndUpload()
  42. }
  43. }()
  44. // Initiate the first full scan and upload
  45. notifyChan <- "<start>"
  46. // Set a timer for notifying full scan and upload
  47. go func() {
  48. for {
  49. time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
  50. notifyChan <- "<timer>"
  51. }
  52. }()
  53. // Block main goroutine forever.
  54. <-make(chan struct{})
  55. }
  56. func doFullScanAndUpload() {
  57. logger.Debugln("begin to do full scan and upload")
  58. var minioCreds *credentials.Credentials
  59. if gAppConfig.Minio.AccessKeyID != "" && gAppConfig.Minio.SecretAccessKey != "" {
  60. minioCreds = credentials.NewStaticV4(gAppConfig.Minio.AccessKeyID, gAppConfig.Minio.SecretAccessKey, "")
  61. } else {
  62. // Make anonymous client
  63. minioCreds = credentials.NewStaticV4("", "", "")
  64. }
  65. minioClient, err := minio.New(gAppConfig.Minio.Addr, &minio.Options{
  66. Creds: minioCreds,
  67. Secure: gAppConfig.Minio.UseSSL,
  68. })
  69. if err != nil {
  70. logger.Errorf("MinIO Client init error: %s", err)
  71. return
  72. }
  73. logger.Debugln("MinIO Client初始化成功")
  74. // Make sure `.trash` folder exists in local
  75. trashFolder := filepath.Join(gAppConfig.Watching.Dir, ".trash")
  76. if !util.FileExists(trashFolder) {
  77. err := os.Mkdir(trashFolder, 0777)
  78. if err != nil {
  79. logger.Errorf("create trash folder error: %v", err)
  80. // return
  81. }
  82. }
  83. // Enumerate all files in the folder
  84. entries, err := os.ReadDir(gAppConfig.Watching.Dir)
  85. if err != nil {
  86. logger.Error(err)
  87. return
  88. }
  89. // Upload files
  90. for _, entry := range entries {
  91. if strings.HasPrefix(entry.Name(), ".") {
  92. // Skip hidden files
  93. continue
  94. }
  95. if entry.IsDir() {
  96. // Upload the folder stream first
  97. localPath := filepath.Join(gAppConfig.Watching.Dir, entry.Name())
  98. objPath := entry.Name()
  99. fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
  100. if err != nil {
  101. logger.Errorf("upload folder `%s` error: %v", localPath, err)
  102. } else {
  103. if fullyUploaded {
  104. // Remove the uploaded folder
  105. logger.Infof("fully uploaded, removing folder: %s", localPath)
  106. err := os.RemoveAll(localPath)
  107. if err != nil {
  108. logger.Errorf("remove folder `%s` error: %v", localPath, err)
  109. }
  110. } else {
  111. logger.Infof("not fully uploaded, keeping folder: %s", localPath)
  112. }
  113. }
  114. // Then remove folder if it is too old
  115. if gAppConfig.Watching.DeleteFileAfterMinutes > 0 {
  116. info, err := entry.Info()
  117. if err != nil {
  118. logger.Errorf("get info for folder `%s` error: %v", localPath, err)
  119. continue
  120. }
  121. entryModTime := info.ModTime()
  122. entryAgeMinutes := time.Since(entryModTime).Minutes()
  123. if entryAgeMinutes > float64(gAppConfig.Watching.DeleteFileAfterMinutes) {
  124. logger.Infof("removing folder `%s` because it is too old (last modified: %v, %v minutes ago)",
  125. localPath, util.FmtMyTime(entryModTime), entryAgeMinutes)
  126. err := os.RemoveAll(localPath)
  127. if err != nil {
  128. logger.Errorf("remove folder `%s` error: %v", localPath, err)
  129. }
  130. }
  131. }
  132. }
  133. }
  134. logger.Debugln("full scan and upload done")
  135. }
  136. func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
  137. type StreamObjectUploadStatistics struct {
  138. StartTime time.Time
  139. EndTime time.Time
  140. Name string
  141. Succeeded bool
  142. Msg string
  143. }
  144. type StreamUploadStatistics struct {
  145. StartTime time.Time
  146. EndTime time.Time
  147. MetaPartsCount int
  148. MetaPointsCount int
  149. Objects map[string]StreamObjectUploadStatistics
  150. Msg string
  151. }
  152. streamUploadStatistics := StreamUploadStatistics{
  153. Objects: make(map[string]StreamObjectUploadStatistics),
  154. }
  155. streamUploadStatistics.StartTime = time.Now()
  156. defer func() {
  157. streamUploadStatistics.EndTime = time.Now()
  158. okCount := 0
  159. failCount := 0
  160. totalCount := 0
  161. objDetailsMsg := ""
  162. for _, obj := range streamUploadStatistics.Objects {
  163. totalCount++
  164. if obj.Succeeded {
  165. okCount++
  166. objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
  167. obj.Name, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  168. } else {
  169. failCount++
  170. objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
  171. obj.Name, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  172. }
  173. }
  174. statLogger.Infof("upload folder `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported parts & points count: %d & %d;"+
  175. " ok / fail / total: %d / %d / %d; details:%s",
  176. localPath, streamUploadStatistics.EndTime.Sub(streamUploadStatistics.StartTime),
  177. util.FmtMyTime(streamUploadStatistics.StartTime), util.FmtMyTime(streamUploadStatistics.EndTime),
  178. fullyUploaded, streamUploadStatistics.MetaPartsCount, streamUploadStatistics.MetaPointsCount,
  179. okCount, failCount, totalCount, objDetailsMsg)
  180. }()
  181. bucket := gAppConfig.Minio.Bucket
  182. fullyUploaded = true
  183. streamDateStr := util.FmtMyDate(time.Now())
  184. checkMetadataFile := func(localPath string) error {
  185. // Check for JSON metadata file. If its `total_points` is 0,
  186. // then the producer has not yet finished writing the stream, so we
  187. // should not delete the folder.
  188. metaFilePath := filepath.Join(localPath, "metadata.json")
  189. // Read metadata file
  190. metaFile, err := os.Open(metaFilePath)
  191. if err != nil {
  192. return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
  193. }
  194. defer metaFile.Close()
  195. var metadata struct {
  196. TotalPoints int `json:"total_points"`
  197. PartsCount int `json:"parts_count"`
  198. TimestampOffset int64 `json:"timestamp_offset"` // In milliseconds
  199. }
  200. if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
  201. return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
  202. }
  203. if metadata.TotalPoints == 0 {
  204. logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
  205. fullyUploaded = false
  206. }
  207. streamUploadStatistics.MetaPartsCount = metadata.PartsCount
  208. streamUploadStatistics.MetaPointsCount = metadata.TotalPoints
  209. streamDateStr = util.FmtMyDate(time.Unix(metadata.TimestampOffset/1000, 0))
  210. return nil
  211. }
  212. // NOTE: We need to check metadata file before obtaining the list of files
  213. // to prevent TOCTOU issues.
  214. if fullyUploaded {
  215. if !util.FileExists(filepath.Join(localPath, "metadata.json")) {
  216. logger.Infof("metadata file not found in folder `%s`, skipping", localPath)
  217. return false, nil
  218. }
  219. err := checkMetadataFile(localPath)
  220. if err != nil {
  221. return false, fmt.Errorf("check metadata file error: %w", err)
  222. }
  223. }
  224. // Enumerate all files in the folder
  225. entries, err := os.ReadDir(localPath)
  226. if err != nil {
  227. return false, fmt.Errorf("read dir `%s` error: %w", localPath, err)
  228. }
  229. // Create folder in minio
  230. objFolderPath := streamDateStr + "/" + objPath + "/"
  231. err = util.MinioCreateFolderIfNotExists(minioClient, bucket, objFolderPath)
  232. if err != nil {
  233. return false, fmt.Errorf("create minio folder `%s` error: %w", objFolderPath, err)
  234. }
  235. // Upload metadata file
  236. _, err = util.MinioUploadFileIfChanged(minioClient, bucket,
  237. objFolderPath+"metadata.json", filepath.Join(localPath, "metadata.json"))
  238. if err != nil {
  239. return false, fmt.Errorf("upload metadata file error: %w", err)
  240. }
  241. // Upload files
  242. for _, entry := range entries {
  243. // Skip hidden files
  244. if strings.HasPrefix(entry.Name(), ".") {
  245. continue
  246. }
  247. // Skip metadata file
  248. if entry.Name() == "metadata.json" {
  249. continue
  250. }
  251. // Skip partial files
  252. if strings.HasSuffix(entry.Name(), ".part") {
  253. logger.Infof("skipping partial file `%s`", entry.Name())
  254. fullyUploaded = false
  255. continue
  256. }
  257. innerLocalPath := filepath.Join(localPath, entry.Name())
  258. if entry.IsDir() {
  259. // Upload files in the folder
  260. // logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
  261. innerObjPath := objPath + "/" + entry.Name()
  262. fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
  263. if err != nil {
  264. return false, err
  265. }
  266. if !fullyUploadedInner {
  267. fullyUploaded = false
  268. }
  269. } else {
  270. // Upload the file
  271. objStat := StreamObjectUploadStatistics{
  272. StartTime: time.Now(),
  273. Name: entry.Name(),
  274. }
  275. _, err := util.MinioUploadFile(minioClient, bucket,
  276. objFolderPath+entry.Name(), innerLocalPath, true)
  277. objStat.EndTime = time.Now()
  278. if err != nil {
  279. objStat.Succeeded = false
  280. objStat.Msg = err.Error()
  281. // return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
  282. logger.Errorf("upload file `%s` of stream `%s` error: %v, took %v",
  283. entry.Name(), objPath, err, objStat.EndTime.Sub(objStat.StartTime))
  284. } else {
  285. objStat.Succeeded = true
  286. logger.Infof("uploaded file `%s`, took %v", entry.Name(), objStat.EndTime.Sub(objStat.StartTime))
  287. // Remove *.zst files early
  288. if strings.HasSuffix(entry.Name(), ".zst") {
  289. err := os.Remove(innerLocalPath)
  290. if err != nil {
  291. logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
  292. }
  293. }
  294. }
  295. streamUploadStatistics.Objects[entry.Name()] = objStat
  296. }
  297. }
  298. return fullyUploaded, nil
  299. }
  300. func monitorDirAndNotify(notifyChan chan string) {
  301. scanFn := func() {
  302. // Create new watcher.
  303. watcher, err := fsnotify.NewWatcher()
  304. if err != nil {
  305. logger.Errorf("unable to create watcher: %s", err)
  306. }
  307. defer watcher.Close()
  308. // Add a path.
  309. dir := gAppConfig.Watching.Dir
  310. if err := watcher.Add(dir); err != nil {
  311. logger.Errorf("unable to watch dir `%s`: %s", dir, err)
  312. return
  313. }
  314. var wg sync.WaitGroup
  315. wg.Add(2)
  316. // Start listening for events.
  317. go func() {
  318. defer wg.Done()
  319. for event := range watcher.Events {
  320. logger.Debugf("received event: %v", event)
  321. // Notify
  322. notifyChan <- "<FSNOTIFY: " + event.Name + ">"
  323. }
  324. }()
  325. // Start listening for errors.
  326. go func() {
  327. defer wg.Done()
  328. for err := range watcher.Errors {
  329. logger.Errorf("fsnotify error: %s", err)
  330. }
  331. }()
  332. wg.Wait()
  333. }
  334. for {
  335. // scan
  336. scanFn()
  337. logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", gAppConfig.Watching.UploadFileIntervalSeconds)
  338. // sleep
  339. time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
  340. }
  341. }
  342. func initLog() *logrus.Logger {
  343. // 主日志文件
  344. log := &lumberjack.Logger{
  345. Filename: "./log/watch-daemon.log", // 日志文件的位置
  346. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  347. MaxBackups: 5, // 保留的最大旧文件数量
  348. MaxAge: 28, // 保留旧文件的最大天数
  349. Compress: true, // 是否压缩/归档旧文件
  350. LocalTime: true, // 使用本地时间创建时间戳
  351. }
  352. // 错误日志文件
  353. errorLog := &lumberjack.Logger{
  354. Filename: "./log/watch-daemon.error.log", // 错误日志文件的位置
  355. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  356. MaxBackups: 5, // 保留的最大旧文件数量
  357. MaxAge: 28, // 保留旧文件的最大天数
  358. Compress: true, // 是否压缩/归档旧文件
  359. LocalTime: true, // 使用本地时间创建时间戳
  360. }
  361. // 统计日志文件
  362. statLog := &lumberjack.Logger{
  363. Filename: "./log/watch-daemon.stat.log", // 统计日志文件的位置
  364. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  365. MaxBackups: 5, // 保留的最大旧文件数量
  366. MaxAge: 28, // 保留旧文件的最大天数
  367. Compress: true, // 是否压缩/归档旧文件
  368. LocalTime: true, // 使用本地时间创建时间戳
  369. }
  370. logger := logrus.New()
  371. if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
  372. logger.SetLevel(logrus.TraceLevel)
  373. } else {
  374. logger.SetLevel(logrus.DebugLevel)
  375. }
  376. logger.Out = log
  377. // logger.Out = io.MultiWriter(os.Stdout, log)
  378. // 设置错误级别日志输出到额外的文件
  379. logger.AddHook(&ErrorHook{
  380. Writer: errorLog,
  381. LogLevels: []logrus.Level{
  382. logrus.ErrorLevel,
  383. logrus.FatalLevel,
  384. logrus.PanicLevel,
  385. },
  386. })
  387. statLogger = logrus.New()
  388. statLogger.SetLevel(logrus.InfoLevel)
  389. statLogger.Out = statLog
  390. return logger
  391. }
  392. // ErrorHook 用于将错误级别的日志输出到额外的文件
  393. type ErrorHook struct {
  394. Writer io.Writer
  395. LogLevels []logrus.Level
  396. }
  397. func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
  398. line, err := entry.String()
  399. if err != nil {
  400. return err
  401. }
  402. _, err = hook.Writer.Write([]byte(line))
  403. return err
  404. }
  405. func (hook *ErrorHook) Levels() []logrus.Level {
  406. return hook.LogLevels
  407. }
  408. func initLoadConfig() {
  409. gAppConfig = &AppConfig{}
  410. configFilePath := "./config/application.yaml"
  411. viper.SetDefault("watching.uploadFileIntervalSeconds", 3)
  412. viper.SetDefault("watching.deleteFileAfterMinutes", 60)
  413. viper.SetConfigFile(configFilePath)
  414. // 热更新
  415. viper.WatchConfig()
  416. viper.OnConfigChange(func(e fsnotify.Event) {
  417. fmt.Printf("配置文件 %s 发生了更改!!!\n", e.Name)
  418. // 重新加载配置
  419. err := loadConfig()
  420. if err != nil {
  421. fmt.Printf("load config `%s` failed, err: %s\n", configFilePath, err)
  422. }
  423. })
  424. err := loadConfig()
  425. if err != nil {
  426. logger.Error(err)
  427. logger.Errorf("无法加载配置,程序退出")
  428. fmt.Println("无法加载配置,程序退出")
  429. os.Exit(1)
  430. }
  431. }
  432. func loadConfig() error {
  433. err := viper.ReadInConfig()
  434. if err != nil {
  435. return err
  436. }
  437. //把读取到的配置信息反序列化到 config 变量中
  438. var config AppConfig
  439. if err := viper.Unmarshal(&config); err != nil {
  440. fmt.Printf("viper Unmarshal config failed, err: %s\n", err)
  441. return err
  442. }
  443. if strings.Contains(config.Minio.Addr, "$") {
  444. // Start a shell to expand the environment variables
  445. cmd := exec.Command("sh", "-c", "echo "+config.Minio.Addr)
  446. out, err := cmd.Output()
  447. if err == nil {
  448. config.Minio.Addr = strings.TrimSpace(string(out))
  449. }
  450. }
  451. gAppConfig = &config
  452. return nil
  453. }