package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"regexp"
	"strconv"
	"strings"
	"time"

	"example/minio-into-stck/util"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"

	// "github.com/natefinch/lumberjack"
	// "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// AppInitConfig is the initial configuration of the application, loaded from
// the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
}

type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}

// TODO: create the row first when inserting, then update it with the elapsed
// insert time once the insert completes.

// UploadRecord represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}

// PartUploadRecord represents a record of an uploaded part of a stream. (A
// minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}

type StreamMetadata struct {
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int    `json:"parts_count"`
	PointsPerPart   int    `json:"points_per_part"`
	TotalPoints     int    `json:"total_points"`
}

var appInitCfg *AppInitConfig = &AppInitConfig{}

func initLoadConfig() {
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		log.Println("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			log.Println("Failed to unmarshal config:", err)
			// Keep the previous config if the new one cannot be parsed
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		log.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		log.Printf("Failed to unmarshal config: %v\n", err)
	}
}
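
// For reference, a minimal config/application.yaml matching the mapstructure
// tags above might look like this (all values are illustrative placeholders,
// not shipped defaults):
//
//	mysql:
//	  user: stck
//	  password: changeme
//	  host: 127.0.0.1:3306
//	  database: stck
//	minio:
//	  accessKey: minioadmin
//	  secret: minioadmin
//	  bucket: streams
//	  host: minio:9000
//	stck:
//	  host: clickhouse:9000
//	  table: stck.points
//
// Note that minio.host is passed through util.ExpandShellStringInfallible
// before use, which (judging by its name) expands shell-style variable
// references.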

func main() {
	var err error

	log.Println("Starting application...")

	// Load configuration from file
	initLoadConfig()

	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = logger.Discard
	// Get the underlying sql.DB object so the connection can be closed on exit
	sqlDB, err := db.DB()
	if err != nil {
		log.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check that the connection is alive
	err = sqlDB.Ping()
	if err != nil {
		log.Printf("ping db error: %v", err)
		return
	}
	log.Println("Database connection successful")

	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		log.Printf("auto migrate error: %v", err)
		return
	}
	log.Println("Auto migration completed")

	// Connect to MinIO
	minioClient, err := minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err != nil {
		log.Fatalf("Failed to connect to MinIO: %v", err)
	}
	bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
	if err != nil {
		log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
	}
	if !bucketExists {
		log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
	}
	log.Println("Connected to MinIO")

	// Connect to ClickHouse
	ckConn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	log.Println("Connected to ClickHouse")

	// Start the main work
	log.Println("Starting main worker...")
	main_worker(AppCtx{db, minioClient, ckConn})
}

type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

/** Design notes:
 * The main worker first loads the app config from the database. It then pulls the
 * list of objects from MinIO to build a local mirrored list (which is rebuilt on every
 * startup). At the same time, a bucket notification is registered to listen for new objects.
 * For each new (stream) object, the worker checks whether it is blacklisted or already
 * uploaded. If not, it starts the upload process by putting parts of the stream into the parts
 * table. When all parts are uploaded, the worker marks the stream as uploaded (inserts it
 * into the uploads table).
 */
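
// Data flow of the pipeline implemented below: object keys arrive on
// objUploadChan (from bucket notifications and from the startup full scan),
// the main loop expands each stream into part upload tasks on partUploadChan,
// and upload_parts_worker writes each part into ClickHouse. Progress is
// tracked in the MySQL uploads and parts tables.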

var gAppCfg *AppConfig

type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}

func main_worker(app AppCtx) {
	ctx := context.Background()
	objUploadChan := make(chan string)
	partUploadChan := make(chan PartUploadArgs)

	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		log.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg

	// Register bucket notification
	notifys := app.minioClient.ListenBucketNotification(
		ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})

	// Start the notification listener
	go func() {
		for notifyInfo := range notifys {
			for _, record := range notifyInfo.Records {
				key := record.S3.Object.Key
				log.Println("New object notification:", key)
				// Only process root folders (e.g. "mystream/part_3.zst" -> "mystream")
				key = strings.Split(key, "/")[0]
				objUploadChan <- key
			}
		}
	}()

	// Start the full upload trigger
	go trigger_full_upload(app, objUploadChan)

	// Start the parts upload worker
	go upload_parts_worker(app, partUploadChan)

	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan {
		log.Println("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
			continue
		}
		fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
		if err != nil {
			// Queue the object for retry; the key is passed as an argument so
			// the goroutine does not capture the loop variable
			log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
			go func(obj string) {
				time.Sleep(5 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
			} else {
				// We can now remove the stream's parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
				}
			}
		} else {
			// Queue the object for retry
			log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
			go func(obj string) {
				time.Sleep(60 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
		}
	}
}
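
// Example walk-through: a notification for "mystream/part_3.zst" puts
// "mystream" on objUploadChan; upload_one_stream lists "mystream/", skips
// metadata.json and already-uploaded parts, and queues the remaining parts on
// partUploadChan; upload_parts_worker decompresses each part, inserts its
// points into ClickHouse, and records it in the parts table.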

// Receives part upload tasks from partUploadChan, uploads each part, and
// records successfully uploaded parts in the parts table.
func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
	for partInfo := range partUploadChan {
		log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			log.Printf("Failed to upload part %s of stream %s: %v\n",
				partInfo.PartName, partInfo.StreamName, err)
			// Queue the part for retry; passed as an argument to avoid
			// capturing the loop variable
			go func(info PartUploadArgs) {
				time.Sleep(5 * time.Second)
				partUploadChan <- info
			}(partInfo)
			continue
		}
		// Mark the part as uploaded
		part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
		err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
		if err != nil {
			log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n",
				partInfo.PartName, partInfo.StreamName, err)
		}
	}
}

// Enqueues every root folder in the bucket, so that all existing streams are
// (re)checked on startup.
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			log.Printf("Error listing objects: %v\n", objInfo.Err)
			break
		}
		key := objInfo.Key
		if strings.HasSuffix(key, "/") {
			objToUploadChan <- strings.Split(key, "/")[0]
		}
	}
}

func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
	if object_already_uploaded(app, streamName) {
		return true, nil
	}

	fullyUploaded = true

	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		return false, err
	}
	if streamInfo.PartsCount == 0 {
		// The edge device hasn't finished uploading the stream yet
		fullyUploaded = false
	}

	// Upload parts
	streamObjPath := streamName + "/"
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		if objInfo.Key == streamObjPath+"metadata.json" {
			continue
		}
		if part_already_uploaded(app, streamName, objInfo.Key) {
			continue
		}
		fullyUploaded = false
		partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
	}
	return fullyUploaded, nil
}
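
// Expected object layout for one stream in the bucket, inferred from
// get_stream_metadata and the part-name regex in upload_one_part (the stream
// name "mystream" is illustrative):
//
//	mystream/metadata.json   stream metadata (see StreamMetadata)
//	mystream/part_0.zst      zstd-compressed little-endian float64 points
//	mystream/part_1.zst
//	...                      parts_count files in total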

func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
			partName, minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part data: %w", err)
		}
		defer obj.Close()

		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part data: %w", err)
		}

		// Decompress the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part data: %w", err)
		}

		// Use a regex to extract the part index from the part name
		partIndex := 0
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partName)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name: %s", partName)
			}
			partIndex, err = strconv.Atoi(matches[1])
			if err != nil {
				return fmt.Errorf("failed to convert part index to integer: %w", err)
			}
			// Check that the part index is in bounds
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part index out of bounds: %d / %d", partIndex, streamInfo.PartsCount)
			}
			// Check that the part data size is correct
			if streamInfo.PartsCount != 0 {
				if partIndex < streamInfo.PartsCount-1 {
					if len(partData) != streamInfo.PointsPerPart*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					// NOTE: this expects an empty last part when TotalPoints is
					// an exact multiple of PointsPerPart
					if len(partData) != (streamInfo.TotalPoints%streamInfo.PointsPerPart)*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else {
					return fmt.Errorf("part index out of bounds: %d", partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8

		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
		}
		/* Target table schema:
		┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
		│ metric_name │ LowCardinality(String)                              │              │                    │ Metric name               │                  │                │
		│ point_name  │ LowCardinality(String)                              │              │                    │ Point name                │                  │                │
		│ tags        │ Map(LowCardinality(String), LowCardinality(String)) │              │                    │ Point tags                │                  │                │
		│ value       │ Float64                                             │              │                    │ Point value               │                  │                │
		│ nanoseconds │ Int64                                               │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
		└─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
		*/
		for i := 0; i < partPointsCount; i++ {
			metricName := ""
			pointName := streamInfo.Name
			tags := map[string]string{}
			value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
			// NOTE: TimestampOffset is in milliseconds; convert it to nanoseconds
			nanoseconds := streamInfo.TimestampOffset * 1e6
			nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
			nanoseconds += int64(i) * streamInfo.Interval
			err := batch.Append(metricName, pointName, tags, value, nanoseconds)
			if err != nil {
				return err
			}
		}
		err = batch.Send()
		if err != nil {
			return err
		}
	}
	return nil
}

func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	// Create the entry with an empty blacklist on first run
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).
		Attrs(AppConfigDbEntry{Value: `[]`}).
		FirstOrCreate(&dbEntry)
	if result.Error != nil {
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}

func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	// Inline condition: matches the row whose `key` column equals the given key
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}

func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}
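
// A minimal sketch of the pattern cache suggested by the TODO in
// object_is_blacklisted above. The names compiledBlacklist and
// blacklistPatternMatches are illustrative, not part of the original code.
// This is only safe without a lock because object_is_blacklisted runs solely
// on the main worker goroutine.
var compiledBlacklist = map[string]*regexp.Regexp{}

func blacklistPatternMatches(pattern, key string) bool {
	re, ok := compiledBlacklist[pattern]
	if !ok {
		var err error
		re, err = regexp.Compile(pattern)
		if err != nil {
			// Invalid patterns never match, mirroring how object_is_blacklisted
			// ignores errors from regexp.MatchString
			return false
		}
		compiledBlacklist[pattern] = re
	}
	return re.MatchString(key)
}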

func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}

func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "/metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()

	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}
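
// For reference, a metadata.json that decodes into StreamMetadata could look
// like this (values are illustrative; judging from upload_one_part,
// timestamp_offset is in milliseconds and interval in nanoseconds per point):
//
//	{
//	  "name": "mystream",
//	  "timestamp_offset": 1700000000000,
//	  "interval": 1000000,
//	  "parts_count": 10,
//	  "points_per_part": 100000,
//	  "total_points": 950000
//	}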