// Command watch-daemon periodically scans a local directory of "stream"
// folders, uploads their contents to a MinIO bucket, and removes folders that
// were fully uploaded or that are older than a configured age.
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"watch-daemon/util"

	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/natefinch/lumberjack"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

var (
	// logger is the main application logger; it is set by main via initLog.
	logger *logrus.Logger
	// statLogger receives the per-folder upload statistics; it is set inside initLog.
	statLogger *logrus.Logger
	// gAppConfig is the currently active configuration. loadConfig replaces the
	// pointer on every (re)load.
	// NOTE(review): the hot-reload callback presumably runs on a goroutine owned
	// by viper, so this pointer is swapped without synchronization — confirm.
	gAppConfig *AppConfig
)

// main initialises logging and configuration, starts the scan worker and the
// periodic trigger, and then blocks forever.
func main() {
	fmt.Println("Starting watch-daemon...")

	// Initialise the logging framework, then read the configuration.
	logger = initLog()
	initLoadConfig()

	notifyChan := make(chan string)

	// Directory watching is currently disabled; scans are triggered by the
	// timer goroutine below only.
	// go monitorDirAndNotify(notifyChan)

	// Worker: every notification triggers one full scan and upload.
	go func() {
		for notify := range notifyChan {
			logger.Debugf("received notify: %s", notify)

			// Drain the channel (debounce). The helper's result is not needed here.
			_ = util.DrainChannelBuffer(notifyChan)

			doFullScanAndUpload()
		}
	}()

	// Initiate the first full scan and upload.
	notifyChan <- ""

	// Periodic trigger. The interval is re-read on every iteration so that a
	// reloaded configuration takes effect. The send blocks while a scan is
	// still running because the channel is unbuffered.
	go func() {
		for {
			time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
			notifyChan <- ""
		}
	}()

	// Block the main goroutine forever.
	select {}
}

// doFullScanAndUpload creates a MinIO client, walks the top level of the
// watched directory, uploads every non-hidden sub-folder via
// uploadOneStreamFolder, removes folders that were fully uploaded, and finally
// removes folders whose modification time is older than the configured
// DeleteFileAfterMinutes (when that value is > 0). Errors are logged, never
// returned.
func doFullScanAndUpload() {
	logger.Debugln("begin to do full scan and upload")

	// Work on one snapshot so a configuration reload cannot change values
	// in the middle of this scan.
	cfg := gAppConfig

	accessKey, secretKey := cfg.Minio.AccessKeyID, cfg.Minio.SecretAccessKey
	if accessKey == "" || secretKey == "" {
		// Incomplete credentials: make an anonymous client.
		accessKey, secretKey = "", ""
	}
	minioCreds := credentials.NewStaticV4(accessKey, secretKey, "")

	minioClient, err := minio.New(cfg.Minio.Addr, &minio.Options{
		Creds:  minioCreds,
		Secure: cfg.Minio.UseSSL,
	})
	if err != nil {
		logger.Errorf("MinIO Client init error: %s", err)
		return
	}
	logger.Debugln("MinIO Client初始化成功")

	// Make sure the local `.trash` folder exists. Failure is logged but is not
	// fatal for the scan.
	trashFolder := filepath.Join(cfg.Watching.Dir, ".trash")
	if !util.FileExists(trashFolder) {
		if err := os.Mkdir(trashFolder, 0777); err != nil {
			logger.Errorf("create trash folder error: %v", err)
		}
	}

	// Enumerate all entries in the watched folder.
	entries, err := os.ReadDir(cfg.Watching.Dir)
	if err != nil {
		logger.Error(err)
		return
	}

	for _, entry := range entries {
		// Skip hidden entries and plain files; only stream folders are handled.
		if strings.HasPrefix(entry.Name(), ".") || !entry.IsDir() {
			continue
		}

		localPath := filepath.Join(cfg.Watching.Dir, entry.Name())
		objPath := entry.Name()

		// Upload the stream folder first.
		fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
		switch {
		case err != nil:
			logger.Errorf("upload folder `%s` error: %v", localPath, err)
		case fullyUploaded:
			logger.Infof("fully uploaded, removing folder: %s", localPath)
			rmErr := os.RemoveAll(localPath)
			if rmErr == nil {
				// The folder is gone; querying its info for the age check
				// below would only produce a spurious error.
				continue
			}
			logger.Errorf("remove folder `%s` error: %v", localPath, rmErr)
		default:
			logger.Infof("not fully uploaded, keeping folder: %s", localPath)
		}

		// Then remove the folder if it is too old.
		maxAgeMinutes := cfg.Watching.DeleteFileAfterMinutes
		if maxAgeMinutes <= 0 {
			continue
		}
		info, err := entry.Info()
		if err != nil {
			logger.Errorf("get info for folder `%s` error: %v", localPath, err)
			continue
		}
		entryModTime := info.ModTime()
		entryAgeMinutes := time.Since(entryModTime).Minutes()
		if entryAgeMinutes > float64(maxAgeMinutes) {
			logger.Infof("removing folder `%s` because it is too old (last modified: %v, %v minutes ago)",
				localPath, util.FmtMyTime(entryModTime), entryAgeMinutes)
			if err := os.RemoveAll(localPath); err != nil {
				logger.Errorf("remove folder `%s` error: %v", localPath, err)
			}
		}
	}

	logger.Debugln("full scan and upload done")
}

