package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/natefinch/lumberjack"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"

	"watch-daemon/util"
)

// logger is the process-wide logger; main assigns it before anything else runs.
var logger *logrus.Logger

// appConfig holds the configuration returned by loadConfig at startup.
var appConfig AppConfig

// main initialises logging and configuration, then triggers a full scan and
// upload once immediately and again every UploadFileInterval seconds.
func main() {
	fmt.Println("Starting watch-daemon...")

	// Initialise the logging framework first so configuration errors can be logged.
	logger = initLog()
	theAppConfig, err := loadConfig()
	if err != nil {
		logger.Error(err)
		logger.Errorf("无法加载配置,程序退出")
		fmt.Println("无法加载配置,程序退出")
		os.Exit(1)
	}
	appConfig = theAppConfig

	notifyChan := make(chan string)

	// // Start a goroutine to monitor dir and notify
	// go monitorDirAndNotify(notifyChan)

	// Start a goroutine to do full scan and upload. Scans run one at a time
	// because this is the only receiver on notifyChan.
	go func() {
		for notify := range notifyChan {
			logger.Infof("received notify: %s", notify)

			// Drain the channel (debounce). The helper's result is
			// deliberately discarded: only the draining side effect matters here.
			_ = util.DrainChannelBuffer(notifyChan)

			doFullScanAndUpload()
		}
	}()

	// Initiate the first full scan and upload
	notifyChan <- ""

	// Periodically request another full scan and upload. The send blocks
	// while a scan is still running, so requests never pile up.
	go func() {
		for {
			time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
			notifyChan <- ""
		}
	}()

	// Block main goroutine forever.
	<-make(chan struct{})
}

// doFullScanAndUpload creates a MinIO client, uploads every sub-directory of
// the watched directory via uploadOneStreamFolder, and removes the local
// folders that were reported as fully uploaded. Regular files directly inside
// the watched directory are ignored. Errors are logged, never returned.
func doFullScanAndUpload() {
	logger.Info("begin to do full scan and upload")

	minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{
		Creds:  credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""),
		Secure: appConfig.UseSSL,
	})
	if err != nil {
		logger.Errorf("MinIO Client init error: %s", err)
		return
	}
	logger.Println("MinIO Client初始化成功")

	// Enumerate all entries in the watched folder.
	watchDir := appConfig.WatchingConfig.Dir
	entries, err := os.ReadDir(watchDir)
	if err != nil {
		logger.Error(err)
		return
	}

	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}

		// The folder name doubles as the object prefix in the bucket.
		localPath := filepath.Join(watchDir, entry.Name())
		fullyUploaded, err := uploadOneStreamFolder(localPath, entry.Name(), minioClient)
		if err != nil {
			// A failure in one folder must not stop the others.
			logger.Error(err)
			continue
		}
		if !fullyUploaded {
			logger.Infof("not fully uploaded, keeping folder: %s", localPath)
			continue
		}

		// Remove the uploaded folder
		logger.Infof("fully uploaded, removing folder: %s", localPath)
		if err := os.RemoveAll(localPath); err != nil {
			logger.Error(err)
		}
	}

	logger.Info("full scan and upload done")
}

// streamFolderFinished reads metadata.json inside localPath and reports
// whether its total_points field is non-zero. A value of 0 means the producer
// has not yet finished writing the stream, so the folder must not be deleted.
// A missing or undecodable metadata file is returned as an error.
func streamFolderFinished(localPath string) (bool, error) {
	metaFilePath := filepath.Join(localPath, "metadata.json")

	metaFile, err := os.Open(metaFilePath)
	if err != nil {
		return false, fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
	}
	defer metaFile.Close()

	var metadata struct {
		TotalPoints int `json:"total_points"`
	}
	if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
		return false, fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
	}

	if metadata.TotalPoints == 0 {
		logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
		return false, nil
	}
	return true, nil
}

