123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- package main
- import (
- "bytes"
- "context"
- "encoding/binary"
- "encoding/json"
- "example/minio-into-ck/util"
- "fmt"
- "log"
- "math"
- "os"
- "regexp"
- "strconv"
- "strings"
- "time"
- "github.com/ClickHouse/clickhouse-go/v2"
- "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- "github.com/minio/minio-go/v7/pkg/notification"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
- "gorm.io/gorm/logger"
- )
// Connection settings. Each value is only a built-in default: main replaces it
// with the environment variable of the same name when that variable is set to
// a non-empty string.
var (
	// MySQL connection parameters (used to build the DSN in main).
	MYSQL_USER     = "root"
	MYSQL_PASSWORD = ""
	MYSQL_HOST     = "localhost:3306"
	MYSQL_DATABASE = "minio-into-ck-db"

	// MinIO credentials and bucket name.
	// NOTE(review): a default access key and secret are committed here; confirm
	// they are development-only values.
	MINIO_ACCESS_KEY = "25cSXPqzdHrPwJkSIRkM"
	MINIO_SECRET     = "FN3AhQaVo7z1wgvce3IWiI1CI68T02OVeSUKCeRf"
	MINIO_BUCKET     = "bucket"

	// MINIO_HOST is passed through util.ExpandShellString by main before it is
	// used as the MinIO endpoint.
	MINIO_HOST = "$(hostname).local:9000"

	// ClickHouse address and the name of the table rows are inserted into.
	CLICKHOUSE_HOST  = "localhost:9000"
	CLICKHOUSE_TABLE = "tsdb_cpp"
)
// AppConfig is the application configuration decoded from the config table.
type AppConfig struct {
	// ImportBlacklist holds regular-expression patterns; an object whose name
	// matches any of them is excluded from import.
	ImportBlacklist []string
}
// AppConfigDbEntry is one key/value row of the persisted application
// configuration. load_app_cfg_from_db stores the import blacklist under the
// key "ImportBlacklist" as a JSON array.
type AppConfigDbEntry struct {
	// Key is the setting name and the table's primary key.
	Key string `gorm:"primaryKey"`
	// Value is the setting's serialized value.
	Value string
}
// UploadRecord marks one stream as completely uploaded (a major upload task).
// main_worker creates the row once every part of the stream has been imported.
type UploadRecord struct {
	// Key is the stream name and the table's primary key.
	Key string `gorm:"primaryKey"`
	// CreatedAt is the row's creation time.
	CreatedAt time.Time
}
// PartUploadRecord marks one part of a stream as uploaded (a minor upload
// task). The stream name and the part name together form the primary key.
type PartUploadRecord struct {
	StreamName, PartName string `gorm:"primaryKey"`
	// CreatedAt is the row's creation time.
	CreatedAt time.Time
}
// StreamMetadata mirrors the metadata.json object stored next to a stream's
// parts in MinIO.
type StreamMetadata struct {
	// Name is written to ClickHouse as the point name of every row.
	Name string `json:"name"`
	// TimestampOffset is the stream start in milliseconds; upload_one_part
	// multiplies it by 1e6 to obtain nanoseconds.
	TimestampOffset int64 `json:"timestamp_offset"`
	// Interval is the spacing between consecutive points; it is added to the
	// nanosecond timestamp without conversion.
	Interval int64 `json:"interval"`
	// PartsCount is the number of parts; zero is treated as "the stream is not
	// completely written yet".
	PartsCount int `json:"parts_count"`
	// PointsPerPart is the number of 8-byte points in every part but the last.
	PointsPerPart int `json:"points_per_part"`
	// TotalPoints is the number of points in the whole stream.
	TotalPoints int `json:"total_points"`
}
- func main() {
- var err error
- log.Println("Starting application...")
- // Load environment variables
- if os.Getenv("MYSQL_USER") != "" {
- MYSQL_USER = os.Getenv("MYSQL_USER")
- }
- if os.Getenv("MYSQL_PASSWORD") != "" {
- MYSQL_PASSWORD = os.Getenv("MYSQL_PASSWORD")
- }
- if os.Getenv("MYSQL_HOST") != "" {
- MYSQL_HOST = os.Getenv("MYSQL_HOST")
- }
- if os.Getenv("MYSQL_DATABASE") != "" {
- MYSQL_DATABASE = os.Getenv("MYSQL_DATABASE")
- }
- if os.Getenv("MINIO_ACCESS_KEY") != "" {
- MINIO_ACCESS_KEY = os.Getenv("MINIO_ACCESS_KEY")
- }
- if os.Getenv("MINIO_SECRET") != "" {
- MINIO_SECRET = os.Getenv("MINIO_SECRET")
- }
- if os.Getenv("MINIO_BUCKET") != "" {
- MINIO_BUCKET = os.Getenv("MINIO_BUCKET")
- }
- if os.Getenv("MINIO_HOST") != "" {
- MINIO_HOST = os.Getenv("MINIO_HOST")
- }
- MINIO_HOST, err = util.ExpandShellString(MINIO_HOST)
- if err != nil {
- log.Fatalf("Failed to expand shell string: %v", err)
- }
- if os.Getenv("CLICKHOUSE_HOST") != "" {
- CLICKHOUSE_HOST = os.Getenv("CLICKHOUSE_HOST")
- }
- if os.Getenv("CLICKHOUSE_TABLE") != "" {
- CLICKHOUSE_TABLE = os.Getenv("CLICKHOUSE_TABLE")
- }
- // Connect to MySQL
- dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
- MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_DATABASE)
- db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
- if err != nil {
- log.Fatalf("Failed to connect to MySQL: %v", err)
- }
- // Disable logging
- db.Logger = logger.Discard
- // Get the underlying sql.DB object to close the connection later
- sqlDB, err := db.DB()
- if err != nil {
- log.Fatalf("Failed to get MySQL DB: %v", err)
- }
- defer sqlDB.Close()
- // Ping the database to check if the connection is successful
- err = sqlDB.Ping()
- if err != nil {
- log.Printf("ping db error: %v", err)
- return
- }
- log.Println("Database connection successful")
- // Perform auto migration
- err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
- if err != nil {
- log.Printf("auto migrate error: %v", err)
- return
- }
- log.Println("Auto migration completed")
- // Connect to MinIO
- minioClient, err := minio.New(MINIO_HOST, &minio.Options{
- Creds: credentials.NewStaticV4(MINIO_ACCESS_KEY, MINIO_SECRET, ""),
- Secure: false,
- })
- if err == nil {
- bucketExists, err := minioClient.BucketExists(context.Background(), MINIO_BUCKET)
- if err != nil {
- log.Fatalf("Failed to check if bucket %s exists: %v", MINIO_BUCKET, err)
- }
- if !bucketExists {
- log.Fatalf("Bucket %s does not exist", MINIO_BUCKET)
- }
- }
- if err != nil {
- log.Fatalf("Failed to connect to MinIO: %v", err)
- }
- log.Println("Connected to MinIO")
- // Connect to ClickHouse
- ckConn, err := clickhouse.Open(&clickhouse.Options{
- Addr: []string{CLICKHOUSE_HOST},
- })
- if err == nil {
- err = ckConn.Ping(context.Background())
- }
- if err != nil {
- log.Fatalf("Failed to connect to ClickHouse: %v", err)
- }
- log.Println("Connected to ClickHouse")
- // Start the main work
- log.Println("Starting main worker...")
- main_worker(AppCtx{db, minioClient, ckConn})
- }
- type AppCtx struct {
- db *gorm.DB
- minioClient *minio.Client
- ckConn driver.Conn
- }
/*
 * Design notes:
 * The main worker first loads the app config from the database. It then lists the objects
 * in MinIO to build a local mirrored list (which gets rebuilt on every startup).
 * A bucket notification is registered at the same time to listen for new objects.
 * For each new (stream) object, the worker checks whether it is blacklisted or already
 * uploaded. If not, it starts the upload process: each part of the stream is uploaded and
 * recorded in the parts table. When all parts are uploaded, the worker marks the stream as
 * uploaded (inserts it into the uploads table).
 */
