// Package main implements minio-into-stck: a bridge service that watches a
// MinIO bucket for zstd-compressed time-series stream parts, decompresses
// them, and batch-inserts the points into a ClickHouse ("stck") table.
// Upload progress is tracked in MySQL (via GORM) and control/status messages
// are exchanged over NSQ.
package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"example/minio-into-stck/util"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"stck/stck-nsq-msg"
	smsg "stck/stck-nsq-msg/msg"
	"sync"
	"syscall"

	"io"
	// "log"
	"math"
	// "os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"github.com/natefinch/lumberjack"
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	gorm_logger "gorm.io/gorm/logger"
)

// AppInitConfig is the initial configuration of the application, loaded from
// the config file (./config/application.yaml — see initLoadConfig).
type AppInitConfig struct {
	// MySQL connection settings; backs the upload bookkeeping tables.
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	// MinIO connection settings and the bucket holding the stream objects.
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	// ClickHouse ("stck") endpoint and destination table name.
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
	// NSQ endpoints (direct nsqd TCP address; lookupd address is currently unused).
	Nsq struct {
		TcpAddr         string `mapstructure:"tcpAddr"`
		LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
	} `mapstructure:"nsq"`
	// Main worker tuning: retry cap and delay between retries.
	Main struct {
		UploadRetryMaxTimes     int `mapstructure:"uploadRetryMaxTimes"`
		FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
	} `mapstructure:"main"`
}

// AppConfig is the runtime-adjustable configuration, persisted in MySQL
// (see load_app_cfg_from_db).
type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

// AppConfigDbEntry is a single key/value row backing AppConfig in MySQL.
type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}

// TODO: create the row first when starting an insert, then update it with the
// elapsed insert time once done.
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}

// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}

// StreamMetadata mirrors the stream's `metadata.json` object stored in MinIO.
type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"` // milliseconds (converted to ns in upload_one_part)
	Interval        int64  `json:"interval"`         // NOTE(review): added to a nanosecond counter in upload_one_part — presumably nanoseconds; confirm
	PartsCount      int64  `json:"parts_count"`
	PointsPerPart   int64  `json:"points_per_part"`
	TotalPoints     int64  `json:"total_points"`
}

// Process-wide state shared by the NSQ handlers and the upload workers.
var gLocalIpAddress string
var gMachineID string
var gNsqProducer *nsq.Producer
var gNsqConsumer *nsq.Consumer
var gAppStartTime time.Time = time.Now()

// programmaticQuitChan is signaled (buffered, capacity 1) to request shutdown.
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)

// gAppQuitting flags shutdown to the workers.
// NOTE(review): read and written from several goroutines without synchronization — confirm acceptable.
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup

// buildtime is the build timestamp — presumably injected at link time via -ldflags; confirm in build script.
var buildtime string

// MakeHeartbeatMsg assembles the device heartbeat message with identity,
// uptime and version information.
func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
	m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
		gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
	return &m
}

// PublishMessage marshals msg and publishes it on the server notify topic.
func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
	payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
	if err != nil {
		return fmt.Errorf("marshal message error: %w", err)
	}
	err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
	if err != nil {
		return fmt.Errorf("publish message error: %w", err)
	}
	return nil
}