// uploadOneStreamFolder uploads the contents of localPath to the configured
// bucket under the object prefix objPath, recursing into sub-directories.
//
// Existing objects in the bucket are overwritten. Files ending in ".part" are
// skipped, and files ending in ".zst" are deleted locally right after a
// successful upload.
//
// fullyUploaded is false when metadata.json reports total_points == 0, when a
// ".part" file was skipped, or when any sub-directory was not fully uploaded.
// On any error the function stops and returns (false, err).
//
// NOTE(review): the metadata check also runs for every sub-directory during
// recursion, so a sub-directory without metadata.json makes the whole upload
// fail — confirm that nested folders are expected to carry their own metadata.
func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
	ctx := context.Background()
	bucket := appConfig.MinIoConfig.Bucket

	// NOTE: We need to check metadata file before obtaining the list of files
	// to prevent TOCTOU issues.
	fullyUploaded, err = streamFolderFinished(localPath)
	if err != nil {
		return false, err
	}

	// Enumerate all files in the folder
	entries, err := os.ReadDir(localPath)
	if err != nil {
		return false, fmt.Errorf("read dir %s error: %w", localPath, err)
	}

	// Create folder in minio: a zero-length object whose key ends in "/".
	// Object keys always use "/" regardless of the host OS, hence no filepath.Join.
	objFolderPath := objPath + "/"
	_, err = minioClient.PutObject(ctx, bucket, objFolderPath, nil, 0, minio.PutObjectOptions{})
	if err != nil {
		return false, fmt.Errorf("create folder `%s` error: %w", objPath, err)
	}

	// Upload files
	// NOTE: We overwrite the existing files in the minio bucket
	for _, entry := range entries {
		name := entry.Name()

		// Skip partial files
		if strings.HasSuffix(name, ".part") {
			logger.Infof("skipping partial file `%s`", name)
			fullyUploaded = false
			continue
		}

		innerLocalPath := filepath.Join(localPath, name)

		if entry.IsDir() {
			// Upload files in the sub-folder
			// logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
			innerDone, innerErr := uploadOneStreamFolder(innerLocalPath, objPath+"/"+name, minioClient)
			if innerErr != nil {
				return false, innerErr
			}
			if !innerDone {
				fullyUploaded = false
			}
			continue
		}

		// Upload the file
		_, putErr := minioClient.FPutObject(ctx, bucket, objFolderPath+name,
			innerLocalPath, minio.PutObjectOptions{ContentType: "application/octet-stream"})
		if putErr != nil {
			return false, fmt.Errorf("upload file `%s` error: %w", name, putErr)
		}
		logger.Infof("uploaded file `%s`", name)

		// Remove *.zst files early; a failed removal is only logged.
		if strings.HasSuffix(name, ".zst") {
			if rmErr := os.Remove(innerLocalPath); rmErr != nil {
				logger.Errorf("remove file `%s` error: %v", name, rmErr)
			}
		}
	}

	return fullyUploaded, nil
}

// monitorDirAndNotify watches the configured directory with fsnotify and sends
// an empty string on notifyChan for every event received. Whenever the watch
// cannot be set up or stops, it is retried after UploadFileInterval seconds.
// The function never returns.
func monitorDirAndNotify(notifyChan chan string) {
	watchOnce := func() {
		// Create new watcher.
		watcher, err := fsnotify.NewWatcher()
		if err != nil {
			// No watcher was created, so there is nothing to close or
			// listen on; return and let the outer loop retry.
			logger.Error(err)
			return
		}
		defer watcher.Close()

		// Add a path.
		dir := appConfig.WatchingConfig.Dir
		if err := watcher.Add(dir); err != nil {
			logger.Errorf("unable to watch dir %s: %s", dir, err)
			return
		}

		var wg sync.WaitGroup
		wg.Add(2)

		// Start listening for events.
		go func() {
			defer wg.Done()
			for event := range watcher.Events {
				logger.Infof("received event: %v", event)
				// Notify
				notifyChan <- ""
			}
		}()

		// Start listening for errors.
		go func() {
			defer wg.Done()
			for err := range watcher.Errors {
				logger.Errorf("fsnotify error: %s", err)
			}
		}()

		// Blocks until both watcher channels have been closed.
		wg.Wait()
	}

	for {
		// scan
		watchOnce()
		logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", appConfig.WatchingConfig.UploadFileInterval)
		// sleep
		time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
	}
}

