main.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "os"
  9. "os/exec"
  10. "path/filepath"
  11. "strings"
  12. "sync"
  13. "time"
  14. "watch-daemon/util"
  15. "github.com/fsnotify/fsnotify"
  16. "github.com/minio/minio-go/v7"
  17. "github.com/minio/minio-go/v7/pkg/credentials"
  18. "github.com/natefinch/lumberjack"
  19. "github.com/sirupsen/logrus"
  20. "github.com/spf13/viper"
  21. )
  22. var logger *logrus.Logger
  23. var appConfig AppConfig
  24. func main() {
  25. fmt.Println("Starting watch-daemon...")
  26. // 初始化日志框架、读取配置
  27. logger = initLog()
  28. theAppConfig, err := loadConfig()
  29. if err != nil {
  30. logger.Error(err)
  31. logger.Errorf("无法加载配置,程序退出")
  32. fmt.Println("无法加载配置,程序退出")
  33. os.Exit(1)
  34. }
  35. appConfig = theAppConfig
  36. notifyChan := make(chan string)
  37. // // Start a goroutine to monitor dir and notify
  38. // go monitorDirAndNotify(notifyChan)
  39. // Start a goroutine to do full scan and upload
  40. go func() {
  41. for {
  42. notify, ok := <-notifyChan
  43. if !ok {
  44. return
  45. }
  46. logger.Infof("received notify: %s", notify)
  47. // Drain the channel (debounce)
  48. _ = util.DrainChannelBuffer(notifyChan)
  49. doFullScanAndUpload()
  50. }
  51. }()
  52. // Initiate the first full scan and upload
  53. notifyChan <- "<start>"
  54. // Set a timer for notifying full scan and upload
  55. go func() {
  56. for {
  57. time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
  58. notifyChan <- "<timer>"
  59. }
  60. }()
  61. // Block main goroutine forever.
  62. <-make(chan struct{})
  63. }
  64. func doFullScanAndUpload() {
  65. logger.Info("begin to do full scan and upload")
  66. minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{
  67. Creds: credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""),
  68. Secure: appConfig.UseSSL,
  69. })
  70. if err != nil {
  71. logger.Errorf("MinIO Client init error: %s", err)
  72. return
  73. }
  74. logger.Println("MinIO Client初始化成功")
  75. // Enumerate all files in the folder
  76. entries, err := os.ReadDir(appConfig.WatchingConfig.Dir)
  77. if err != nil {
  78. logger.Error(err)
  79. return
  80. }
  81. // Upload files
  82. for _, entry := range entries {
  83. if entry.IsDir() {
  84. // Upload files in the folder
  85. localPath := filepath.Join(appConfig.WatchingConfig.Dir, entry.Name())
  86. objPath := entry.Name()
  87. fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
  88. if err != nil {
  89. logger.Error(err)
  90. continue
  91. }
  92. if fullyUploaded {
  93. // Remove the uploaded folder
  94. logger.Infof("fully uploaded, removing folder: %s", localPath)
  95. err := os.RemoveAll(localPath)
  96. if err != nil {
  97. logger.Error(err)
  98. }
  99. } else {
  100. logger.Infof("not fully uploaded, keeping folder: %s", localPath)
  101. }
  102. }
  103. }
  104. logger.Info("full scan and upload done")
  105. }
  106. func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
  107. ctx := context.Background()
  108. bucket := appConfig.MinIoConfig.Bucket
  109. fullyUploaded = true
  110. checkMetadataFile := func(localPath string) error {
  111. // Check for JSON metadata file. If its `total_points` is 0,
  112. // then the producer has not yet finished writing the stream, so we
  113. // should not delete the folder.
  114. metaFilePath := filepath.Join(localPath, "metadata.json")
  115. // Read metadata file
  116. metaFile, err := os.Open(metaFilePath)
  117. if err != nil {
  118. return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
  119. }
  120. defer metaFile.Close()
  121. var metadata struct {
  122. TotalPoints int `json:"total_points"`
  123. }
  124. if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
  125. return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
  126. }
  127. if metadata.TotalPoints == 0 {
  128. logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
  129. fullyUploaded = false
  130. }
  131. return nil
  132. }
  133. // NOTE: We need to check metadata file before obtaining the list of files
  134. // to prevent TOCTOU issues.
  135. if fullyUploaded {
  136. err := checkMetadataFile(localPath)
  137. if err != nil {
  138. return false, err
  139. }
  140. }
  141. // Enumerate all files in the folder
  142. entries, err := os.ReadDir(localPath)
  143. if err != nil {
  144. return false, fmt.Errorf("read dir %s error: %w", localPath, err)
  145. }
  146. // Create folder in minio
  147. objFolderPath := objPath + "/"
  148. _, err = minioClient.PutObject(ctx, bucket, objFolderPath, nil, 0, minio.PutObjectOptions{})
  149. if err != nil {
  150. return false, fmt.Errorf("create folder `%s` error: %w", objPath, err)
  151. }
  152. // Upload files
  153. // NOTE: We overwrite the existing files in the minio bucket
  154. for _, entry := range entries {
  155. // Skip partial files
  156. if strings.HasSuffix(entry.Name(), ".part") {
  157. logger.Infof("skipping partial file `%s`", entry.Name())
  158. fullyUploaded = false
  159. continue
  160. }
  161. innerLocalPath := filepath.Join(localPath, entry.Name())
  162. if entry.IsDir() {
  163. // Upload files in the folder
  164. // logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
  165. innerObjPath := objPath + "/" + entry.Name()
  166. fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
  167. if err != nil {
  168. return false, err
  169. }
  170. if !fullyUploadedInner {
  171. fullyUploaded = false
  172. }
  173. } else {
  174. // Upload the file
  175. _, err := minioClient.FPutObject(ctx, bucket,
  176. objFolderPath+entry.Name(), innerLocalPath,
  177. minio.PutObjectOptions{ContentType: "application/octet-stream"})
  178. if err != nil {
  179. return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
  180. }
  181. logger.Infof("uploaded file `%s`", entry.Name())
  182. // Remove *.zst files early
  183. if strings.HasSuffix(entry.Name(), ".zst") {
  184. err := os.Remove(innerLocalPath)
  185. if err != nil {
  186. logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
  187. }
  188. }
  189. }
  190. }
  191. return fullyUploaded, nil
  192. }
  193. func monitorDirAndNotify(notifyChan chan string) {
  194. scanFn := func() {
  195. // Create new watcher.
  196. watcher, err := fsnotify.NewWatcher()
  197. if err != nil {
  198. logger.Error(err)
  199. }
  200. defer watcher.Close()
  201. // Add a path.
  202. dir := appConfig.WatchingConfig.Dir
  203. if err := watcher.Add(dir); err != nil {
  204. logger.Errorf("unable to watch dir %s: %s", dir, err)
  205. return
  206. }
  207. var wg sync.WaitGroup
  208. wg.Add(2)
  209. // Start listening for events.
  210. go func() {
  211. defer wg.Done()
  212. for event := range watcher.Events {
  213. logger.Infof("received event: %v", event)
  214. // Notify
  215. notifyChan <- "<FSNOTIFY: " + event.Name + ">"
  216. }
  217. }()
  218. // Start listening for errors.
  219. go func() {
  220. defer wg.Done()
  221. for err := range watcher.Errors {
  222. logger.Errorf("fsnotify error: %s", err)
  223. }
  224. }()
  225. wg.Wait()
  226. }
  227. for {
  228. // scan
  229. scanFn()
  230. logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", appConfig.WatchingConfig.UploadFileInterval)
  231. // sleep
  232. time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
  233. }
  234. }
  235. func initLog() *logrus.Logger {
  236. log := &lumberjack.Logger{
  237. Filename: "./log/watch-daemon.log", // 日志文件的位置
  238. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  239. MaxBackups: 5, // 保留的最大旧文件数量
  240. MaxAge: 28, // 保留旧文件的最大天数
  241. Compress: true, // 是否压缩/归档旧文件
  242. LocalTime: true, // 使用本地时间创建时间戳
  243. }
  244. logger := logrus.New()
  245. // logger.Out = log
  246. logger.Out = io.MultiWriter(os.Stdout, log)
  247. return logger
  248. }
  249. func loadConfig() (AppConfig, error) {
  250. configFilePath := "./config/application.yaml"
  251. if !util.FileExists(configFilePath) {
  252. return AppConfig{}, errors.New("找不到配置文件")
  253. }
  254. viper.SetConfigFile(configFilePath)
  255. // 热更新
  256. viper.WatchConfig()
  257. viper.OnConfigChange(func(e fsnotify.Event) {
  258. fmt.Printf("配置文件 %s 发生了更改!!! 最新的Global.Source这个字段的值为 %s:", e.Name, viper.GetString("Global.Source"))
  259. })
  260. err := viper.ReadInConfig()
  261. if err != nil {
  262. panic(fmt.Errorf("error reading config: %s", err))
  263. }
  264. //把读取到的配置信息反序列化到 config 变量中
  265. var config AppConfig
  266. if err := viper.Unmarshal(&config); err != nil {
  267. fmt.Printf("viper Unmarshal failed, err:%s\n", err)
  268. }
  269. if strings.Contains(config.MinIoConfig.Addr, "$") {
  270. // Start a shell to expand the environment variables
  271. cmd := exec.Command("sh", "-c", "echo "+config.MinIoConfig.Addr)
  272. out, err := cmd.Output()
  273. if err == nil {
  274. config.MinIoConfig.Addr = strings.TrimSpace(string(out))
  275. }
  276. }
  277. return config, nil
  278. }
  279. // func old_main() {
  280. // // 初始化日志框架、读取配置
  281. // logger = initLog()
  282. // appConfig, err := loadConfig()
  283. // if err != nil {
  284. // logger.Error(err)
  285. // logger.Errorf("无法加载配置,程序退出")
  286. // os.Exit(1)
  287. // }
  288. // // upload file from tmp to minio
  289. // go uploadFileToMinIo(appConfig)
  290. // // Create new watcher.
  291. // watcher, err := fsnotify.NewWatcher()
  292. // if err != nil {
  293. // logger.Error(err)
  294. // }
  295. // defer watcher.Close()
  296. // // Start listening for events.
  297. // go func() {
  298. // for {
  299. // select {
  300. // case event, ok := <-watcher.Events:
  301. // if !ok {
  302. // return
  303. // }
  304. // if event.Has(fsnotify.Create) && strings.HasSuffix(event.Name, "sync") {
  305. // logger.Printf("copy file:%s from src:%s to:%s", event.Name,
  306. // appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir)
  307. // // create hard link
  308. // srcFilePath := strings.ReplaceAll(strings.TrimSuffix(event.Name, ".sync")+".zst",
  309. // "\\", "/")
  310. // dstFilePath := strings.ReplaceAll(srcFilePath, appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir)
  311. // err := util.CreateHardLink(srcFilePath, dstFilePath)
  312. // if err != nil {
  313. // logger.Errorf("拷贝文件失败, src:%s dst:%s", srcFilePath, dstFilePath)
  314. // }
  315. // } else if event.Has(fsnotify.Create) {
  316. // stat, err := os.Stat(event.Name)
  317. // if err != nil {
  318. // logger.Error(err)
  319. // }
  320. // if stat.IsDir() {
  321. // logger.Println("开始监听: ", event.Name)
  322. // err = watcher.Add(event.Name)
  323. // if err != nil {
  324. // logger.Errorf("监听目录失败, dir:%s e:%s", event.Name, err)
  325. // }
  326. // }
  327. // }
  328. // case err, ok := <-watcher.Errors:
  329. // if !ok {
  330. // return
  331. // }
  332. // logger.Errorf("error: %s", err)
  333. // }
  334. // }
  335. // }()
  336. // // Add a path.
  337. // err = addWatchPath(appConfig.WatchingConfig.Dir, watcher)
  338. // if err != nil {
  339. // logger.Error(err)
  340. // }
  341. // // Block main goroutine forever.
  342. // <-make(chan struct{})
  343. // }
  344. // // 递归添加监控目录
  345. // func addWatchPath(dir string, watcher *fsnotify.Watcher) error {
  346. // entries, err := os.ReadDir(dir)
  347. // if err != nil {
  348. // return fmt.Errorf("读取目录error: %w", err)
  349. // }
  350. // logger.Println("开始监听目录:", dir)
  351. // err = watcher.Add(dir)
  352. // if err != nil {
  353. // return err
  354. // }
  355. // if len(entries) > 0 {
  356. // // 递归子目录
  357. // for _, entry := range entries {
  358. // if entry.IsDir() {
  359. // err := addWatchPath(filepath.Join(dir, entry.Name()), watcher)
  360. // if err != nil {
  361. // return err
  362. // }
  363. // }
  364. // }
  365. // }
  366. // return nil
  367. // }
  368. // // 每隔一定时间上传文件
  369. // func uploadFileToMinIo(appConfig AppConfig) {
  370. // // minioClient
  371. // minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{
  372. // Creds: credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""),
  373. // Secure: appConfig.UseSSL,
  374. // })
  375. // if err != nil {
  376. // logger.Errorf("MinIO Client init error: %s", err)
  377. // return
  378. // }
  379. // logger.Println("MinIO Client初始化成功")
  380. // for {
  381. // err := uploadFiles(appConfig.TmpDir, minioClient, appConfig.MinIoConfig.Bucket)
  382. // if err != nil {
  383. // logger.Errorf("upload files error, e: %s", err)
  384. // }
  385. // time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
  386. // }
  387. // }
  388. // func uploadFiles(dir string, minioClient *minio.Client, bucket string) error {
  389. // entries, err := os.ReadDir(dir)
  390. // if err != nil {
  391. // return fmt.Errorf("上传文件, 读取目录. err: %w", err)
  392. // }
  393. // // uploadFiles
  394. // if len(entries) > 0 {
  395. // for _, entry := range entries {
  396. // if entry.IsDir() {
  397. // // 递归处理子文件夹
  398. // subfolder := filepath.Join(dir, entry.Name())
  399. // err := uploadFiles(subfolder, minioClient, bucket)
  400. // if err != nil {
  401. // logger.Errorf("上传文件失败 err:%s\n", err)
  402. // }
  403. // // 删除空文件夹
  404. // err = util.RemoveEmptyDir(subfolder)
  405. // if err != nil {
  406. // logger.Errorf("上传文件失败 err:%s\n", err)
  407. // }
  408. // } else {
  409. // filePath := filepath.Join(dir, entry.Name())
  410. // // 将文件上传至桶中对应文件夹
  411. // _, err = minioClient.FPutObject(context.Background(), bucket,
  412. // strings.ReplaceAll(filePath, "\\", "/"), dir,
  413. // minio.PutObjectOptions{ContentType: "application/octet-stream"})
  414. // if err != nil {
  415. // logger.Errorf("上传文件失败, filePath: %s err:%s\n", filePath, err)
  416. // }
  417. // // 删除文件
  418. // err := os.Remove(filePath)
  419. // if err != nil {
  420. // logger.Errorf("删除备份文件失败, filePath: %s err:%s\n", filePath, err)
  421. // }
  422. // }
  423. // }
  424. // }
  425. // return nil
  426. // }