// initNsq connects the NSQ producer and consumer and installs the control
// message handler (ping, shell exec, self-update, reboot, heartbeat,
// log upload). Fatal on connection errors.
func initNsq() {
	ip, err := stcknsqmsg.GetLocalIP()
	if err != nil {
		logger.Warnf("GetLocalIP error: %s", err)
	} else {
		gLocalIpAddress = ip.String()
		logger.Infof("Local IP: %s", gLocalIpAddress)
	}
	gMachineID = stcknsqmsg.MakeUniqueMachineID()
	logger.Infof("Machine ID: %s", gMachineID)
	fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)
	// Connect to NSQ
	nsqConfig := nsq.NewConfig()
	gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Producer init error: %s", err)
	}
	gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Consumer init error: %s", err)
	}
	gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
		logger.Debugf("NSQ Consumer received message: %s", message.Body)
		// Parse the message
		recvTime := message.Timestamp
		msg, err := stcknsqmsg.FromString(string(message.Body))
		if err != nil {
			// Malformed message: log and ack (returning nil) so it is not redelivered.
			logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
			return nil
		}
		// Process the message
		switch data := msg.Data.(type) {
		case *smsg.DevicePingMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Write pong
			pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
			err := PublishMessage(&pongMsg)
			if err != nil {
				logger.Errorf("send pong error: %s", err)
			}
		case *smsg.RequestDeviceExecuteShellScriptMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Execute the shell script, capturing stdout+stderr together.
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
			result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
				cmd := exec.Command("bash", "-c", data.Script)
				var out bytes.Buffer
				cmd.Stdout = &out
				cmd.Stderr = &out
				err := cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
				// Write error message
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Write output
				resp.Status = 0
				resp.Msg = result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.RequestDeviceUpdateMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			if data.ServiceName != "minio-into-stck" {
				break
			}
			// Self-update: write an update script to /tmp and run it.
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
				// Download the update file to /tmp
				downloadPath := "/tmp/minio-into-stck-updater"
				updateScriptPath := filepath.Join(downloadPath, "update.sh")
				var out bytes.Buffer
				err := os.RemoveAll(downloadPath)
				if err != nil {
					return "", err
				}
				err = os.MkdirAll(downloadPath, 0777)
				if err != nil {
					return "", err
				}
				updateScriptContent := fmt.Sprintf(`#!/bin/bash
set -e
cd %s
wget --tries=3 -nv -O installer.tar.gz %s
tar -xzf installer.tar.gz
cd minio-into-stck-installer
./replacing-update.sh
`, downloadPath, data.ServiceBinaryURL)
				err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
				if err != nil {
					return "", err
				}
				// Execute the update script
				cmd := exec.Command("bash", "-c", updateScriptPath)
				cmd.Stdout = &out
				cmd.Stderr = &out
				err = cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
				// Write error message
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Write output
				resp.Status = 0
				resp.Msg = "executed update process successfully\n\noutput:\n" + result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.ForceDeviceRebootMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Request a graceful process shutdown (see main's select).
			programmaticQuitChan <- struct{}{}
		case *smsg.ForceDeviceSendHeartbeatMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Send heartbeat
			heartBeatMsg := MakeHeartbeatMsg()
			err := PublishMessage(heartBeatMsg)
			if err != nil {
				logger.Errorf("send heartbeat error: %s", err)
			}
		case *smsg.RequestDeviceUploadLogsToMinioMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Upload logs to MinIO (both ./logs and ./log, whichever exist).
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
			minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
				Creds:  minioCreds,
				Secure: false,
			})
			if err != nil {
				logger.Errorf("MinIO Client init error: %s", err)
				resp.Status = -1
				resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
			} else {
				if util.FileExists("./logs") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
					if err != nil {
						logger.Errorf("upload logs to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
					}
				}
				if util.FileExists("./log") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
					if err != nil {
						logger.Errorf("upload log to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
					}
				}
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		default:
			logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
		}
		// Notify NSQ that the message is processed successfully
		return nil
	}), 1)
	// err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
	err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
	if err != nil {
		logger.Fatalf("NSQ Consumer connect error: %s", err)
	}
}

var appInitCfg *AppInitConfig = &AppInitConfig{}

// initLoadConfig reads ./config/application.yaml into appInitCfg and installs
// a watcher that hot-swaps the config when the file changes.
func initLoadConfig() {
	viper.SetDefault("main.uploadRetryMaxTimes", 20)
	viper.SetDefault("main.failedRetryDelaySeconds", 5)
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		logger.Infoln("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			logger.Infoln("Failed to unmarshal config:", err)
			return
		}
		// NOTE(review): this pointer swap is not synchronized with concurrent
		// readers of appInitCfg — confirm acceptable.
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		logger.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
		fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
		os.Exit(1)
	}
}