// initLog builds a logrus logger that writes to both stdout and a rotating
// log file managed by lumberjack.
func initLog() *logrus.Logger {
	rotatingFile := &lumberjack.Logger{
		Filename:   "./log/watch-daemon.log", // location of the log file
		MaxSize:    50,                       // maximum file size in MB
		MaxBackups: 5,                        // maximum number of old files to keep
		MaxAge:     28,                       // maximum number of days to keep old files
		Compress:   true,                     // compress/archive old files
		LocalTime:  true,                     // use local time for timestamps
	}

	l := logrus.New()
	// l.Out = rotatingFile
	l.Out = io.MultiWriter(os.Stdout, rotatingFile)
	return l
}

// loadConfig reads ./config/application.yaml through viper and decodes it
// into an AppConfig. If the MinIO address contains a "$", it is expanded by
// running it through `sh -c echo`.
//
// An error is returned when the file is missing, cannot be read, or cannot be
// decoded; in that case the returned AppConfig is the zero value.
func loadConfig() (AppConfig, error) {
	configFilePath := "./config/application.yaml"
	if !util.FileExists(configFilePath) {
		return AppConfig{}, errors.New("找不到配置文件")
	}

	viper.SetConfigFile(configFilePath)
	if err := viper.ReadInConfig(); err != nil {
		// Return the error instead of panicking: the caller already
		// logs it and exits.
		return AppConfig{}, fmt.Errorf("reading config %q: %w", configFilePath, err)
	}

	// Hot reload: register the callback before starting the watcher so no
	// change event can fire without a handler.
	// NOTE(review): the callback only prints; appConfig is not refreshed here.
	viper.OnConfigChange(func(e fsnotify.Event) {
		fmt.Printf("配置文件 %s 发生了更改!!! 最新的Global.Source这个字段的值为 %s:", e.Name, viper.GetString("Global.Source"))
	})
	viper.WatchConfig()

	// Decode the loaded settings into the config struct.
	var config AppConfig
	if err := viper.Unmarshal(&config); err != nil {
		// A partially decoded config must not be used as if it were valid.
		return AppConfig{}, fmt.Errorf("unmarshalling config %q: %w", configFilePath, err)
	}

	if strings.Contains(config.MinIoConfig.Addr, "$") {
		// Start a shell to expand the environment variables.
		// NOTE(review): the address is interpolated into a shell command
		// line, so the config file must come from a trusted source.
		out, err := exec.Command("sh", "-c", "echo "+config.MinIoConfig.Addr).Output()
		if err != nil {
			// Best effort: keep the unexpanded address, but say so.
			fmt.Printf("expanding MinIO address %q failed, using it unexpanded: %s\n", config.MinIoConfig.Addr, err)
		} else {
			config.MinIoConfig.Addr = strings.TrimSpace(string(out))
		}
	}

	return config, nil
}

