package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"regexp"
	"strconv"
	"strings"
	"time"

	"example/minio-into-stck/util"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"github.com/spf13/viper"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"
	// "github.com/natefinch/lumberjack"
	// "github.com/sirupsen/logrus"
)
// AppInitConfig is the initial configuration of the application, loaded from
// the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
}
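
// A config/application.yaml matching AppInitConfig might look like the
// following (all values are placeholders, not taken from a real deployment):
//
//	mysql:
//	  user: stck
//	  password: example-password
//	  host: 127.0.0.1:3306
//	  database: stck
//	minio:
//	  accessKey: minio-access-key
//	  secret: minio-secret
//	  bucket: streams
//	  host: minio.local:9000
//	stck:
//	  host: clickhouse.local:9000
//	  table: stck.points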
type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}
// TODO: When inserting, create the row first, then update it with the time
// the insert took once the insert finishes.

// UploadRecord represents a record of an uploaded stream (a major upload task).
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}
// PartUploadRecord represents a record of an uploaded part of a stream (a
// minor upload task).
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}
type StreamMetadata struct {
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int    `json:"parts_count"`
	PointsPerPart   int    `json:"points_per_part"`
	TotalPoints     int    `json:"total_points"`
}
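
// For reference, a metadata.json that deserializes into StreamMetadata could
// look like this (illustrative numbers; the interval is assumed to be in
// nanoseconds, matching how it is used in upload_one_part below):
//
//	{
//	  "name": "pressure-sensor-1",
//	  "timestamp_offset": 1704067200000,
//	  "interval": 1000000,
//	  "parts_count": 4,
//	  "points_per_part": 100000,
//	  "total_points": 350000
//	}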
var appInitCfg = &AppInitConfig{}

func initLoadConfig() {
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		log.Println("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			log.Println("Failed to unmarshal config:", err)
			// Keep the previous config if the new one cannot be parsed
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		log.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		log.Printf("Failed to unmarshal config: %v\n", err)
	}
}
func main() {
	var err error
	log.Println("Starting application...")

	// Load configuration from file
	initLoadConfig()

	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		log.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		log.Printf("ping db error: %v", err)
		return
	}
	log.Println("Database connection successful")

	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		log.Printf("auto migrate error: %v", err)
		return
	}
	log.Println("Auto migration completed")
	// Connect to MinIO and verify that the configured bucket exists
	minioClient, err := minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err != nil {
		log.Fatalf("Failed to connect to MinIO: %v", err)
	}
	bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
	if err != nil {
		log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
	}
	if !bucketExists {
		log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
	}
	log.Println("Connected to MinIO")
	// Connect to ClickHouse and verify the connection with a ping
	ckConn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	log.Println("Connected to ClickHouse")

	// Start the main work
	log.Println("Starting main worker...")
	main_worker(AppCtx{db, minioClient, ckConn})
}
// AppCtx bundles the long-lived connections shared by the workers.
type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}
/* Design notes:
 * The main worker first loads the app config from the database. It then pulls the
 * list of objects from MinIO to build a local mirrored list (which is rebuilt on
 * every startup). At the same time, a bucket notification is registered to listen
 * for new objects. For each new (stream) object, the worker checks whether it is
 * blacklisted or already uploaded. If not, it starts the upload process by putting
 * parts of the stream into the parts table. When all parts are uploaded, the worker
 * marks the stream as uploaded (inserts it into the uploads table).
 */
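
// Rough shape of the pipeline described above (a sketch of this file's own
// channels and workers, not an external API):
//
//	MinIO bucket notification --\
//	                             +--> objUploadChan --> upload_one_stream --> partUploadChan
//	trigger_full_upload scan ---/                                                  |
//	                                                                               v
//	                              ClickHouse <-- upload_one_part <-- upload_parts_worker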
var gAppCfg *AppConfig

type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}
func main_worker(app AppCtx) {
	ctx := context.Background()
	objUploadChan := make(chan string)
	partUploadChan := make(chan PartUploadArgs)

	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		log.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg

	// Register bucket notification
	notifys := app.minioClient.ListenBucketNotification(
		ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})

	// Start the notification listener
	go func() {
		for notifyInfo := range notifys {
			for _, record := range notifyInfo.Records {
				key := record.S3.Object.Key
				log.Println("New object notification:", key)
				// Reduce the key to its top-level prefix, i.e. the stream name
				key = strings.Split(key, "/")[0]
				objUploadChan <- key
			}
		}
	}()

	// Start the full upload trigger
	go trigger_full_upload(app, objUploadChan)

	// Start the parts upload worker
	go upload_parts_worker(app, partUploadChan)
	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan {
		log.Println("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
			continue
		}
		fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
		if err != nil {
			// Queue the object for retry. The object key is passed as an
			// argument so the goroutine does not capture the loop variable.
			log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
			go func(obj string) {
				time.Sleep(5 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
			} else {
				// We can now remove the stream's parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
				}
			}
		} else {
			// Queue the object for retry
			log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
			go func(obj string) {
				time.Sleep(60 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
		}
	}
}
// upload_parts_worker consumes parts from partUploadChan, uploads them, and
// re-queues failed parts for retry.
func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
	for partInfo := range partUploadChan {
		log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
			// Queue the part for retry (passed as an argument to avoid
			// capturing the loop variable)
			go func(pi PartUploadArgs) {
				time.Sleep(5 * time.Second)
				partUploadChan <- pi
			}(partInfo)
			continue
		}
		// Mark the part as uploaded
		part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
		err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
		if err != nil {
			log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
		}
	}
}
// trigger_full_upload lists the top-level prefixes in the bucket and queues
// each one (i.e. each stream) for upload.
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			log.Printf("Error listing objects: %v\n", objInfo.Err)
			break
		}
		// With Recursive: false, folder-like prefixes end with "/"
		key := objInfo.Key
		if strings.HasSuffix(key, "/") {
			objToUploadChan <- strings.Split(key, "/")[0]
		}
	}
}
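
// The listing logic above and the part handling below assume one top-level
// prefix per stream, e.g. (names illustrative):
//
//	<bucket>/
//	    my-stream/
//	        metadata.json
//	        part_0.zst
//	        part_1.zst
//	        ...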
func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
	if object_already_uploaded(app, streamName) {
		return true, nil
	}

	fullyUploaded = true

	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		return false, err
	}
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		fullyUploaded = false
	}

	// Upload parts
	streamObjPath := streamName + "/"
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		if objInfo.Key == streamObjPath+"metadata.json" {
			continue
		}
		if part_already_uploaded(app, streamName, objInfo.Key) {
			continue
		}
		fullyUploaded = false
		partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
	}
	return fullyUploaded, nil
}
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partName,
			minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part data: %w", err)
		}
		defer obj.Close()

		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part data: %w", err)
		}
		// Decompress the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part data: %w", err)
		}
		// Use a regex to extract the part index from the part name
		partIndex := 0
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partName)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name: %s", partName)
			}
			partIndex, err = strconv.Atoi(matches[1])
			if err != nil {
				return fmt.Errorf("failed to convert part index to integer: %w", err)
			}
			// Check that the part index is in bounds
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part index out of bounds: %d / %d", partIndex, streamInfo.PartsCount)
			}
			// Check that the part data size is correct (each point is one
			// little-endian float64, i.e. 8 bytes)
			if streamInfo.PartsCount != 0 {
				if partIndex < streamInfo.PartsCount-1 {
					if len(partData) != streamInfo.PointsPerPart*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else {
					// Last part: it holds the remaining points. Computing the
					// remainder directly (rather than TotalPoints%PointsPerPart)
					// stays correct when the stream divides evenly into parts.
					lastPartPoints := streamInfo.TotalPoints - (streamInfo.PartsCount-1)*streamInfo.PointsPerPart
					if len(partData) != lastPartPoints*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				}
			}
		}
		partPointsCount := len(partData) / 8

		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to prepare ClickHouse batch: %w", err)
		}
		/*
			Target table schema (DESCRIBE TABLE output):

			┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
			│ metric_name │ LowCardinality(String)                              │              │                    │ Metric name               │                  │                │
			│ point_name  │ LowCardinality(String)                              │              │                    │ Point name                │                  │                │
			│ tags        │ Map(LowCardinality(String), LowCardinality(String)) │              │                    │ Point tags                │                  │                │
			│ value       │ Float64                                             │              │                    │ Point value               │                  │                │
			│ nanoseconds │ Int64                                               │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
			└─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
		*/
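		// For local experiments, a table with this shape could be created with
		// DDL along the following lines (the table name, engine, and ORDER BY
		// are assumptions, not taken from this repository):
		//
		//	CREATE TABLE IF NOT EXISTS stck.points (
		//	    metric_name LowCardinality(String) COMMENT 'Metric name',
		//	    point_name  LowCardinality(String) COMMENT 'Point name',
		//	    tags        Map(LowCardinality(String), LowCardinality(String)) COMMENT 'Point tags',
		//	    value       Float64 COMMENT 'Point value',
		//	    nanoseconds Int64 CODEC(DoubleDelta, LZ4) COMMENT 'Point time in nanoseconds'
		//	) ENGINE = MergeTree
		//	ORDER BY (metric_name, point_name, nanoseconds);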
		for i := 0; i < partPointsCount; i++ {
			metricName := ""
			pointName := streamInfo.Name
			tags := map[string]string{}
			value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
			// NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
			nanoseconds := streamInfo.TimestampOffset * 1e6
			nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
			nanoseconds += int64(i) * streamInfo.Interval
			err := batch.Append(metricName, pointName, tags, value, nanoseconds)
			if err != nil {
				return err
			}
		}
		err = batch.Send()
		if err != nil {
			return err
		}
	}
	return nil
}
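
// A worked example of the timestamp math in upload_one_part, with hypothetical
// numbers: for TimestampOffset = 1_704_067_200_000 ms, Interval = 1_000_000 ns
// (1 ms), and PointsPerPart = 100_000, the first point of part_2.zst
// (partIndex = 2, i = 0) lands at
// 1_704_067_200_000*1e6 + 2*100_000*1_000_000 = 1_704_067_200_200_000_000 ns,
// i.e. 200 s after the stream's start.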
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	// Fetch the blacklist entry, creating it with an empty JSON array as the
	// default value if it does not exist yet
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}
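
// For example (an illustrative row, not shipped with the code), storing
//
//	Key:   ImportBlacklist
//	Value: ["^test_", "_tmp$"]
//
// in the app_config_db_entries table would make object_is_blacklisted skip any
// stream whose name starts with "test_" or ends with "_tmp".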
func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	// Use a struct condition so GORM quotes the column name; `key` is a
	// reserved word in MySQL, and the bare two-argument form used previously
	// was not a complete inline condition.
	result := app.db.Where(&UploadRecord{Key: key}).First(&record)
	return result.Error == nil
}
func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}
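
// One possible shape for the caching TODO above, kept as a comment so it stays
// a sketch rather than part of the build ("sync" would also need importing):
//
//	var regexCache sync.Map // pattern string -> *regexp.Regexp
//
//	func compiledPattern(pattern string) (*regexp.Regexp, error) {
//		if v, ok := regexCache.Load(pattern); ok {
//			return v.(*regexp.Regexp), nil
//		}
//		re, err := regexp.Compile(pattern)
//		if err != nil {
//			return nil, err
//		}
//		regexCache.Store(pattern, re)
//		return re, nil
//	}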
func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "/metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()

	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}