- var gAppCfg *AppConfig
- type PartUploadArgs struct {
- StreamInfo *StreamMetadata
- StreamName string
- PartName string
- }
- func main_worker(app AppCtx) {
- ctx := context.Background()
- objUploadChan := make(chan string)
- partUploadChan := make(chan PartUploadArgs)
- // Load config from DB
- appCfg, err := load_app_cfg_from_db(app.db)
- if err != nil {
- log.Fatalf("Failed to load app config from DB: %v", err)
- }
- gAppCfg = &appCfg
- // Register bucket notification
- notifys := app.minioClient.ListenBucketNotification(
- ctx, MINIO_BUCKET, "", "", []string{string(notification.ObjectCreatedAll)})
- // Start the notification listener
- go func() {
- for notifyInfo := range notifys {
- for _, record := range notifyInfo.Records {
- key := record.S3.Object.Key
- log.Println("New object notification:", key)
- // Only process root folders
- key = strings.Split(key, "/")[0]
- objUploadChan <- key
- }
- }
- }()
- // Start the full upload trigger
- go trigger_full_upload(app, objUploadChan)
- // Start the parts upload worker
- go upload_parts_worker(app, partUploadChan)
- // Start the main loop (streams upload worker)
- for objToUpload := range objUploadChan {
- log.Println("Checking stream object:", objToUpload)
- if object_is_blacklisted(objToUpload) {
- log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
- continue
- }
- fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
- if err != nil {
- // Queue the object for retry
- log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
- go func() {
- time.Sleep(5 * time.Second)
- objUploadChan <- objToUpload
- }()
- continue
- }
- if fullyUploaded {
- // Mark the stream as fully uploaded
- //err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
- err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
- if err != nil {
- log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
- } else {
- // We can now remove the stream parts from the parts table
- err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
- if err != nil {
- log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
- }
- }
- } else {
- // Queue the object for retry
- log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
- go func() {
- time.Sleep(60 * time.Second)
- objUploadChan <- objToUpload
- }()
- }
- }
- }
- // Polls the parts table for not-yet-uploaded parts and uploads them.
- func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
- for partInfo := range partUploadChan {
- log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
- err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
- if err != nil {
- log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
- // Queue the part for retry
- go func() {
- time.Sleep(5 * time.Second)
- partUploadChan <- partInfo
- }()
- continue
- }
- // Mark the part as uploaded
- //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
- part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
- err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
- if err != nil {
- log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
- }
- }
- }
- func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
- // Upload all files in the directory
- options := minio.ListObjectsOptions{
- Recursive: false,
- }
- objectsCh := app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options)
- for objInfo := range objectsCh {
- if objInfo.Err != nil {
- log.Printf("Error listing objects: %v\n", objInfo.Err)
- break
- }
- key := objInfo.Key
- if strings.HasSuffix(key, "/") {
- objToUploadChan <- strings.Split(key, "/")[0]
- }
- }
- }
- func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
- if object_already_uploaded(app, streamName) {
- return true, nil
- }
- fullyUploaded = true
- // Get stream metadata
- streamInfo, err := get_stream_metadata(app, streamName)
- if err != nil {
- return false, err
- }
- if streamInfo.PartsCount == 0 {
- // Edge device didn't finish uploading the stream yet
- fullyUploaded = false
- }
- // Upload parts
- streamObjPath := streamName + "/"
- options := minio.ListObjectsOptions{
- Prefix: streamObjPath,
- Recursive: false,
- }
- for objInfo := range app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options) {
- if objInfo.Err != nil {
- return false, objInfo.Err
- }
- if strings.HasSuffix(objInfo.Key, "/") {
- continue
- }
- if objInfo.Key == streamObjPath+"metadata.json" {
- continue
- }
- if part_already_uploaded(app, streamName, objInfo.Key) {
- continue
- }
- fullyUploaded = false
- partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
- }
- return fullyUploaded, nil
- }
- func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
- if part_already_uploaded(app, streamName, partName) {
- return nil
- }
- dryRun := false
- if !dryRun {
- // Get the part data from MinIO
- obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, partName, minio.GetObjectOptions{})
- if err != nil {
- return fmt.Errorf("failed to get part data: %w", err)
- }
- defer obj.Close()
- // Read the part data
- var partDataBuf = new(bytes.Buffer)
- _, err = partDataBuf.ReadFrom(obj)
- if err != nil {
- return fmt.Errorf("failed to read part data: %w", err)
- }
- // Process the part data
- partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
- if err != nil {
- return fmt.Errorf("failed to decompress part data: %w", err)
- }
- // Use regex to extract the part index from the part name
- partIndex := 0
- {
- re := regexp.MustCompile(`part_(\d+)\.zst`)
- matches := re.FindStringSubmatch(partName)
- if len(matches) != 2 {
- return fmt.Errorf("failed to extract part index from part name: %s", partName)
- }
- partIndex, err = strconv.Atoi(matches[1])
- if err != nil {
- return fmt.Errorf("failed to convert part index to integer: %w", err)
- }
- // Check if the part index is correct
- if partIndex < 0 || partIndex >= streamInfo.PartsCount {
- return fmt.Errorf("part index out of bounds: %d", partIndex)
- }
- // Check if the part data size is correct
- if partIndex < streamInfo.PartsCount-1 {
- if len(partData) != streamInfo.PointsPerPart*8 {
- return fmt.Errorf("part data size mismatch: %d", len(partData))
- }
- } else if partIndex == streamInfo.PartsCount-1 {
- if len(partData) != (streamInfo.TotalPoints%streamInfo.PointsPerPart)*8 {
- return fmt.Errorf("part data size mismatch: %d", len(partData))
- }
- } else {
- return fmt.Errorf("part index out of bounds: %d", partIndex)
- }
- }
- partPointsCount := len(partData) / 8
- // Insert the part data into ClickHouse
- batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+CLICKHOUSE_TABLE)
- if err != nil {
- return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
- }
- /*
- ┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
- │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
- │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
- │ tags │ Map(LowCardinality(String), LowCardinality(String)) │ │ │ Point tags │ │ │
- │ value │ Float64 │ │ │ Point value │ │ │
- │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
- └─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
- */
- for i := 0; i < partPointsCount; i++ {
- metricName := ""
- pointName := streamInfo.Name
- tags := map[string]string{}
- value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
- // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
- nanoseconds := streamInfo.TimestampOffset * 1e6
- nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
- nanoseconds += int64(i) * streamInfo.Interval
- err := batch.Append(metricName, pointName, tags, value, nanoseconds)
- if err != nil {
- return err
- }
- }
- err = batch.Send()
- if err != nil {
- return err
- }
- }
- return nil
- }
- func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
- var cfg AppConfig
- err := db.AutoMigrate(&AppConfigDbEntry{})
- if err != nil {
- return cfg, err
- }
- //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
- var dbEntry AppConfigDbEntry
- result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
- if result.Error != nil {
- //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
- // dbEntry.Value = `[]`
- //} else {
- // return cfg, result.Error
- //}
- return cfg, result.Error
- }
- err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
- if err != nil {
- return cfg, err
- }
- return cfg, nil
- }
- func object_already_uploaded(app AppCtx, key string) bool {
- var record UploadRecord
- result := app.db.First(&record, "key", key)
- return result.Error == nil
- }
- func object_is_blacklisted(key string) bool {
- for _, regexPattern := range gAppCfg.ImportBlacklist {
- // TODO: Cache compiled regex patterns
- if matched, _ := regexp.MatchString(regexPattern, key); matched {
- return true
- }
- }
- return false
- }
- func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
- var record PartUploadRecord
- result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
- return result.Error == nil
- }
- func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
- // Get the stream metadata from MinIO
- metadataObjPath := streamName + "/metadata.json"
- obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, metadataObjPath, minio.GetObjectOptions{})
- if err != nil {
- return nil, fmt.Errorf("failed to get stream metadata: %w", err)
- }
- defer obj.Close()
- var streamInfo StreamMetadata
- err = json.NewDecoder(obj).Decode(&streamInfo)
- if err != nil {
- return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
- }
- return &streamInfo, nil
- }
|