// initLog builds the logrus loggers backed by size-rotated files and returns
// the main logger; it also initializes the package-level statLogger.
func initLog() *logrus.Logger {
	// main log file
log := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.log", // log file location
		MaxSize:    50,                          // max file size in MB
		MaxBackups: 5,                           // max number of rotated files to keep
		MaxAge:     28,                          // max age in days of rotated files
		Compress:   true,                        // compress/archive rotated files
		LocalTime:  true,                        // timestamps in local time
	}
	// error log file
	errorLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.error.log", // error log file location
		MaxSize:    50,                                // max file size in MB
		MaxBackups: 5,                                 // max number of rotated files to keep
		MaxAge:     28,                                // max age in days of rotated files
		Compress:   true,                              // compress/archive rotated files
		LocalTime:  true,                              // timestamps in local time
	}
	// statistics log file
	statLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.stat.log", // stat log file location
		MaxSize:    50,                               // max file size in MB
		MaxBackups: 5,                                // max number of rotated files to keep
		MaxAge:     28,                               // max age in days of rotated files
		Compress:   true,                             // compress/archive rotated files
		LocalTime:  true,                             // timestamps in local time
	}
	logger := logrus.New()
	// LOG_LEVEL=trace enables trace logging; default is debug.
	if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
		logger.SetLevel(logrus.TraceLevel)
	} else {
		logger.SetLevel(logrus.DebugLevel)
	}
	logger.Out = log
	// logger.Out = io.MultiWriter(os.Stdout, log)
	// Route error-and-above entries to the dedicated error file as well.
	logger.AddHook(&ErrorHook{
		Writer: errorLog,
		LogLevels: []logrus.Level{
			logrus.ErrorLevel, logrus.FatalLevel, logrus.PanicLevel,
		},
	})
	statLogger = logrus.New()
	statLogger.SetLevel(logrus.InfoLevel)
	statLogger.Out = statLog
	return logger
}

// ErrorHook duplicates log entries of selected levels to an extra writer.
type ErrorHook struct {
	Writer    io.Writer
	LogLevels []logrus.Level
}

// Fire writes the formatted entry to the hook's writer.
func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String()
	if err != nil {
		return err
	}
	_, err = hook.Writer.Write([]byte(line))
	return err
}

// Levels reports which log levels this hook fires on.
func (hook *ErrorHook) Levels() []logrus.Level {
	return hook.LogLevels
}

var logger *logrus.Logger
var statLogger *logrus.Logger

// objFailCounter counts consecutive upload failures per stream key,
// guarded by mutexObjFailCounter.
var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)

// main wires up logging, config, NSQ, MySQL, MinIO and ClickHouse, starts the
// worker goroutine, then blocks until a signal or programmatic quit request.
func main() {
	var err error
	fmt.Println("Starting minio-into-stck, build time:", buildtime)
	logger = initLog()
	logger.Warnln("Logger initialized.")
	// Load configuration from file
	initLoadConfig()
	// Initialize NSQ
	initNsq()
	var db *gorm.DB
	var minioClient *minio.Client
	var ckConn driver.Conn
	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = gorm_logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		logger.Infof("ping db error: %v", err)
		return
	}
	logger.Infoln("Database connection successful")
	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		logger.Infof("auto migrate error: %v", err)
		return
	}
	logger.Infoln("Auto migration completed")
	// Connect to MinIO
	minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		// NOTE(review): `err` is shadowed by `:=` here, so a BucketExists failure
		// is handled by the Fatalf below and never reaches the outer check.
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		logger.Fatalf("Failed to connect to MinIO: %v", err)
	}
	logger.Infoln("Connected to MinIO")
	// Connect to ClickHouse
	ckConn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		logger.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	logger.Infoln("Connected to ClickHouse")
	// OK! Everything is ready now.
// objUploadChan := make(chan string, 1024*256)
	// Delayed-write queue of stream keys pending upload.
	objUploadChan := util.NewDChan[string](1024 * 16)
	// Start the main work
	logger.Infoln("Starting main worker...")
	gAppExitWaitGroup.Add(1)
	go func() {
		defer gAppExitWaitGroup.Done()
		main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
	}()
	// Wait on signal.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-signalChan:
		logger.Infof("received signal, stopping minio-into-stck")
	case <-programmaticQuitChan:
		logger.Infof("received programmatic quit signal, stopping minio-into-stck")
	}
	gAppQuitting = true
	// HACK: Notify the main worker to quit
	objUploadChan.In() <- ""
	// Close the NSQ producer and consumer
	gNsqProducer.Stop()
	gNsqConsumer.Stop()
	// Wait for the goroutines to exit
	gAppExitWaitGroup.Wait()
	logger.Infof("minio-into-stck stopped gracefully")
}

// AppCtx bundles the three backing connections used by the workers.
type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

var gAppCfg *AppConfig

// PartUploadArgs identifies one part of one stream to upload.
type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}