// uploadOneStreamFolder uploads the contents of localPath to the configured
// bucket under "<date>/<objPath>/", recursing into sub-folders.
//
// The folder must contain a metadata.json file; when it does not, the folder is
// skipped and (false, nil) is returned. The date prefix is derived from the
// metadata's timestamp_offset (milliseconds).
//
// fullyUploaded is true only when the caller may safely delete the folder:
// metadata reports a non-zero total_points, no `*.part` file was seen, every
// file upload succeeded, and every sub-folder was itself fully uploaded.
// A non-nil err is returned for failures that abort the folder (metadata,
// directory listing, remote folder creation, metadata upload, sub-folder
// errors); failures of individual file uploads are logged and recorded in the
// statistics instead. A summary line is always written to statLogger on return.
func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
	type StreamObjectUploadStatistics struct {
		StartTime time.Time
		EndTime   time.Time
		Name      string
		Succeeded bool
		Msg       string
	}
	type StreamUploadStatistics struct {
		StartTime       time.Time
		EndTime         time.Time
		MetaPartsCount  int
		MetaPointsCount int
		Objects         map[string]StreamObjectUploadStatistics
		Msg             string
	}

	streamUploadStatistics := StreamUploadStatistics{
		StartTime: time.Now(),
		Objects:   make(map[string]StreamObjectUploadStatistics),
	}

	// Emit the statistics line on every return path. The closure reads the
	// named result fullyUploaded, so it reports the value actually returned.
	defer func() {
		streamUploadStatistics.EndTime = time.Now()

		// Sort the object names so the details are listed in a stable order.
		names := make([]string, 0, len(streamUploadStatistics.Objects))
		for name := range streamUploadStatistics.Objects {
			names = append(names, name)
		}
		sort.Strings(names)

		okCount, failCount := 0, 0
		var objDetailsMsg strings.Builder
		for _, name := range names {
			obj := streamUploadStatistics.Objects[name]
			if obj.Succeeded {
				okCount++
				fmt.Fprintf(&objDetailsMsg, "\n{%s: OK (t: %v ~ %v, duration: %v)}",
					obj.Name, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else {
				failCount++
				fmt.Fprintf(&objDetailsMsg, "\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
					obj.Name, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			}
		}

		statLogger.Infof("upload folder `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported parts & points count: %d & %d;"+
			" ok / fail / total: %d / %d / %d; details:%s",
			localPath,
			streamUploadStatistics.EndTime.Sub(streamUploadStatistics.StartTime),
			util.FmtMyTime(streamUploadStatistics.StartTime),
			util.FmtMyTime(streamUploadStatistics.EndTime),
			fullyUploaded,
			streamUploadStatistics.MetaPartsCount,
			streamUploadStatistics.MetaPointsCount,
			okCount, failCount, len(names),
			objDetailsMsg.String())
	}()

	bucket := gAppConfig.Minio.Bucket
	fullyUploaded = true
	streamDateStr := util.FmtMyDate(time.Now())

	// checkMetadataFile decodes <dir>/metadata.json, records the reported
	// counts, derives the date prefix, and clears fullyUploaded when
	// total_points is 0 (the producer has not finished writing the stream, so
	// the folder must not be deleted yet).
	checkMetadataFile := func(dir string) error {
		metaFilePath := filepath.Join(dir, "metadata.json")

		metaFile, err := os.Open(metaFilePath)
		if err != nil {
			return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
		}
		defer metaFile.Close()

		var metadata struct {
			TotalPoints     int   `json:"total_points"`
			PartsCount      int   `json:"parts_count"`
			TimestampOffset int64 `json:"timestamp_offset"` // In milliseconds
		}
		if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
			return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
		}

		if metadata.TotalPoints == 0 {
			logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
			fullyUploaded = false
		}
		streamUploadStatistics.MetaPartsCount = metadata.PartsCount
		streamUploadStatistics.MetaPointsCount = metadata.TotalPoints
		streamDateStr = util.FmtMyDate(time.Unix(metadata.TimestampOffset/1000, 0))
		return nil
	}

	// NOTE: We need to check metadata file before obtaining the list of files
	// to prevent TOCTOU issues.
	if !util.FileExists(filepath.Join(localPath, "metadata.json")) {
		logger.Infof("metadata file not found in folder `%s`, skipping", localPath)
		return false, nil
	}
	if err := checkMetadataFile(localPath); err != nil {
		return false, fmt.Errorf("check metadata file error: %w", err)
	}

	// Enumerate all files in the folder.
	entries, err := os.ReadDir(localPath)
	if err != nil {
		return false, fmt.Errorf("read dir `%s` error: %w", localPath, err)
	}

	// Create the folder in MinIO. Object keys always use "/" as separator.
	objFolderPath := streamDateStr + "/" + objPath + "/"
	err = util.MinioCreateFolderIfNotExists(minioClient, bucket, objFolderPath)
	if err != nil {
		return false, fmt.Errorf("create minio folder `%s` error: %w", objFolderPath, err)
	}

	// Upload the metadata file.
	_, err = util.MinioUploadFileIfChanged(minioClient, bucket, objFolderPath+"metadata.json", filepath.Join(localPath, "metadata.json"))
	if err != nil {
		return false, fmt.Errorf("upload metadata file error: %w", err)
	}

	// Upload the remaining entries.
	for _, entry := range entries {
		name := entry.Name()

		// Skip hidden files and the metadata file (already uploaded above).
		if strings.HasPrefix(name, ".") || name == "metadata.json" {
			continue
		}
		// Skip partial files; their presence means the stream is incomplete.
		if strings.HasSuffix(name, ".part") {
			logger.Infof("skipping partial file `%s`", name)
			fullyUploaded = false
			continue
		}

		innerLocalPath := filepath.Join(localPath, name)

		if entry.IsDir() {
			// Upload the files of the sub-folder recursively.
			innerObjPath := objPath + "/" + name
			fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
			if err != nil {
				return false, err
			}
			if !fullyUploadedInner {
				fullyUploaded = false
			}
			continue
		}

		// Upload the file.
		objStat := StreamObjectUploadStatistics{
			StartTime: time.Now(),
			Name:      name,
		}
		_, err := util.MinioUploadFile(minioClient, bucket, objFolderPath+name, innerLocalPath, true)
		objStat.EndTime = time.Now()
		if err != nil {
			objStat.Succeeded = false
			objStat.Msg = err.Error()
			// Keep going with the other files, but the folder must not be
			// reported as fully uploaded: the caller deletes fully uploaded
			// folders, which would discard this file.
			fullyUploaded = false
			logger.Errorf("upload file `%s` of stream `%s` error: %v, took %v", name, objPath, err, objStat.EndTime.Sub(objStat.StartTime))
		} else {
			objStat.Succeeded = true
			logger.Infof("uploaded file `%s`, took %v", name, objStat.EndTime.Sub(objStat.StartTime))

			// Remove *.zst files early, right after a successful upload.
			if strings.HasSuffix(name, ".zst") {
				if err := os.Remove(innerLocalPath); err != nil {
					logger.Errorf("remove file `%s` error: %v", name, err)
				}
			}
		}
		streamUploadStatistics.Objects[name] = objStat
	}

	return fullyUploaded, nil
}

// monitorDirAndNotify watches the configured directory with fsnotify and sends
// an empty string on notifyChan for every event received. When the watch
// session ends or cannot be set up, it logs an error, sleeps for
// UploadFileIntervalSeconds and tries again. It never returns.
func monitorDirAndNotify(notifyChan chan string) {
	scanFn := func() {
		// Create a new watcher.
		watcher, err := fsnotify.NewWatcher()
		if err != nil {
			logger.Errorf("unable to create watcher: %s", err)
			// Without this return the nil watcher would be dereferenced below.
			return
		}
		defer watcher.Close()

		// Add the path to watch.
		dir := gAppConfig.Watching.Dir
		if err := watcher.Add(dir); err != nil {
			logger.Errorf("unable to watch dir `%s`: %s", dir, err)
			return
		}

		var wg sync.WaitGroup
		wg.Add(2)

		// Listen for events and forward each one as a notification.
		go func() {
			defer wg.Done()
			for event := range watcher.Events {
				logger.Debugf("received event: %v", event)
				notifyChan <- ""
			}
		}()

		// Listen for watcher errors and log them.
		go func() {
			defer wg.Done()
			for err := range watcher.Errors {
				logger.Errorf("fsnotify error: %s", err)
			}
		}()

		// Returns only after both watcher channels have been closed.
		wg.Wait()
	}

	for {
		scanFn()
		logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", gAppConfig.Watching.UploadFileIntervalSeconds)
		time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
	}
}

// initLog builds the main logger (written to ./log/watch-daemon.log, with
// error-and-above entries duplicated into ./log/watch-daemon.error.log) and
// returns it. As a side effect it also initialises the package-level
// statLogger, which writes to ./log/watch-daemon.stat.log.
//
// The main logger runs at trace level when the LOG_LEVEL environment variable
// equals "trace" (case-insensitive) and at debug level otherwise.
func initLog() *logrus.Logger {
	// newRotatingFile returns a rotating log file writer; all three log files
	// share the same rotation policy.
	newRotatingFile := func(filename string) *lumberjack.Logger {
		return &lumberjack.Logger{
			Filename:   filename, // location of the log file
			MaxSize:    50,       // maximum file size, in MB
			MaxBackups: 5,        // maximum number of old files to keep
			MaxAge:     28,       // maximum number of days to keep old files
			Compress:   true,     // compress/archive old files
			LocalTime:  true,     // use local time for timestamps
		}
	}

	mainLog := newRotatingFile("./log/watch-daemon.log")
	errorLog := newRotatingFile("./log/watch-daemon.error.log")
	statLog := newRotatingFile("./log/watch-daemon.stat.log")

	l := logrus.New()
	if strings.EqualFold(os.Getenv("LOG_LEVEL"), "trace") {
		l.SetLevel(logrus.TraceLevel)
	} else {
		l.SetLevel(logrus.DebugLevel)
	}
	l.Out = mainLog
	// l.Out = io.MultiWriter(os.Stdout, mainLog)

	// Send error-level (and more severe) entries to an additional file.
	l.AddHook(&ErrorHook{
		Writer: errorLog,
		LogLevels: []logrus.Level{
			logrus.ErrorLevel,
			logrus.FatalLevel,
			logrus.PanicLevel,
		},
	})

	statLogger = logrus.New()
	statLogger.SetLevel(logrus.InfoLevel)
	statLogger.Out = statLog

	return l
}

// ErrorHook is a logrus hook that writes entries of the selected levels to an
// additional writer.
type ErrorHook struct {
	// Writer receives the formatted entries.
	Writer io.Writer
	// LogLevels lists the levels for which the hook fires.
	LogLevels []logrus.Level
}

// Fire formats the entry and writes it to the hook's Writer. It returns the
// formatting or write error, if any.
func (h *ErrorHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String()
	if err != nil {
		return err
	}
	_, err = h.Writer.Write([]byte(line))
	return err
}

// Levels returns the levels for which the hook fires.
func (h *ErrorHook) Levels() []logrus.Level {
	return h.LogLevels
}

// initLoadConfig sets the configuration defaults, enables hot reloading of
// ./config/application.yaml and performs the initial load. The process exits
// with status 1 when the initial load fails; a failed reload is only reported
// on stdout and the previous configuration stays active.
func initLoadConfig() {
	gAppConfig = &AppConfig{}

	configFilePath := "./config/application.yaml"
	viper.SetDefault("watching.uploadFileIntervalSeconds", 3)
	viper.SetDefault("watching.deleteFileAfterMinutes", 60)
	viper.SetConfigFile(configFilePath)

	// Hot reload. The callback is registered before the watcher is started so
	// that it is already in place when the first change event arrives.
	viper.OnConfigChange(func(e fsnotify.Event) {
		fmt.Printf("配置文件 %s 发生了更改!!!\n", e.Name)
		// Reload the configuration.
		if err := loadConfig(); err != nil {
			fmt.Printf("load config `%s` failed, err: %s\n", configFilePath, err)
		}
	})
	viper.WatchConfig()

	if err := loadConfig(); err != nil {
		logger.Error(err)
		logger.Errorf("无法加载配置,程序退出")
		fmt.Println("无法加载配置,程序退出")
		os.Exit(1)
	}
}

// loadConfig reads the configuration file through viper, unmarshals it into a
// fresh AppConfig and, on success, makes it the active configuration by
// replacing gAppConfig. On error gAppConfig is left untouched.
//
// When the MinIO address contains a "$", it is expanded by running
// `sh -c "echo <addr>"`; if that command fails the address is kept as written.
func loadConfig() error {
	if err := viper.ReadInConfig(); err != nil {
		return err
	}

	// Deserialize the configuration that was read into the config variable.
	var config AppConfig
	if err := viper.Unmarshal(&config); err != nil {
		fmt.Printf("viper Unmarshal config failed, err: %s\n", err)
		return err
	}

	if strings.Contains(config.Minio.Addr, "$") {
		// Start a shell to expand the environment variables.
		// NOTE(review): the address is spliced into a shell command line, so
		// the configuration file must be trusted. If only $VAR / ${VAR}
		// forms are needed, os.ExpandEnv would avoid the shell entirely.
		out, err := exec.Command("sh", "-c", "echo "+config.Minio.Addr).Output()
		if err != nil {
			fmt.Printf("expand minio addr via shell failed, keeping it unexpanded, err: %s\n", err)
		} else {
			config.Minio.Addr = strings.TrimSpace(string(out))
		}
	}

	gAppConfig = &config
	return nil
}