123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "watch-daemon/util"
- "github.com/fsnotify/fsnotify"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "github.com/natefinch/lumberjack"
- "github.com/sirupsen/logrus"
- "github.com/spf13/viper"
- )
- var logger *logrus.Logger
- var statLogger *logrus.Logger
- var gAppConfig *AppConfig
- func main() {
- fmt.Println("Starting watch-daemon...")
- // 初始化日志框架、读取配置
- logger = initLog()
- initLoadConfig()
- notifyChan := make(chan string)
- // // Start a goroutine to monitor dir and notify
- // go monitorDirAndNotify(notifyChan)
- // Start a goroutine to do full scan and upload
- go func() {
- for {
- notify, ok := <-notifyChan
- if !ok {
- return
- }
- logger.Debugf("received notify: %s", notify)
- // Drain the channel (debounce)
- _ = util.DrainChannelBuffer(notifyChan)
- doFullScanAndUpload()
- }
- }()
- // Initiate the first full scan and upload
- notifyChan <- "<start>"
- // Set a timer for notifying full scan and upload
- go func() {
- for {
- time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
- notifyChan <- "<timer>"
- }
- }()
- // Block main goroutine forever.
- <-make(chan struct{})
- }
- func doFullScanAndUpload() {
- logger.Debugln("begin to do full scan and upload")
- var minioCreds *credentials.Credentials
- if gAppConfig.Minio.AccessKeyID != "" && gAppConfig.Minio.SecretAccessKey != "" {
- minioCreds = credentials.NewStaticV4(gAppConfig.Minio.AccessKeyID, gAppConfig.Minio.SecretAccessKey, "")
- } else {
- // Make anonymous client
- minioCreds = credentials.NewStaticV4("", "", "")
- }
- minioClient, err := minio.New(gAppConfig.Minio.Addr, &minio.Options{
- Creds: minioCreds,
- Secure: gAppConfig.Minio.UseSSL,
- })
- if err != nil {
- logger.Errorf("MinIO Client init error: %s", err)
- return
- }
- logger.Debugln("MinIO Client初始化成功")
- // Make sure `.trash` folder exists in local
- trashFolder := filepath.Join(gAppConfig.Watching.Dir, ".trash")
- if !util.FileExists(trashFolder) {
- err := os.Mkdir(trashFolder, 0777)
- if err != nil {
- logger.Errorf("create trash folder error: %v", err)
- // return
- }
- }
- // Enumerate all files in the folder
- entries, err := os.ReadDir(gAppConfig.Watching.Dir)
- if err != nil {
- logger.Error(err)
- return
- }
- // Upload files
- for _, entry := range entries {
- if strings.HasPrefix(entry.Name(), ".") {
- // Skip hidden files
- continue
- }
- if entry.IsDir() {
- // Upload the folder stream first
- localPath := filepath.Join(gAppConfig.Watching.Dir, entry.Name())
- objPath := entry.Name()
- fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
- if err != nil {
- logger.Errorf("upload folder `%s` error: %v", localPath, err)
- } else {
- if fullyUploaded {
- // Remove the uploaded folder
- logger.Infof("fully uploaded, removing folder: %s", localPath)
- err := os.RemoveAll(localPath)
- if err != nil {
- logger.Errorf("remove folder `%s` error: %v", localPath, err)
- }
- } else {
- logger.Infof("not fully uploaded, keeping folder: %s", localPath)
- }
- }
- // Then remove folder if it is too old
- if gAppConfig.Watching.DeleteFileAfterMinutes > 0 {
- info, err := entry.Info()
- if err != nil {
- logger.Errorf("get info for folder `%s` error: %v", localPath, err)
- continue
- }
- entryModTime := info.ModTime()
- entryAgeMinutes := time.Since(entryModTime).Minutes()
- if entryAgeMinutes > float64(gAppConfig.Watching.DeleteFileAfterMinutes) {
- logger.Infof("removing folder `%s` because it is too old (last modified: %v, %v minutes ago)",
- localPath, util.FmtMyTime(entryModTime), entryAgeMinutes)
- err := os.RemoveAll(localPath)
- if err != nil {
- logger.Errorf("remove folder `%s` error: %v", localPath, err)
- }
- }
- }
- }
- }
- logger.Debugln("full scan and upload done")
- }
- func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
- type StreamObjectUploadStatistics struct {
- StartTime time.Time
- EndTime time.Time
- Name string
- Succeeded bool
- Msg string
- }
- type StreamUploadStatistics struct {
- StartTime time.Time
- EndTime time.Time
- MetaPartsCount int
- MetaPointsCount int
- Objects map[string]StreamObjectUploadStatistics
- Msg string
- }
- streamUploadStatistics := StreamUploadStatistics{
- Objects: make(map[string]StreamObjectUploadStatistics),
- }
- streamUploadStatistics.StartTime = time.Now()
- defer func() {
- streamUploadStatistics.EndTime = time.Now()
- okCount := 0
- failCount := 0
- totalCount := 0
- objDetailsMsg := ""
- for _, obj := range streamUploadStatistics.Objects {
- totalCount++
- if obj.Succeeded {
- okCount++
- objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
- obj.Name, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
- } else {
- failCount++
- objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
- obj.Name, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
- }
- }
- statLogger.Infof("upload folder `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported parts & points count: %d & %d;"+
- " ok / fail / total: %d / %d / %d; details:%s",
- localPath, streamUploadStatistics.EndTime.Sub(streamUploadStatistics.StartTime),
- util.FmtMyTime(streamUploadStatistics.StartTime), util.FmtMyTime(streamUploadStatistics.EndTime),
- fullyUploaded, streamUploadStatistics.MetaPartsCount, streamUploadStatistics.MetaPointsCount,
- okCount, failCount, totalCount, objDetailsMsg)
- }()
- bucket := gAppConfig.Minio.Bucket
- fullyUploaded = true
- streamDateStr := util.FmtMyDate(time.Now())
- checkMetadataFile := func(localPath string) error {
- // Check for JSON metadata file. If its `total_points` is 0,
- // then the producer has not yet finished writing the stream, so we
- // should not delete the folder.
- metaFilePath := filepath.Join(localPath, "metadata.json")
- // Read metadata file
- metaFile, err := os.Open(metaFilePath)
- if err != nil {
- return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
- }
- defer metaFile.Close()
- var metadata struct {
- TotalPoints int `json:"total_points"`
- PartsCount int `json:"parts_count"`
- TimestampOffset int64 `json:"timestamp_offset"` // In milliseconds
- }
- if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
- return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
- }
- if metadata.TotalPoints == 0 {
- logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
- fullyUploaded = false
- }
- streamUploadStatistics.MetaPartsCount = metadata.PartsCount
- streamUploadStatistics.MetaPointsCount = metadata.TotalPoints
- streamDateStr = util.FmtMyDate(time.Unix(metadata.TimestampOffset/1000, 0))
- return nil
- }
- // NOTE: We need to check metadata file before obtaining the list of files
- // to prevent TOCTOU issues.
- if fullyUploaded {
- if !util.FileExists(filepath.Join(localPath, "metadata.json")) {
- logger.Infof("metadata file not found in folder `%s`, skipping", localPath)
- return false, nil
- }
- err := checkMetadataFile(localPath)
- if err != nil {
- return false, fmt.Errorf("check metadata file error: %w", err)
- }
- }
- // Enumerate all files in the folder
- entries, err := os.ReadDir(localPath)
- if err != nil {
- return false, fmt.Errorf("read dir `%s` error: %w", localPath, err)
- }
- // Create folder in minio
- objFolderPath := streamDateStr + "/" + objPath + "/"
- err = util.MinioCreateFolderIfNotExists(minioClient, bucket, objFolderPath)
- if err != nil {
- return false, fmt.Errorf("create minio folder `%s` error: %w", objFolderPath, err)
- }
- // Upload metadata file
- _, err = util.MinioUploadFileIfChanged(minioClient, bucket,
- objFolderPath+"metadata.json", filepath.Join(localPath, "metadata.json"))
- if err != nil {
- return false, fmt.Errorf("upload metadata file error: %w", err)
- }
- // Upload files
- for _, entry := range entries {
- // Skip hidden files
- if strings.HasPrefix(entry.Name(), ".") {
- continue
- }
- // Skip metadata file
- if entry.Name() == "metadata.json" {
- continue
- }
- // Skip partial files
- if strings.HasSuffix(entry.Name(), ".part") {
- logger.Infof("skipping partial file `%s`", entry.Name())
- fullyUploaded = false
- continue
- }
- innerLocalPath := filepath.Join(localPath, entry.Name())
- if entry.IsDir() {
- // Upload files in the folder
- // logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
- innerObjPath := objPath + "/" + entry.Name()
- fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
- if err != nil {
- return false, err
- }
- if !fullyUploadedInner {
- fullyUploaded = false
- }
- } else {
- // Upload the file
- objStat := StreamObjectUploadStatistics{
- StartTime: time.Now(),
- Name: entry.Name(),
- }
- _, err := util.MinioUploadFile(minioClient, bucket,
- objFolderPath+entry.Name(), innerLocalPath, true)
- objStat.EndTime = time.Now()
- if err != nil {
- objStat.Succeeded = false
- objStat.Msg = err.Error()
- // return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
- logger.Errorf("upload file `%s` of stream `%s` error: %v, took %v",
- entry.Name(), objPath, err, objStat.EndTime.Sub(objStat.StartTime))
- } else {
- objStat.Succeeded = true
- logger.Infof("uploaded file `%s`, took %v", entry.Name(), objStat.EndTime.Sub(objStat.StartTime))
- // Remove *.zst files early
- if strings.HasSuffix(entry.Name(), ".zst") {
- err := os.Remove(innerLocalPath)
- if err != nil {
- logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
- }
- }
- }
- streamUploadStatistics.Objects[entry.Name()] = objStat
- }
- }
- return fullyUploaded, nil
- }
- func monitorDirAndNotify(notifyChan chan string) {
- scanFn := func() {
- // Create new watcher.
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- logger.Errorf("unable to create watcher: %s", err)
- }
- defer watcher.Close()
- // Add a path.
- dir := gAppConfig.Watching.Dir
- if err := watcher.Add(dir); err != nil {
- logger.Errorf("unable to watch dir `%s`: %s", dir, err)
- return
- }
- var wg sync.WaitGroup
- wg.Add(2)
- // Start listening for events.
- go func() {
- defer wg.Done()
- for event := range watcher.Events {
- logger.Debugf("received event: %v", event)
- // Notify
- notifyChan <- "<FSNOTIFY: " + event.Name + ">"
- }
- }()
- // Start listening for errors.
- go func() {
- defer wg.Done()
- for err := range watcher.Errors {
- logger.Errorf("fsnotify error: %s", err)
- }
- }()
- wg.Wait()
- }
- for {
- // scan
- scanFn()
- logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", gAppConfig.Watching.UploadFileIntervalSeconds)
- // sleep
- time.Sleep(time.Duration(gAppConfig.Watching.UploadFileIntervalSeconds) * time.Second)
- }
- }
- func initLog() *logrus.Logger {
- // 主日志文件
- log := &lumberjack.Logger{
- Filename: "./log/watch-daemon.log", // 日志文件的位置
- MaxSize: 50, // 文件最大尺寸(以MB为单位)
- MaxBackups: 5, // 保留的最大旧文件数量
- MaxAge: 28, // 保留旧文件的最大天数
- Compress: true, // 是否压缩/归档旧文件
- LocalTime: true, // 使用本地时间创建时间戳
- }
- // 错误日志文件
- errorLog := &lumberjack.Logger{
- Filename: "./log/watch-daemon.error.log", // 错误日志文件的位置
- MaxSize: 50, // 文件最大尺寸(以MB为单位)
- MaxBackups: 5, // 保留的最大旧文件数量
- MaxAge: 28, // 保留旧文件的最大天数
- Compress: true, // 是否压缩/归档旧文件
- LocalTime: true, // 使用本地时间创建时间戳
- }
- // 统计日志文件
- statLog := &lumberjack.Logger{
- Filename: "./log/watch-daemon.stat.log", // 统计日志文件的位置
- MaxSize: 50, // 文件最大尺寸(以MB为单位)
- MaxBackups: 5, // 保留的最大旧文件数量
- MaxAge: 28, // 保留旧文件的最大天数
- Compress: true, // 是否压缩/归档旧文件
- LocalTime: true, // 使用本地时间创建时间戳
- }
- logger := logrus.New()
- if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
- logger.SetLevel(logrus.TraceLevel)
- } else {
- logger.SetLevel(logrus.DebugLevel)
- }
- logger.Out = log
- // logger.Out = io.MultiWriter(os.Stdout, log)
- // 设置错误级别日志输出到额外的文件
- logger.AddHook(&ErrorHook{
- Writer: errorLog,
- LogLevels: []logrus.Level{
- logrus.ErrorLevel,
- logrus.FatalLevel,
- logrus.PanicLevel,
- },
- })
- statLogger = logrus.New()
- statLogger.SetLevel(logrus.InfoLevel)
- statLogger.Out = statLog
- return logger
- }
- // ErrorHook 用于将错误级别的日志输出到额外的文件
- type ErrorHook struct {
- Writer io.Writer
- LogLevels []logrus.Level
- }
- func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
- line, err := entry.String()
- if err != nil {
- return err
- }
- _, err = hook.Writer.Write([]byte(line))
- return err
- }
- func (hook *ErrorHook) Levels() []logrus.Level {
- return hook.LogLevels
- }
- func initLoadConfig() {
- gAppConfig = &AppConfig{}
- configFilePath := "./config/application.yaml"
- viper.SetDefault("watching.uploadFileIntervalSeconds", 3)
- viper.SetDefault("watching.deleteFileAfterMinutes", 60)
- viper.SetConfigFile(configFilePath)
- // 热更新
- viper.WatchConfig()
- viper.OnConfigChange(func(e fsnotify.Event) {
- fmt.Printf("配置文件 %s 发生了更改!!!\n", e.Name)
- // 重新加载配置
- err := loadConfig()
- if err != nil {
- fmt.Printf("load config `%s` failed, err: %s\n", configFilePath, err)
- }
- })
- err := loadConfig()
- if err != nil {
- logger.Error(err)
- logger.Errorf("无法加载配置,程序退出")
- fmt.Println("无法加载配置,程序退出")
- os.Exit(1)
- }
- }
- func loadConfig() error {
- err := viper.ReadInConfig()
- if err != nil {
- return err
- }
- //把读取到的配置信息反序列化到 config 变量中
- var config AppConfig
- if err := viper.Unmarshal(&config); err != nil {
- fmt.Printf("viper Unmarshal config failed, err: %s\n", err)
- return err
- }
- if strings.Contains(config.Minio.Addr, "$") {
- // Start a shell to expand the environment variables
- cmd := exec.Command("sh", "-c", "echo "+config.Minio.Addr)
- out, err := cmd.Output()
- if err == nil {
- config.Minio.Addr = strings.TrimSpace(string(out))
- }
- }
- gAppConfig = &config
- return nil
- }
|