123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- package main
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "watch-daemon/util"
- "github.com/fsnotify/fsnotify"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "github.com/natefinch/lumberjack"
- "github.com/sirupsen/logrus"
- "github.com/spf13/viper"
- )
- var logger *logrus.Logger
- var appConfig AppConfig
- func main() {
- fmt.Println("Starting watch-daemon...")
- // 初始化日志框架、读取配置
- logger = initLog()
- theAppConfig, err := loadConfig()
- if err != nil {
- logger.Error(err)
- logger.Errorf("无法加载配置,程序退出")
- fmt.Println("无法加载配置,程序退出")
- os.Exit(1)
- }
- appConfig = theAppConfig
- notifyChan := make(chan string)
- // // Start a goroutine to monitor dir and notify
- // go monitorDirAndNotify(notifyChan)
- // Start a goroutine to do full scan and upload
- go func() {
- for {
- notify, ok := <-notifyChan
- if !ok {
- return
- }
- logger.Infof("received notify: %s", notify)
- // Drain the channel (debounce)
- _ = util.DrainChannelBuffer(notifyChan)
- doFullScanAndUpload()
- }
- }()
- // Initiate the first full scan and upload
- notifyChan <- "<start>"
- // Set a timer for notifying full scan and upload
- go func() {
- for {
- time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
- notifyChan <- "<timer>"
- }
- }()
- // Block main goroutine forever.
- <-make(chan struct{})
- }
- func doFullScanAndUpload() {
- logger.Info("begin to do full scan and upload")
- minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{
- Creds: credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""),
- Secure: appConfig.UseSSL,
- })
- if err != nil {
- logger.Errorf("MinIO Client init error: %s", err)
- return
- }
- logger.Println("MinIO Client初始化成功")
- // Enumerate all files in the folder
- entries, err := os.ReadDir(appConfig.WatchingConfig.Dir)
- if err != nil {
- logger.Error(err)
- return
- }
- // Upload files
- for _, entry := range entries {
- if entry.IsDir() {
- // Upload files in the folder
- localPath := filepath.Join(appConfig.WatchingConfig.Dir, entry.Name())
- objPath := entry.Name()
- fullyUploaded, err := uploadOneStreamFolder(localPath, objPath, minioClient)
- if err != nil {
- logger.Error(err)
- continue
- }
- if fullyUploaded {
- // Remove the uploaded folder
- logger.Infof("fully uploaded, removing folder: %s", localPath)
- err := os.RemoveAll(localPath)
- if err != nil {
- logger.Error(err)
- }
- } else {
- logger.Infof("not fully uploaded, keeping folder: %s", localPath)
- }
- }
- }
- logger.Info("full scan and upload done")
- }
- func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.Client) (fullyUploaded bool, err error) {
- ctx := context.Background()
- bucket := appConfig.MinIoConfig.Bucket
- fullyUploaded = true
- checkMetadataFile := func(localPath string) error {
- // Check for JSON metadata file. If its `total_points` is 0,
- // then the producer has not yet finished writing the stream, so we
- // should not delete the folder.
- metaFilePath := filepath.Join(localPath, "metadata.json")
- // Read metadata file
- metaFile, err := os.Open(metaFilePath)
- if err != nil {
- return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
- }
- defer metaFile.Close()
- var metadata struct {
- TotalPoints int `json:"total_points"`
- }
- if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
- return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
- }
- if metadata.TotalPoints == 0 {
- logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
- fullyUploaded = false
- }
- return nil
- }
- // NOTE: We need to check metadata file before obtaining the list of files
- // to prevent TOCTOU issues.
- if fullyUploaded {
- err := checkMetadataFile(localPath)
- if err != nil {
- return false, err
- }
- }
- // Enumerate all files in the folder
- entries, err := os.ReadDir(localPath)
- if err != nil {
- return false, fmt.Errorf("read dir %s error: %w", localPath, err)
- }
- // Create folder in minio
- objFolderPath := objPath + "/"
- _, err = minioClient.PutObject(ctx, bucket, objFolderPath, nil, 0, minio.PutObjectOptions{})
- if err != nil {
- return false, fmt.Errorf("create folder `%s` error: %w", objPath, err)
- }
- // Upload files
- // NOTE: We overwrite the existing files in the minio bucket
- for _, entry := range entries {
- // Skip partial files
- if strings.HasSuffix(entry.Name(), ".part") {
- logger.Infof("skipping partial file `%s`", entry.Name())
- fullyUploaded = false
- continue
- }
- innerLocalPath := filepath.Join(localPath, entry.Name())
- if entry.IsDir() {
- // Upload files in the folder
- // logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
- innerObjPath := objPath + "/" + entry.Name()
- fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
- if err != nil {
- return false, err
- }
- if !fullyUploadedInner {
- fullyUploaded = false
- }
- } else {
- // Upload the file
- _, err := minioClient.FPutObject(ctx, bucket,
- objFolderPath+entry.Name(), innerLocalPath,
- minio.PutObjectOptions{ContentType: "application/octet-stream"})
- if err != nil {
- return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
- }
- logger.Infof("uploaded file `%s`", entry.Name())
- // Remove *.zst files early
- if strings.HasSuffix(entry.Name(), ".zst") {
- err := os.Remove(innerLocalPath)
- if err != nil {
- logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
- }
- }
- }
- }
- return fullyUploaded, nil
- }
- func monitorDirAndNotify(notifyChan chan string) {
- scanFn := func() {
- // Create new watcher.
- watcher, err := fsnotify.NewWatcher()
- if err != nil {
- logger.Error(err)
- }
- defer watcher.Close()
- // Add a path.
- dir := appConfig.WatchingConfig.Dir
- if err := watcher.Add(dir); err != nil {
- logger.Errorf("unable to watch dir %s: %s", dir, err)
- return
- }
- var wg sync.WaitGroup
- wg.Add(2)
- // Start listening for events.
- go func() {
- defer wg.Done()
- for event := range watcher.Events {
- logger.Infof("received event: %v", event)
- // Notify
- notifyChan <- "<FSNOTIFY: " + event.Name + ">"
- }
- }()
- // Start listening for errors.
- go func() {
- defer wg.Done()
- for err := range watcher.Errors {
- logger.Errorf("fsnotify error: %s", err)
- }
- }()
- wg.Wait()
- }
- for {
- // scan
- scanFn()
- logger.Errorf("scan task stopped unexpectedly, will retry in %d seconds", appConfig.WatchingConfig.UploadFileInterval)
- // sleep
- time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
- }
- }
- func initLog() *logrus.Logger {
- log := &lumberjack.Logger{
- Filename: "./log/watch-daemon.log", // 日志文件的位置
- MaxSize: 50, // 文件最大尺寸(以MB为单位)
- MaxBackups: 5, // 保留的最大旧文件数量
- MaxAge: 28, // 保留旧文件的最大天数
- Compress: true, // 是否压缩/归档旧文件
- LocalTime: true, // 使用本地时间创建时间戳
- }
- logger := logrus.New()
- // logger.Out = log
- logger.Out = io.MultiWriter(os.Stdout, log)
- return logger
- }
- func loadConfig() (AppConfig, error) {
- configFilePath := "./config/application.yaml"
- if !util.FileExists(configFilePath) {
- return AppConfig{}, errors.New("找不到配置文件")
- }
- viper.SetConfigFile(configFilePath)
- // 热更新
- viper.WatchConfig()
- viper.OnConfigChange(func(e fsnotify.Event) {
- fmt.Printf("配置文件 %s 发生了更改!!! 最新的Global.Source这个字段的值为 %s:", e.Name, viper.GetString("Global.Source"))
- })
- err := viper.ReadInConfig()
- if err != nil {
- panic(fmt.Errorf("error reading config: %s", err))
- }
- //把读取到的配置信息反序列化到 config 变量中
- var config AppConfig
- if err := viper.Unmarshal(&config); err != nil {
- fmt.Printf("viper Unmarshal failed, err:%s\n", err)
- }
- if strings.Contains(config.MinIoConfig.Addr, "$") {
- // Start a shell to expand the environment variables
- cmd := exec.Command("sh", "-c", "echo "+config.MinIoConfig.Addr)
- out, err := cmd.Output()
- if err == nil {
- config.MinIoConfig.Addr = strings.TrimSpace(string(out))
- }
- }
- return config, nil
- }
- // func old_main() {
- // // 初始化日志框架、读取配置
- // logger = initLog()
- // appConfig, err := loadConfig()
- // if err != nil {
- // logger.Error(err)
- // logger.Errorf("无法加载配置,程序退出")
- // os.Exit(1)
- // }
- // // upload file from tmp to minio
- // go uploadFileToMinIo(appConfig)
- // // Create new watcher.
- // watcher, err := fsnotify.NewWatcher()
- // if err != nil {
- // logger.Error(err)
- // }
- // defer watcher.Close()
- // // Start listening for events.
- // go func() {
- // for {
- // select {
- // case event, ok := <-watcher.Events:
- // if !ok {
- // return
- // }
- // if event.Has(fsnotify.Create) && strings.HasSuffix(event.Name, "sync") {
- // logger.Printf("copy file:%s from src:%s to:%s", event.Name,
- // appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir)
- // // create hard link
- // srcFilePath := strings.ReplaceAll(strings.TrimSuffix(event.Name, ".sync")+".zst",
- // "\\", "/")
- // dstFilePath := strings.ReplaceAll(srcFilePath, appConfig.WatchingConfig.Dir, appConfig.WatchingConfig.TmpDir)
- // err := util.CreateHardLink(srcFilePath, dstFilePath)
- // if err != nil {
- // logger.Errorf("拷贝文件失败, src:%s dst:%s", srcFilePath, dstFilePath)
- // }
- // } else if event.Has(fsnotify.Create) {
- // stat, err := os.Stat(event.Name)
- // if err != nil {
- // logger.Error(err)
- // }
- // if stat.IsDir() {
- // logger.Println("开始监听: ", event.Name)
- // err = watcher.Add(event.Name)
- // if err != nil {
- // logger.Errorf("监听目录失败, dir:%s e:%s", event.Name, err)
- // }
- // }
- // }
- // case err, ok := <-watcher.Errors:
- // if !ok {
- // return
- // }
- // logger.Errorf("error: %s", err)
- // }
- // }
- // }()
- // // Add a path.
- // err = addWatchPath(appConfig.WatchingConfig.Dir, watcher)
- // if err != nil {
- // logger.Error(err)
- // }
- // // Block main goroutine forever.
- // <-make(chan struct{})
- // }
- // // 递归添加监控目录
- // func addWatchPath(dir string, watcher *fsnotify.Watcher) error {
- // entries, err := os.ReadDir(dir)
- // if err != nil {
- // return fmt.Errorf("读取目录error: %w", err)
- // }
- // logger.Println("开始监听目录:", dir)
- // err = watcher.Add(dir)
- // if err != nil {
- // return err
- // }
- // if len(entries) > 0 {
- // // 递归子目录
- // for _, entry := range entries {
- // if entry.IsDir() {
- // err := addWatchPath(filepath.Join(dir, entry.Name()), watcher)
- // if err != nil {
- // return err
- // }
- // }
- // }
- // }
- // return nil
- // }
- // // 每隔一定时间上传文件
- // func uploadFileToMinIo(appConfig AppConfig) {
- // // minioClient
- // minioClient, err := minio.New(appConfig.MinIoConfig.Addr, &minio.Options{
- // Creds: credentials.NewStaticV4(appConfig.MinIoConfig.AccessKeyID, appConfig.MinIoConfig.SecretAccessKey, ""),
- // Secure: appConfig.UseSSL,
- // })
- // if err != nil {
- // logger.Errorf("MinIO Client init error: %s", err)
- // return
- // }
- // logger.Println("MinIO Client初始化成功")
- // for {
- // err := uploadFiles(appConfig.TmpDir, minioClient, appConfig.MinIoConfig.Bucket)
- // if err != nil {
- // logger.Errorf("upload files error, e: %s", err)
- // }
- // time.Sleep(time.Duration(appConfig.WatchingConfig.UploadFileInterval) * time.Second)
- // }
- // }
- // func uploadFiles(dir string, minioClient *minio.Client, bucket string) error {
- // entries, err := os.ReadDir(dir)
- // if err != nil {
- // return fmt.Errorf("上传文件, 读取目录. err: %w", err)
- // }
- // // uploadFiles
- // if len(entries) > 0 {
- // for _, entry := range entries {
- // if entry.IsDir() {
- // // 递归处理子文件夹
- // subfolder := filepath.Join(dir, entry.Name())
- // err := uploadFiles(subfolder, minioClient, bucket)
- // if err != nil {
- // logger.Errorf("上传文件失败 err:%s\n", err)
- // }
- // // 删除空文件夹
- // err = util.RemoveEmptyDir(subfolder)
- // if err != nil {
- // logger.Errorf("上传文件失败 err:%s\n", err)
- // }
- // } else {
- // filePath := filepath.Join(dir, entry.Name())
- // // 将文件上传至桶中对应文件夹
- // _, err = minioClient.FPutObject(context.Background(), bucket,
- // strings.ReplaceAll(filePath, "\\", "/"), dir,
- // minio.PutObjectOptions{ContentType: "application/octet-stream"})
- // if err != nil {
- // logger.Errorf("上传文件失败, filePath: %s err:%s\n", filePath, err)
- // }
- // // 删除文件
- // err := os.Remove(filePath)
- // if err != nil {
- // logger.Errorf("删除备份文件失败, filePath: %s err:%s\n", filePath, err)
- // }
- // }
- // }
- // }
- // return nil
- // }
|