package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"example/minio-into-stck/util"

	stcknsqmsg "stck/stck-nsq-msg"
	smsg "stck/stck-nsq-msg/msg"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"github.com/natefinch/lumberjack"
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	gorm_logger "gorm.io/gorm/logger"
)

// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
	Nsq struct {
		TcpAddr         string `mapstructure:"tcpAddr"`
		LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
	} `mapstructure:"nsq"`
	Main struct {
		UploadRetryMaxTimes     int `mapstructure:"uploadRetryMaxTimes"`
		FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
	} `mapstructure:"main"`
}
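
// An illustrative `config/application.yaml` matching the struct above. All
// values are placeholders (hosts, ports, and credentials are assumptions for
// the example, not taken from this file):
//
//	mysql:
//	  user: "stck"
//	  password: "change-me"
//	  host: "127.0.0.1:3306"
//	  database: "minio_into_stck"
//	minio:
//	  accessKey: "minioadmin"
//	  secret: "minioadmin"
//	  bucket: "streams"
//	  host: "minio.local:9000"
//	stck:
//	  host: "clickhouse.local:9000"
//	  table: "stck.points"
//	nsq:
//	  tcpAddr: "nsqd.local:4150"
//	  lookupdHttpAddr: "nsqlookupd.local:4161"
//	main:
//	  uploadRetryMaxTimes: 20
//	  failedRetryDelaySeconds: 5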

type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}

// TODO: Create the record row before the insert starts, then update it with
// the elapsed time once the insert finishes.

// UploadRecord represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}

// PartUploadRecord represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}

// StreamMetadata mirrors the `metadata.json` object that accompanies each stream.
type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int64  `json:"parts_count"`
	PointsPerPart   int64  `json:"points_per_part"`
	TotalPoints     int64  `json:"total_points"`
}
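
// An illustrative `metadata.json` for a stream of 3 parts with 1000 points per
// part and 2500 points total (values are made up for the example; per the
// conversion in upload_one_part below, `timestamp_offset` is in milliseconds
// and `interval` in nanoseconds):
//
//	{
//	  "metric_name": "temperature",
//	  "name": "sensor-01",
//	  "timestamp_offset": 1700000000000,
//	  "interval": 1000000,
//	  "parts_count": 3,
//	  "points_per_part": 1000,
//	  "total_points": 2500
//	}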

var gLocalIpAddress string
var gMachineID string
var gNsqProducer *nsq.Producer
var gNsqConsumer *nsq.Consumer
var gAppStartTime time.Time = time.Now()
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup

// buildtime is presumably injected at build time (e.g. via -ldflags "-X main.buildtime=...").
var buildtime string

func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
	m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
		gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
	return &m
}

func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
	payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
	if err != nil {
		return fmt.Errorf("marshal message error: %w", err)
	}
	err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
	if err != nil {
		return fmt.Errorf("publish message error: %w", err)
	}
	return nil
}
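
// For example, a heartbeat is broadcast through the shared producer like this
// (the same pattern the ForceDeviceSendHeartbeatMsg handler uses below):
//
//	if err := PublishMessage(MakeHeartbeatMsg()); err != nil {
//		logger.Errorf("send heartbeat error: %s", err)
//	}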

func initNsq() {
	ip, err := stcknsqmsg.GetLocalIP()
	if err != nil {
		logger.Warnf("GetLocalIP error: %s", err)
	} else {
		gLocalIpAddress = ip.String()
		logger.Infof("Local IP: %s", gLocalIpAddress)
	}
	gMachineID = stcknsqmsg.MakeUniqueMachineID()
	logger.Infof("Machine ID: %s", gMachineID)
	fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)

	// Connect to NSQ
	nsqConfig := nsq.NewConfig()
	gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Producer init error: %s", err)
	}
	gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Consumer init error: %s", err)
	}
	gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
		logger.Debugf("NSQ Consumer received message: %s", message.Body)
		// Parse the message
		recvTime := message.Timestamp
		msg, err := stcknsqmsg.FromString(string(message.Body))
		if err != nil {
			logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
			return nil
		}
		// Process the message
		switch data := msg.Data.(type) {
		case *smsg.DevicePingMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Reply with a pong
			pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
			err := PublishMessage(&pongMsg)
			if err != nil {
				logger.Errorf("send pong error: %s", err)
			}
		case *smsg.RequestDeviceExecuteShellScriptMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Execute the shell script, capturing combined stdout/stderr
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
			result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
				cmd := exec.Command("bash", "-c", data.Script)
				var out bytes.Buffer
				cmd.Stdout = &out
				cmd.Stderr = &out
				err := cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
				// Report the error
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Report the output
				resp.Status = 0
				resp.Msg = result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.RequestDeviceUpdateMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			if data.ServiceName != "minio-into-stck" {
				break
			}
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
				// Download the update package to /tmp and run its install script
				downloadPath := "/tmp/minio-into-stck-updater"
				updateScriptPath := filepath.Join(downloadPath, "update.sh")
				var out bytes.Buffer
				err := os.RemoveAll(downloadPath)
				if err != nil {
					return "", err
				}
				err = os.MkdirAll(downloadPath, 0777)
				if err != nil {
					return "", err
				}
				updateScriptContent := fmt.Sprintf(`#!/bin/bash
set -e
cd %s
wget --tries=3 -nv -O installer.tar.gz %s
tar -xzf installer.tar.gz
cd minio-into-stck-installer
./replacing-update.sh
`, downloadPath, data.ServiceBinaryURL)
				err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
				if err != nil {
					return "", err
				}
				// Execute the update script
				cmd := exec.Command("bash", "-c", updateScriptPath)
				cmd.Stdout = &out
				cmd.Stderr = &out
				err = cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
				// Report the error
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Report the output
				resp.Status = 0
				resp.Msg = "executed update process successfully\n\noutput:\n" + result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.ForceDeviceRebootMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			programmaticQuitChan <- struct{}{}
		case *smsg.ForceDeviceSendHeartbeatMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Send a heartbeat immediately
			heartBeatMsg := MakeHeartbeatMsg()
			err := PublishMessage(heartBeatMsg)
			if err != nil {
				logger.Errorf("send heartbeat error: %s", err)
			}
		case *smsg.RequestDeviceUploadLogsToMinioMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Upload logs to MinIO
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
			minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
				Creds:  minioCreds,
				Secure: false,
			})
			if err != nil {
				logger.Errorf("MinIO Client init error: %s", err)
				resp.Status = -1
				resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
			} else {
				if util.FileExists("./logs") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
					if err != nil {
						logger.Errorf("upload logs to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
					}
				}
				if util.FileExists("./log") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
					if err != nil {
						logger.Errorf("upload log to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
					}
				}
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		default:
			logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
		}
		// Returning nil tells NSQ the message was processed successfully
		return nil
	}), 1)
	// err = gNsqConsumer.ConnectToNSQLookupd(appInitCfg.Nsq.LookupdHttpAddr)
	err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
	if err != nil {
		logger.Fatalf("NSQ Consumer connect error: %s", err)
	}
}

var appInitCfg *AppInitConfig = &AppInitConfig{}

func initLoadConfig() {
	viper.SetDefault("main.uploadRetryMaxTimes", 20)
	viper.SetDefault("main.failedRetryDelaySeconds", 5)
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		logger.Infoln("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			logger.Errorln("Failed to unmarshal config:", err)
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		logger.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
		fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
		os.Exit(1)
	}
}

func initLog() *logrus.Logger {
	// Main log file
	log := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.log", // log file location
		MaxSize:    50,                          // maximum file size in MB
		MaxBackups: 5,                           // maximum number of rotated files to keep
		MaxAge:     28,                          // maximum days to keep rotated files
		Compress:   true,                        // compress/archive rotated files
		LocalTime:  true,                        // use local time in rotation timestamps
	}
	// Error log file (same rotation policy)
	errorLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.error.log",
		MaxSize:    50,
		MaxBackups: 5,
		MaxAge:     28,
		Compress:   true,
		LocalTime:  true,
	}
	// Statistics log file (same rotation policy)
	statLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.stat.log",
		MaxSize:    50,
		MaxBackups: 5,
		MaxAge:     28,
		Compress:   true,
		LocalTime:  true,
	}

	logger := logrus.New()
	if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
		logger.SetLevel(logrus.TraceLevel)
	} else {
		logger.SetLevel(logrus.DebugLevel)
	}
	logger.Out = log
	// logger.Out = io.MultiWriter(os.Stdout, log)

	// Route error-level (and above) entries to the dedicated error log file
	logger.AddHook(&ErrorHook{
		Writer: errorLog,
		LogLevels: []logrus.Level{
			logrus.ErrorLevel,
			logrus.FatalLevel,
			logrus.PanicLevel,
		},
	})

	statLogger = logrus.New()
	statLogger.SetLevel(logrus.InfoLevel)
	statLogger.Out = statLog
	return logger
}

// ErrorHook duplicates log entries of the configured levels to an additional writer.
type ErrorHook struct {
	Writer    io.Writer
	LogLevels []logrus.Level
}

func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String()
	if err != nil {
		return err
	}
	_, err = hook.Writer.Write([]byte(line))
	return err
}

func (hook *ErrorHook) Levels() []logrus.Level {
	return hook.LogLevels
}

var logger *logrus.Logger
var statLogger *logrus.Logger

// Per-object upload failure counter, guarded by mutexObjFailCounter.
var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)

func main() {
	var err error
	fmt.Println("Starting minio-into-stck, build time:", buildtime)

	logger = initLog()
	logger.Warnln("Logger initialized.")

	// Load configuration from file
	initLoadConfig()
	// Initialize NSQ
	initNsq()

	var db *gorm.DB
	var minioClient *minio.Client
	var ckConn driver.Conn

	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable GORM logging
	db.Logger = gorm_logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check that the connection is alive
	err = sqlDB.Ping()
	if err != nil {
		logger.Errorf("ping db error: %v", err)
		return
	}
	logger.Infoln("Database connection successful")

	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		logger.Errorf("auto migrate error: %v", err)
		return
	}
	logger.Infoln("Auto migration completed")

	// Connect to MinIO and verify that the configured bucket exists
	minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err != nil {
		logger.Fatalf("Failed to connect to MinIO: %v", err)
	}
	bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
	if err != nil {
		logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
	}
	if !bucketExists {
		logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
	}
	logger.Infoln("Connected to MinIO")

	// Connect to ClickHouse
	ckConn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		logger.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	logger.Infoln("Connected to ClickHouse")

	// OK! Everything is ready now.
	// objUploadChan := make(chan string, 1024*256)
	objUploadChan := util.NewDChan[string](1024 * 16)
	// Start the main work
	logger.Infoln("Starting main worker...")
	gAppExitWaitGroup.Add(1)
	go func() {
		defer gAppExitWaitGroup.Done()
		main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
	}()

	// Wait for a termination signal.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-signalChan:
		logger.Infof("received signal, stopping minio-into-stck")
	case <-programmaticQuitChan:
		logger.Infof("received programmatic quit signal, stopping minio-into-stck")
	}
	gAppQuitting = true
	// HACK: Wake the main worker so it can observe gAppQuitting and exit
	objUploadChan.In() <- ""
	// Close the NSQ producer and consumer
	gNsqProducer.Stop()
	gNsqConsumer.Stop()
	// Wait for the goroutines to exit
	gAppExitWaitGroup.Wait()
	logger.Infof("minio-into-stck stopped gracefully")
}

type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

var gAppCfg *AppConfig

type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}

func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
	ctx := context.Background()
	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		logger.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg
	// Start the notification listener
	go func() {
		for {
			// Register the bucket notification listener
			logger.Infoln("Registering bucket notification listener...")
			notifys := app.minioClient.ListenBucketNotification(
				ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
			// Listening now; start the full-upload trigger to pick up files we may have missed
			go trigger_full_upload(app, objUploadChan.In())
			for notifyInfo := range notifys {
				for _, record := range notifyInfo.Records {
					key := record.S3.Object.Key
					logger.Traceln("New object notification:", key)
					// Only react when `.zst` / `metadata.json` files are uploaded
					keyLastPart := filepath.Base(key)
					if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
						continue
					}
					// Reduce the key to its stream directory, i.e. `<date>/<stream_name>/`
					key = filepath.Dir(key) + "/"
					// Remove the fail counter so that the object is retried instead of
					// being spuriously ignored
					mutexObjFailCounter.Lock()
					delete(objFailCounter, key)
					mutexObjFailCounter.Unlock()
					// Queue the object for upload
					objUploadChan.Write(key)
				}
				if notifyInfo.Err != nil {
					logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
				}
			}
			logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
			time.Sleep(5 * time.Second)
			// Clear the fail counter
			mutexObjFailCounter.Lock()
			objFailCounter = make(map[string]int)
			mutexObjFailCounter.Unlock()
		}
	}()
	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan.Out() {
		objUploadChan.MarkElemReadDone(objToUpload)
		if gAppQuitting {
			logger.Infof("Quitting, stopping main worker")
			return
		}
		if objToUpload == "" {
			continue
		}
		logger.Infoln("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
			continue
		}
		mutexObjFailCounter.Lock()
		if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
			logger.Warnf("Stream `%s` upload retried too many times (%d), giving up",
				objToUpload, objFailCounter[objToUpload])
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
			continue
		}
		objFailCounter[objToUpload]++
		mutexObjFailCounter.Unlock()
		fullyUploaded, err := upload_one_stream(app, objToUpload)
		if err != nil {
			// Queue the object for retry
			logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
				objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
			} else {
				// We can now remove the stream's parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
				}
				logger.Infof("Marked stream %s as fully uploaded", objToUpload)
			}
			// Remove the entry from the fail counter
			mutexObjFailCounter.Lock()
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
		} else {
			// Queue the object for retry
			logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
				objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
		}
	}
}

func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// NOTE: Streams in MinIO are organized as `<date>/<stream_name>/`.
	// Listing the bucket recursively is far too slow, so walk one directory
	// level at a time and probe for `metadata.json` to identify streams.
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
			continue
		}
		key := objInfo.Key
		// Scan through all subdirectories
		if strings.HasSuffix(key, "/") {
			exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
			if err != nil {
				logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
				continue
			}
			if exists {
				// The directory itself is a stream; queue it for upload
				uploaded := object_already_uploaded(app, key)
				if !uploaded {
					objToUploadChan <- key
				}
			} else {
				// Check inner directories
				options := minio.ListObjectsOptions{
					Prefix:    key,
					Recursive: false,
				}
				for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
					if objInfo.Err != nil {
						logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
						continue
					}
					key := objInfo.Key
					if strings.HasSuffix(key, "/") {
						// A directory one level down should be a stream
						uploaded := object_already_uploaded(app, key)
						if !uploaded {
							objToUploadChan <- key
						}
					}
				}
			}
		}
	}
}

func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
	type StreamObjectUploadStatistics struct {
		StartTime time.Time
		EndTime   time.Time
		PartName  string
		UpState   string // "repeated" / "ok" / "fail"
		Msg       string // error message, if any
	}
	type StreamUploadStatistics struct {
		StartTime       time.Time
		EndTime         time.Time
		StreamName      string
		MetaPointsCount int64
		MetaPartsCount  int64
		Objects         map[string]StreamObjectUploadStatistics
		Msg             string
	}
	streamStats := StreamUploadStatistics{
		StartTime:  time.Now(),
		StreamName: streamName,
		Objects:    make(map[string]StreamObjectUploadStatistics),
	}
	logger.Infof("Going to upload stream `%s`", streamName)
	if object_already_uploaded(app, streamName) {
		logger.Infof("Stream `%s` is already uploaded", streamName)
		return true, nil
	}

	// Log per-stream statistics and publish a status message on the way out
	defer func() {
		streamStats.EndTime = time.Now()
		repeatedCount := int64(0)
		okCount := int64(0)
		failCount := int64(0)
		totalCount := int64(0)
		objDetailsMsg := ""
		for _, obj := range streamStats.Objects {
			totalCount++
			if obj.UpState == "repeated" {
				repeatedCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else if obj.UpState == "ok" {
				okCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else {
				failCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
					obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			}
		}
		statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
			"repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
			streamName, streamStats.EndTime.Sub(streamStats.StartTime),
			util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
			fullyUploaded, streamStats.MetaPartsCount,
			repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
		// Send the upload-status-changed message
		msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
			streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
			okCount, failCount, repeatedCount)
		err := PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send stream insert to stck status changed message error: %s", err)
		}
	}()

	fullyUploaded = true
	// Get the stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		// Cannot continue without metadata
		return false, err
	}
	streamStats.MetaPointsCount = streamInfo.TotalPoints
	streamStats.MetaPartsCount = streamInfo.PartsCount
	if streamInfo.PartsCount == 0 {
		// The edge device hasn't finished uploading the stream yet
		logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
		fullyUploaded = false
	}

	hasSomething := false
	hasMetadata := false
	// Upload parts
	streamObjPath := streamName
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if gAppQuitting {
			logger.Infof("Quitting, stopping uploading one stream")
			return false, nil
		}
		logger.Tracef("Checking minio file `%s`", objInfo.Key)
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		partName := filepath.Base(objInfo.Key)
		if partName == "metadata.json" {
			hasMetadata = true
			continue
		}
		hasSomething = true
		objStat := StreamObjectUploadStatistics{
			StartTime: time.Now(),
			PartName:  partName,
		}
		if part_already_uploaded(app, streamName, partName) {
			objStat.EndTime = time.Now()
			objStat.UpState = "repeated"
			streamStats.Objects[objInfo.Key] = objStat
			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
			continue
		}
		if fullyUploaded {
			fullyUploaded = false
			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
		}
		// Upload this part
		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
		logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
			partInfo.PartName, partInfo.StreamInfo.PartsCount,
			partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			objStat.EndTime = time.Now()
			objStat.UpState = "fail"
			objStat.Msg = err.Error()
			logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
				objStat.EndTime.Sub(objStat.StartTime), err)
			fullyUploaded = false
		} else {
			// Mark the part as uploaded
			part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
			err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
			}
			objStat.EndTime = time.Now()
			objStat.UpState = "ok"
			logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
				objStat.EndTime.Sub(objStat.StartTime))
		}
		streamStats.Objects[objInfo.Key] = objStat

		partNum, err := util.ExtractNumberFromString(partName)
		if err != nil {
			// Not a part file? Skip the per-part status message
			continue
		}
		status := "success"
		if objStat.UpState != "ok" {
			status = "failed"
		}
		msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
			streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
		err = PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send part insert to stck status changed message error: %s", err)
		}
	}
	if !hasMetadata {
		logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
		fullyUploaded = false
	}
	if !hasSomething {
		if streamInfo.PartsCount != 0 {
			logger.Errorf("Stream `%s` has no parts in minio, but its metadata claims %d parts and %d points",
				streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
		} else {
			logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
		}
		fullyUploaded = false
	}
	return fullyUploaded, nil
}

func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
	partName := filepath.Base(partObjPath)
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
			minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
		}
		defer obj.Close()
		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
		}
		// Decompress the part data (parts are zstd-compressed arrays of float64 samples)
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
		}
		// Extract the part index from the part name with a regex
		partIndex := int64(0)
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partObjPath)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
			}
			partIndex, err = strconv.ParseInt(matches[1], 10, 64)
			if err != nil {
				return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
			}
			// Check that the part index is in range
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
			}
			// Check that the part data size is correct (8 bytes per float64 point;
			// only the last part may hold fewer than points_per_part points)
			if streamInfo.PartsCount != 0 {
				left := int64(len(partData))
				if partIndex < streamInfo.PartsCount-1 {
					right := streamInfo.PointsPerPart * 8
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
					if right == 0 {
						right = streamInfo.PointsPerPart * 8
					}
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else {
					return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8

		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into stck: %w", err)
		}
		/*
			Target table layout (DESCRIBE TABLE output):
			┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
			│ metric_name │ LowCardinality(String)                              │              │                    │ Metric name               │                  │                │
			│ point_name  │ LowCardinality(String)                              │              │                    │ Point name                │                  │                │
			│ tags        │ Map(LowCardinality(String), LowCardinality(String)) │              │                    │ Point tags                │                  │                │
			│ value       │ Float64                                             │              │                    │ Point value               │                  │                │
			│ nanoseconds │ Int64                                               │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
			└─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
		*/
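
		// A plausible DDL for such a table, as a sketch for reference only: the
		// real table name comes from appInitCfg.Stck.Table, and the engine and
		// ORDER BY clause are assumptions (they are not shown in this file):
		//
		//	CREATE TABLE stck.points (
		//	    metric_name LowCardinality(String) COMMENT 'Metric name',
		//	    point_name  LowCardinality(String) COMMENT 'Point name',
		//	    tags        Map(LowCardinality(String), LowCardinality(String)) COMMENT 'Point tags',
		//	    value       Float64 COMMENT 'Point value',
		//	    nanoseconds Int64 COMMENT 'Point time in nanoseconds' CODEC(DoubleDelta, LZ4)
		//	) ENGINE = MergeTree
		//	ORDER BY (metric_name, point_name, nanoseconds);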
- logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
- for i := 0; i < partPointsCount; i++ {
- metricName := streamInfo.MetricName
- pointName := streamInfo.Name
- tags := map[string]string{}
- value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
- // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
- nanoseconds := streamInfo.TimestampOffset * 1e6
- nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
- nanoseconds += int64(i) * streamInfo.Interval
- err := batch.Append(metricName, pointName, tags, value, nanoseconds)
- if err != nil {
- return err
- }
- }
- if batch.Rows() != partPointsCount {
- logger.Errorf("Batch rows mismatch: %d / %d???", batch.Rows(), partPointsCount)
- }
- err = batch.Send()
- if err != nil {
- return err
- }
- if !batch.IsSent() {
- logger.Errorln("Batch not sent???")
- }
- logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
- // We made progress, reset the fail counter
- mutexObjFailCounter.Lock()
- objFailCounter[streamName] = 0
- mutexObjFailCounter.Unlock()
- }
- return nil
- }

func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	// Fetch the blacklist entry, creating it with an empty JSON array if missing
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}

func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	// GORM treats a plain column name with one argument as an equality
	// condition and quotes it, which matters since `key` is reserved in MySQL.
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}

func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}
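
// For instance, storing `["^2024-01-01/", "test_stream/$"]` in the
// `ImportBlacklist` config row would skip every stream under the 2024-01-01
// prefix and any stream directory ending in `test_stream/` (the patterns here
// are illustrative, not taken from a real deployment).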

func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}

func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO (streamName ends with "/", so this
	// resolves to `<date>/<stream_name>/metadata.json`)
	metadataObjPath := streamName + "metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()
	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}