// main_worker listens for bucket notifications and drains the upload queue,
// retrying failed streams with a delay and a bounded per-stream retry count.
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
	ctx := context.Background()
	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		logger.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg
	// Start the notification listener
	go func() {
		for {
			// Register the bucket notification listener
			logger.Infoln("Registering bucket notification listener...")
			notifys := app.minioClient.ListenBucketNotification(
				ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
			// Listen OK, start the full upload trigger to upload maybe missed files
			// NOTE(review): a new trigger_full_upload goroutine is spawned on every
			// reconnect iteration of this loop — confirm overlapping scans are intended.
			go trigger_full_upload(app, objUploadChan.In())
			for notifyInfo := range notifys {
				for _, record := range notifyInfo.Records {
					key := record.S3.Object.Key
					logger.Traceln("New object notification:", key)
					// Only care when `.zst` / `metadata.json` files are uploaded
					// keyParts := strings.Split(key, "/")
					// keyLastPart := keyParts[len(keyParts)-1]
					keyLastPart := filepath.Base(key)
					if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
						continue
					}
					// key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
					// Reduce the object key to its stream directory (trailing slash kept).
					key = filepath.Dir(key) + "/"
					// Remove fail counter so that the object can be retried instead of
					// being spuriously ignored
					mutexObjFailCounter.Lock()
					delete(objFailCounter, key)
					mutexObjFailCounter.Unlock()
					// Queue the object for upload
					objUploadChan.Write(key)
				}
				if notifyInfo.Err != nil {
					logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
				}
			}
			logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
			time.Sleep(5 * time.Second)
			// Clear fail counter
			mutexObjFailCounter.Lock()
			objFailCounter = make(map[string]int)
			mutexObjFailCounter.Unlock()
		}
	}()
	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan.Out() {
		objUploadChan.MarkElemReadDone(objToUpload)
		if gAppQuitting {
			logger.Infof("Quitting, stopping main worker")
			return
		}
		// Empty key is the shutdown sentinel pushed by main; skip it here.
		if objToUpload == "" {
			continue
		}
		logger.Infoln("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
			continue
		}
		mutexObjFailCounter.Lock()
		if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
			logger.Warnf("Retried upload stream `%s` for too many %d times, give up",
				objToUpload, objFailCounter[objToUpload])
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
			continue
		}
		objFailCounter[objToUpload]++
		mutexObjFailCounter.Unlock()
		fullyUploaded, err := upload_one_stream(app, objToUpload)
		if err != nil {
			// Queue the object for retry
			logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
				objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			//err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
				}
				logger.Infof("Marked stream %s as fully uploaded", objToUpload)
			}
			// Remove entry from the fail counter
			mutexObjFailCounter.Lock()
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
		} else {
			// Queue the object for retry
			logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
				objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
		}
	}
}

// trigger_full_upload walks the bucket and queues every stream directory that
// is not yet recorded as uploaded, to catch objects missed by notifications.
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// // Upload all files in the directory
	// options := minio.ListObjectsOptions{
	// 	Recursive: false,
	// }
	// objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	// // NOTE: Streams in minio are organized as `<prefix>/<stream>/` — original comment's
	// // path pattern was garbled in this source; verify against the bucket layout.
// for objInfo := range objectsCh {
	// 	if objInfo.Err != nil {
	// 		logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
	// 		continue
	// 	}
	// 	key := objInfo.Key
	// 	if strings.HasSuffix(key, "/") {
	// 		// Is a directory, go deeper into it
	// 		options := minio.ListObjectsOptions{
	// 			Prefix:    key,
	// 			Recursive: false,
	// 		}
	// 		for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
	// 			if objInfo.Err != nil {
	// 				logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
	// 				continue
	// 			}
	// 			key := objInfo.Key
	// 			if strings.HasSuffix(key, "/") {
	// 				// Is a directory, should be a stream then
	// 				uploaded := object_already_uploaded(app, key)
	// 				if !uploaded {
	// 					objToUploadChan <- key
	// 				}
	// 			}
	// 		}
	// 	}
	// }
	// WARN: Slow!!!
	// options := minio.ListObjectsOptions{
	// 	Recursive: true,
	// }
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	// NOTE: Streams in minio are organized as `<prefix>/<stream>/` — pattern garbled
	// in this source; confirm against the bucket layout.
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
			continue
		}
		key := objInfo.Key
		// // If it's a directory with `metadata.json` file, it should be a stream
		// if strings.HasSuffix(key, "/metadata.json") {
		// 	streamName := strings.TrimSuffix(key, "metadata.json")
		// 	uploaded := object_already_uploaded(app, streamName)
		// 	if !uploaded {
		// 		objToUploadChan <- streamName
		// 	}
		// }
		// Scan through all subdirectories
		if strings.HasSuffix(key, "/") {
			exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
			if err != nil {
				logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
				continue
			}
			if exists {
				// Go ahead and upload the stream
				uploaded := object_already_uploaded(app, key)
				if !uploaded {
					objToUploadChan <- key
				}
			} else {
				// Check inner directories
				options := minio.ListObjectsOptions{
					Prefix:    key,
					Recursive: false,
				}
				for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
					if objInfo.Err != nil {
						logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
						continue
					}
					key := objInfo.Key
					if strings.HasSuffix(key, "/") {
						// Is a directory, should be a stream then
						uploaded := object_already_uploaded(app, key)
						if !uploaded {
							objToUploadChan <- key
						}
					}
				}
			}
		}
	}
}