// Legacy implementation, kept commented out for reference.
// func old_main() {
// // Initialise the logging framework and read the configuration
// logger = initLog()
// appConfig, err := loadConfig()
// if err != nil {
// logger.Error(err)
// logger.Errorf("无法加载配置,程序退出")
// os.Exit(1)
// }
// // upload file from tmp to minio
// go uploadFileToMinIo(appConfig)
// // Create new watcher.
// watcher, err := fsnotify.NewWatcher() // if err != nil { // logger.Error(err) // } // defer watcher.Close() // // Start listening for events. // go func() { // for { // select { // case event, ok := <-watcher.Events: // if !ok { // return // } // if event.Has(fsnotify.Create) && strings.HasSuffix(event.Name, "sync") { // logger.Printf("copy file:%s from src:%s to:%s", event.Name, // appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir) // // create hard link // srcFilePath := strings.ReplaceAll(strings.TrimSuffix(event.Name, ".sync")+".zst", // "\\", "/") // dstFilePath := strings.ReplaceAll(srcFilePath, appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir) // err := util.CreateHardLink(srcFilePath, dstFilePath) // if err != nil { // logger.Errorf("拷贝文件失败, src:%s dst:%s", srcFilePath, dstFilePath) // } // } else if event.Has(fsnotify.Create) { // stat, err := os.Stat(event.Name) // if err != nil { // logger.Error(err) // } // if stat.IsDir() { // logger.Println("开始监听: ", event.Name) // err = watcher.Add(event.Name) // if err != nil { // logger.Errorf("监听目录失败, dir:%s e:%s", event.Name, err) // } // } // } // case err, ok := <-watcher.Errors: // if !ok { // return // } // logger.Errorf("error: %s", err) // } // } // }() // // Add a path. // err = addWatchPath(appConfig.WatchingConfig.Dir, watcher) // if err != nil { // logger.Error(err) // } // // Block main goroutine forever. 
// <-make(chan struct{}) // } // // 递归添加监控目录 // func addWatchPath(dir string, watcher *fsnotify.Watcher) error { // entries, err := os.ReadDir(dir) // if err != nil { // return fmt.Errorf("读取目录error: %w", err) // } // logger.Println("开始监听目录:", dir) // err = watcher.Add(dir) // if err != nil { // return err // } // if len(entries) > 0 { // // 递归子目录 // for _, entry := range entries { // if entry.IsDir() { // err := addWatchPath(filepath.Join(dir, entry.Name()), watcher) // if err != nil { // return err // } // } // } // } // return nil // } // // 每隔一定时间上传文件 // func uploadFileToMinIo(appConfig AppConfig) { // // minioClient // minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{ // Creds: credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""), // Secure: appConfig.UseSSL, // }) // if err != nil { // logger.Errorf("MinIO Client init error: %s", err) // return // } // logger.Println("MinIO Client初始化成功") // for { // err := uploadFiles(appConfig.TmpDir, minioClient, appConfig.MinIoConfig.Bucket) // if err != nil { // logger.Errorf("upload files error, e: %s", err) // } // time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second) // } // } // func uploadFiles(dir string, minioClient *minio.Client, bucket string) error { // entries, err := os.ReadDir(dir) // if err != nil { // return fmt.Errorf("上传文件, 读取目录. 
err: %w", err) // } // // uploadFiles // if len(entries) > 0 { // for _, entry := range entries { // if entry.IsDir() { // // 递归处理子文件夹 // subfolder := filepath.Join(dir, entry.Name()) // err := uploadFiles(subfolder, minioClient, bucket) // if err != nil { // logger.Errorf("上传文件失败 err:%s\n", err) // } // // 删除空文件夹 // err = util.RemoveEmptyDir(subfolder) // if err != nil { // logger.Errorf("上传文件失败 err:%s\n", err) // } // } else { // filePath := filepath.Join(dir, entry.Name()) // // 将文件上传至桶中对应文件夹 // _, err = minioClient.FPutObject(context.Background(), bucket, // strings.ReplaceAll(filePath, "\\", "/"), dir, // minio.PutObjectOptions{ContentType: "application/octet-stream"}) // if err != nil { // logger.Errorf("上传文件失败, filePath: %s err:%s\n", filePath, err) // } // // 删除文件 // err := os.Remove(filePath) // if err != nil { // logger.Errorf("删除备份文件失败, filePath: %s err:%s\n", filePath, err) // } // } // } // } // return nil // }