// upload_one_stream uploads every part of one stream into ClickHouse, records
// per-part statistics, and reports the result over NSQ (via the deferred
// summary in its body). fullyUploaded reports whether nothing remains to do.
func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
	// Per-part upload outcome, for the stats log.
	type StreamObjectUploadStatistics struct {
		StartTime time.Time
		EndTime   time.Time
		PartName  string
		UpState   string // "repeated" / "ok" / "fail"
		Msg       string // "error message"
	}
	// Whole-stream upload outcome, for the stats log.
	type StreamUploadStatistics struct {
		StartTime       time.Time
		EndTime         time.Time
		StreamName      string
		MetaPointsCount int64
		MetaPartsCount  int64
		Objects         map[string]StreamObjectUploadStatistics
		Msg             string
	}
	streamStats := StreamUploadStatistics{
		StartTime:  time.Now(),
		StreamName: streamName,
		Objects:
make(map[string]StreamObjectUploadStatistics),
	}
	logger.Infof("Going to upload stream `%s`", streamName)
	if object_already_uploaded(app, streamName) {
		logger.Infof("Stream `%s` is already uploaded", streamName)
		return true, nil
	}
	// On exit, write one summary line to the stats log and publish the
	// stream-level status message over NSQ.
	defer func() {
		streamStats.EndTime = time.Now()
		repeatedCount := int64(0)
		okCount := int64(0)
		failCount := int64(0)
		totalCount := int64(0)
		objDetailsMsg := ""
		for _, obj := range streamStats.Objects {
			totalCount++
			if obj.UpState == "repeated" {
				repeatedCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else if obj.UpState == "ok" {
				okCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else {
				failCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
					obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			}
		}
		statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
			"repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
			streamName, streamStats.EndTime.Sub(streamStats.StartTime), util.FmtMyTime(streamStats.StartTime),
			util.FmtMyTime(streamStats.EndTime), fullyUploaded, streamStats.MetaPartsCount,
			repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
		// Send upload status changed message
		msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
			streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
			okCount, failCount, repeatedCount)
		err := PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send stream insert to stck status changed message error: %s", err)
		}
	}()
	fullyUploaded = true
	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		// Cannot continue without metadata
		return false, err
	}
	streamStats.MetaPointsCount = streamInfo.TotalPoints
	streamStats.MetaPartsCount = streamInfo.PartsCount
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
		fullyUploaded = false
	}
	hasSomething := false
	// Upload parts
	streamObjPath := streamName
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
	// hasSomething := false
	hasMetadata := false
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if gAppQuitting {
			logger.Infof("Quitting, stopping uploading one stream")
			return false, nil
		}
		logger.Tracef("Checking minio file `%s`", objInfo.Key)
		// Skip sub-directories; only files are part candidates.
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		partName := filepath.Base(objInfo.Key)
		if partName == "metadata.json" {
			hasMetadata = true
			continue
		}
		hasSomething = true
		objStat := StreamObjectUploadStatistics{
			StartTime: time.Now(),
			PartName:  partName,
		}
		if part_already_uploaded(app, streamName, partName) {
			objStat.EndTime = time.Now()
			objStat.UpState = "repeated"
			streamStats.Objects[objInfo.Key] = objStat
			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
			continue
		}
		if fullyUploaded {
			fullyUploaded = false
			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
		}
		// Do the parts upload
		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
		logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
			partInfo.PartName, partInfo.StreamInfo.PartsCount, partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			objStat.EndTime = time.Now()
			objStat.UpState = "fail"
			objStat.Msg = err.Error()
			logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v",
				partInfo.PartName, partInfo.StreamName, objStat.EndTime.Sub(objStat.StartTime), err)
			fullyUploaded = false
		} else {
			// Mark the part as uploaded
			//err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
			part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
			err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v",
					partInfo.PartName, partInfo.StreamName, err)
			}
			objStat.EndTime = time.Now()
			objStat.UpState = "ok"
			logger.Infof("Uploaded part `%s` of stream `%s`, took %v",
				objInfo.Key, streamName, objStat.EndTime.Sub(objStat.StartTime))
		}
		streamStats.Objects[objInfo.Key] = objStat
		partNum, err := util.ExtractNumberFromString(partName)
		if err != nil {
			// Not a part file?
// Not numbered — skip the per-part status message.
			continue
		}
		status := "success"
		if objStat.UpState != "ok" {
			status = "failed"
		}
		msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
			streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
		err = PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send part insert to stck status changed message error: %s", err)
		}
	}
	if !hasMetadata {
		logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
		fullyUploaded = false
	}
	if !hasSomething {
		if streamInfo.PartsCount != 0 {
			logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
				streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
		} else {
			logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
		}
		fullyUploaded = false
	}
	return fullyUploaded, nil
}

// upload_one_part downloads one `part_N.zst` object, validates its index and
// size against the stream metadata, and batch-inserts its float64 points into
// the ClickHouse table. Each point is 8 bytes, little-endian IEEE-754.
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
	partName := filepath.Base(partObjPath)
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
			partObjPath, minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
		}
		defer obj.Close()
		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
		}
		// Process the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
		}
		// Use regex to extract the part index from the part name
		partIndex := int64(0)
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partObjPath)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
			}
			partIndex, err = strconv.ParseInt(matches[1], 10, 64)
			if err != nil {
				return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
			}
			// Check if the part index is correct
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
			}
			// Check if the part data size is correct (full parts are exactly
			// PointsPerPart*8 bytes; the final part holds the remainder).
			if streamInfo.PartsCount != 0 {
				left := int64(len(partData))
				if partIndex < streamInfo.PartsCount-1 {
					right := streamInfo.PointsPerPart * 8
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
					if right == 0 {
						right = streamInfo.PointsPerPart * 8
					}
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else {
					return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8
		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into stck: %w", err)
		}
		/* Destination table layout:
		   ┌─name────────┬─type────────────────────────────────────────────────┬─comment───────────────────┬─codec────────────┐
		   │ metric_name │ LowCardinality(String)                              │ Metric name               │                  │
		   │ point_name  │ LowCardinality(String)                              │ Point name                │                  │
		   │ tags        │ Map(LowCardinality(String), LowCardinality(String)) │ Point tags                │                  │
		   │ value       │ Float64                                             │ Point value               │                  │
		   │ nanoseconds │ Int64                                               │ Point time in nanoseconds │ DoubleDelta, LZ4 │
		   └─────────────┴─────────────────────────────────────────────────────┴───────────────────────────┴──────────────────┘
		*/
		logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
		for i := 0; i < partPointsCount; i++ {
			metricName := streamInfo.MetricName
			pointName := streamInfo.Name
			tags := map[string]string{}
			value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
			// NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
			nanoseconds := streamInfo.TimestampOffset * 1e6
			nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
			nanoseconds += int64(i) * streamInfo.Interval
			err := batch.Append(metricName, pointName, tags, value, nanoseconds)
			if err != nil {
				return err
			}
		}
		if batch.Rows() != partPointsCount {
			logger.Errorf("Batch rows mismatch: %d / %d???", batch.Rows(), partPointsCount)
		}
		err = batch.Send()
		if err != nil {
			return err
		}
		if !batch.IsSent() {
			logger.Errorln("Batch not sent???")
		}
		logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
		// We made progress, reset the fail counter
		mutexObjFailCounter.Lock()
		objFailCounter[streamName] = 0
		mutexObjFailCounter.Unlock()
	}
	return nil
}

// load_app_cfg_from_db loads (and seeds on first run) the runtime AppConfig
// stored in the MySQL key/value table.
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	//db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		//if errors.Is(result.Error, gorm.ErrRecordNotFound) {
		//	dbEntry.Value = `[]`
		//} else {
		//	return cfg, result.Error
		//}
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}

// object_already_uploaded reports whether the stream key is recorded as fully
// uploaded in MySQL.
func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}

// object_is_blacklisted reports whether the key matches any configured
// import-blacklist regex pattern.
func
object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		// NOTE: the compile error from an invalid pattern is deliberately
		// ignored (treated as no match).
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}

// part_already_uploaded reports whether this part of the stream is recorded
// as uploaded in MySQL.
func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}

// get_stream_metadata fetches and JSON-decodes `<streamName>metadata.json`
// from MinIO (streamName is expected to end with "/").
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()
	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}