// main.go
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/json"
  7. "example/minio-into-stck/util"
  8. "fmt"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "sync"
  13. "syscall"
  14. "io"
  15. // "log"
  16. "math"
  17. // "os"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "time"
  22. "github.com/ClickHouse/ch-go"
  23. "github.com/ClickHouse/ch-go/proto"
  24. "github.com/ClickHouse/clickhouse-go/v2"
  25. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  26. "github.com/Jeffail/tunny"
  27. "github.com/fsnotify/fsnotify"
  28. "github.com/minio/minio-go/v7"
  29. "github.com/minio/minio-go/v7/pkg/credentials"
  30. "github.com/minio/minio-go/v7/pkg/notification"
  31. "github.com/natefinch/lumberjack"
  32. "github.com/sirupsen/logrus"
  33. "github.com/spf13/viper"
  34. "gorm.io/driver/mysql"
  35. "gorm.io/gorm"
  36. gorm_logger "gorm.io/gorm/logger"
  37. )
// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	// Mysql holds connection settings for the MySQL database used for
	// upload bookkeeping (upload/part records and app config entries).
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	// Minio holds credentials and the location of the source bucket.
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	// Stck holds the ClickHouse ("stck") endpoint and destination table.
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
	// Main holds tunables for the upload worker loop.
	Main struct {
		UploadRetryMaxTimes        int `mapstructure:"uploadRetryMaxTimes"`        // give up on a stream after this many failed attempts
		FailedRetryDelaySeconds    int `mapstructure:"failedRetryDelaySeconds"`    // delay before re-queueing a failed stream
		NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"` // delay between bucket notification and upload attempt
		ParallelUploadThreadsCount int `mapstructure:"parallelUploadThreadsCount"` // size of the part-upload worker pool
	} `mapstructure:"main"`
}
// AppConfig is the runtime-adjustable application configuration,
// loaded from the database (see load_app_cfg_from_db).
type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}
// AppConfigDbEntry is a single key/value configuration row stored in MySQL.
type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}
// TODO: create the row first when inserting, then update it with the elapsed
// insert time once the insert completes.
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"` // stream directory key in MinIO
	CreatedAt time.Time
}
// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}
// StreamMetadata mirrors the metadata.json file stored next to a stream's
// part files in MinIO.
type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"` // NOTE: in milliseconds (converted to ns at insert time)
	Interval        int64  `json:"interval"`         // spacing between points; used as a nanosecond step below
	PartsCount      int64  `json:"parts_count"`      // 0 while the edge device is still uploading
	PointsPerPart   int64  `json:"points_per_part"`
	TotalPoints     int64  `json:"total_points"`
}
// programmaticQuitChan is signaled to request a graceful shutdown from within the program.
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)

// gAppQuitting is set to true when shutdown starts; workers poll it to stop.
// NOTE(review): read/written from multiple goroutines without synchronization — TODO confirm intent.
var gAppQuitting = false

// gAppExitWaitGroup waits for background workers before the process exits.
var gAppExitWaitGroup sync.WaitGroup

// buildtime is the build timestamp; presumably injected via -ldflags — TODO confirm.
var buildtime string

// appInitCfg is the live file-based configuration; replaced wholesale on config reload.
var appInitCfg *AppInitConfig = &AppInitConfig{}
  97. func initLoadConfig() {
  98. viper.SetDefault("main.uploadRetryMaxTimes", 20)
  99. viper.SetDefault("main.failedRetryDelaySeconds", 5)
  100. viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
  101. viper.SetDefault("main.parallelUploadThreadsCount", 2)
  102. viper.SetConfigFile("./config/application.yaml")
  103. viper.WatchConfig()
  104. viper.OnConfigChange(func(e fsnotify.Event) {
  105. logger.Infoln("Config file changed:", e.Name)
  106. var newAppInitConfig AppInitConfig
  107. err := viper.Unmarshal(&newAppInitConfig)
  108. if err != nil {
  109. logger.Infoln("Failed to unmarshal config:", err)
  110. return
  111. }
  112. appInitCfg = &newAppInitConfig
  113. })
  114. err := viper.ReadInConfig()
  115. if err != nil {
  116. logger.Fatalf("Failed to read config file: %v", err)
  117. }
  118. err = viper.Unmarshal(appInitCfg)
  119. if err != nil {
  120. logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
  121. fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
  122. os.Exit(1)
  123. }
  124. }
  125. func initLog() *logrus.Logger {
  126. // 主日志文件
  127. log := &lumberjack.Logger{
  128. Filename: "./log/minio-into-stck.log", // 日志文件的位置
  129. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  130. MaxBackups: 5, // 保留的最大旧文件数量
  131. MaxAge: 28, // 保留旧文件的最大天数
  132. Compress: true, // 是否压缩/归档旧文件
  133. LocalTime: true, // 使用本地时间创建时间戳
  134. }
  135. // 错误日志文件
  136. errorLog := &lumberjack.Logger{
  137. Filename: "./log/minio-into-stck.error.log", // 错误日志文件的位置
  138. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  139. MaxBackups: 5, // 保留的最大旧文件数量
  140. MaxAge: 28, // 保留旧文件的最大天数
  141. Compress: true, // 是否压缩/归档旧文件
  142. LocalTime: true, // 使用本地时间创建时间戳
  143. }
  144. // 统计日志文件
  145. statLog := &lumberjack.Logger{
  146. Filename: "./log/minio-into-stck.stat.log", // 统计日志文件的位置
  147. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  148. MaxBackups: 5, // 保留的最大旧文件数量
  149. MaxAge: 28, // 保留旧文件的最大天数
  150. Compress: true, // 是否压缩/归档旧文件
  151. LocalTime: true, // 使用本地时间创建时间戳
  152. }
  153. logger := logrus.New()
  154. if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
  155. logger.SetLevel(logrus.TraceLevel)
  156. } else {
  157. logger.SetLevel(logrus.DebugLevel)
  158. }
  159. logger.Out = log
  160. // logger.Out = io.MultiWriter(os.Stdout, log)
  161. // 设置错误级别日志输出到额外的文件
  162. logger.AddHook(&ErrorHook{
  163. // Writer: errorLog,
  164. Writer: io.MultiWriter(os.Stderr, errorLog),
  165. LogLevels: []logrus.Level{
  166. logrus.ErrorLevel,
  167. logrus.FatalLevel,
  168. logrus.PanicLevel,
  169. },
  170. })
  171. statLogger = logrus.New()
  172. statLogger.SetLevel(logrus.InfoLevel)
  173. statLogger.Out = statLog
  174. return logger
  175. }
// ErrorHook is a logrus hook that duplicates entries of the configured
// levels to an extra writer (used for the error-only log file).
type ErrorHook struct {
	Writer    io.Writer      // destination for the duplicated entries
	LogLevels []logrus.Level // levels this hook fires on
}
  181. func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
  182. line, err := entry.String()
  183. if err != nil {
  184. return err
  185. }
  186. _, err = hook.Writer.Write([]byte(line))
  187. return err
  188. }
// Levels reports which log levels trigger this hook.
func (hook *ErrorHook) Levels() []logrus.Level {
	return hook.LogLevels
}
// logger is the main application logger, set once in main via initLog.
var logger *logrus.Logger

// statLogger writes upload statistics to a dedicated log file.
var statLogger *logrus.Logger

// mutexObjFailCounter guards objFailCounter.
var mutexObjFailCounter = &sync.Mutex{}

// objFailCounter counts upload attempts per stream key; streams exceeding
// UploadRetryMaxTimes are dropped.
var objFailCounter map[string]int = make(map[string]int)
// main wires the pipeline together: logging, configuration, the three
// backing services (MySQL bookkeeping, MinIO source, ClickHouse destination),
// then runs the main worker until an OS signal or programmatic quit arrives.
func main() {
	var err error
	fmt.Println("Starting minio-into-stck, build time:", buildtime)
	logger = initLog()
	logger.Warnln("Logger initialized.")
	// Load configuration from file
	initLoadConfig()
	var db *gorm.DB
	var minioClient *minio.Client
	var ckConn driver.Conn
	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable GORM's own query logging
	db.Logger = gorm_logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		logger.Infof("ping db error: %v", err)
		return
	}
	logger.Infoln("Database connection successful")
	// Perform auto migration for the bookkeeping tables
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		logger.Infof("auto migrate error: %v", err)
		return
	}
	logger.Infoln("Auto migration completed")
	// Connect to MinIO and verify the configured bucket exists
	minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		// Inner err is intentionally scoped to this check; failures are fatal here.
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		logger.Fatalf("Failed to connect to MinIO: %v", err)
	}
	logger.Infoln("Connected to MinIO")
	// Connect to ClickHouse
	ckConn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		logger.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	logger.Infoln("Connected to ClickHouse")
	// OK! Everything is ready now.
	// objUploadChan carries stream directory keys to the worker; presumably a
	// delayed-delivery channel from the project's util package — see DelayedWrite usage.
	objUploadChan := util.NewDChan[string](1024 * 16)
	// Start the main work
	logger.Infoln("Starting main worker...")
	gAppExitWaitGroup.Add(1)
	go func() {
		defer gAppExitWaitGroup.Done()
		main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
	}()
	// Block until either an OS signal or a programmatic quit request arrives.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-signalChan:
		logger.Infof("received signal, stopping minio-into-stck")
	case <-programmaticQuitChan:
		logger.Infof("received programmatic quit signal, stopping minio-into-stck")
	}
	gAppQuitting = true
	// HACK: send an empty key so the worker wakes up and notices gAppQuitting.
	objUploadChan.In() <- ""
	// Wait for the goroutines to exit
	gAppExitWaitGroup.Wait()
	logger.Infof("minio-into-stck stopped gracefully")
}
// AppCtx bundles the three service handles passed to the worker functions.
type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

// gAppCfg is the DB-backed runtime configuration (blacklist etc.), set by main_worker.
var gAppCfg *AppConfig

// PartUploadArgs identifies one part of one stream for the upload pool.
type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string // full object key of the part
}
// main_worker is the long-running upload loop. It listens for MinIO bucket
// notifications (plus a one-shot full bucket scan to catch missed streams),
// queues stream keys on objUploadChan, and uploads each queued stream with
// bounded retries. It returns when gAppQuitting is observed.
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
	ctx := context.Background()
	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		logger.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg
	// Start the notification listener; re-registers forever on failure.
	go func() {
		for {
			// Register the bucket notification listener
			logger.Infoln("Registering bucket notification listener...")
			notifys := app.minioClient.ListenBucketNotification(
				ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
			// Listen OK, start the full upload trigger to upload maybe missed files
			go trigger_full_upload(app, objUploadChan.In())
			for notifyInfo := range notifys {
				for _, record := range notifyInfo.Records {
					key := record.S3.Object.Key
					logger.Debugln("New object notification:", key)
					// Only care when `.zst` / `metadata.json` files are uploaded
					keyLastPart := filepath.Base(key)
					if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
						continue
					}
					// Queue the enclosing stream directory, not the file itself.
					key = filepath.Dir(key) + "/"
					// Remove fail counter so that the object can be retried instead of
					// being spuriously ignored
					mutexObjFailCounter.Lock()
					delete(objFailCounter, key)
					mutexObjFailCounter.Unlock()
					// Queue the object for upload
					objUploadChan.DelayedWrite(key, time.Duration(appInitCfg.Main.NotifyToUploadDelaySeconds)*time.Second)
				}
				if notifyInfo.Err != nil {
					logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
				}
			}
			logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
			time.Sleep(5 * time.Second)
			// Clear fail counter so everything is eligible for retry after a reconnect.
			mutexObjFailCounter.Lock()
			objFailCounter = make(map[string]int)
			mutexObjFailCounter.Unlock()
		}
	}()
	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan.Out() {
		objUploadChan.MarkElemReadDone(objToUpload)
		if gAppQuitting {
			logger.Infof("Quitting, stopping main worker")
			return
		}
		// Empty key is the shutdown wake-up sentinel sent by main; skip it.
		if objToUpload == "" {
			continue
		}
		logger.Infoln("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
			continue
		}
		// Enforce the per-stream retry budget before attempting.
		mutexObjFailCounter.Lock()
		if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
			logger.Warnf("Retried upload stream `%s` for too many %d times, give up", objToUpload, objFailCounter[objToUpload])
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
			continue
		}
		objFailCounter[objToUpload]++
		mutexObjFailCounter.Unlock()
		fullyUploaded, err := upload_one_stream(app, objToUpload)
		if err != nil {
			// Queue the object for retry
			logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
				objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded (idempotent via FirstOrCreate).
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
				}
				logger.Infof("Marked stream %s as fully uploaded", objToUpload)
			}
			// Remove entry from the fail counter
			mutexObjFailCounter.Lock()
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
		} else {
			// Queue the object for retry
			logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
				objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
		}
	}
}
// trigger_full_upload scans the bucket once and queues every stream directory
// that is not yet recorded as uploaded. Streams are laid out as
// `<date>/<stream_name>/`; a directory containing metadata.json is itself a
// stream, otherwise its immediate subdirectories are treated as streams.
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// An earlier recursive implementation was too slow and is intentionally
	// replaced by the shallow two-level listing below.
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	// NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
			continue
		}
		key := objInfo.Key
		// Scan through all subdirectories
		if strings.HasSuffix(key, "/") {
			// A directory holding metadata.json is a stream itself.
			exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
			if err != nil {
				logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
				continue
			}
			if exists {
				// Go ahead and upload the stream
				uploaded := object_already_uploaded(app, key)
				if !uploaded {
					objToUploadChan <- key
				}
			} else {
				// Check inner directories (the `<date>/` level): each
				// subdirectory is assumed to be a stream.
				options := minio.ListObjectsOptions{
					Prefix:    key,
					Recursive: false,
				}
				for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
					if objInfo.Err != nil {
						logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
						continue
					}
					key := objInfo.Key
					if strings.HasSuffix(key, "/") {
						// Is a directory, should be a stream then
						uploaded := object_already_uploaded(app, key)
						if !uploaded {
							objToUploadChan <- key
						}
					}
				}
			}
		}
	}
	logger.Infoln("Full upload trigger finished")
}
  505. func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
  506. type StreamObjectUploadStatistics struct {
  507. StartTime time.Time
  508. EndTime time.Time
  509. PartName string
  510. UpState string // "repeated" / "ok" / "fail"
  511. Msg string // "error message"
  512. }
  513. type StreamUploadStatistics struct {
  514. StartTime time.Time
  515. EndTime time.Time
  516. StreamName string
  517. MetaPointsCount int64
  518. MetaPartsCount int64
  519. Objects map[string]StreamObjectUploadStatistics
  520. Msg string
  521. }
  522. streamStats := StreamUploadStatistics{
  523. StartTime: time.Now(),
  524. StreamName: streamName,
  525. Objects: make(map[string]StreamObjectUploadStatistics),
  526. }
  527. logger.Infof("Going to upload stream `%s`", streamName)
  528. if object_already_uploaded(app, streamName) {
  529. logger.Infof("Stream `%s` is already uploaded", streamName)
  530. return true, nil
  531. }
  532. defer func() {
  533. streamStats.EndTime = time.Now()
  534. repeatedCount := int64(0)
  535. okCount := int64(0)
  536. failCount := int64(0)
  537. totalCount := int64(0)
  538. objDetailsMsg := ""
  539. for _, obj := range streamStats.Objects {
  540. totalCount++
  541. if obj.UpState == "repeated" {
  542. repeatedCount++
  543. objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
  544. obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  545. } else if obj.UpState == "ok" {
  546. okCount++
  547. objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
  548. obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  549. } else {
  550. failCount++
  551. objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
  552. obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  553. }
  554. }
  555. statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
  556. "repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
  557. streamName, streamStats.EndTime.Sub(streamStats.StartTime),
  558. util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
  559. fullyUploaded, streamStats.MetaPartsCount,
  560. repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
  561. }()
  562. fullyUploaded = true
  563. // Get stream metadata
  564. streamInfo, err := get_stream_metadata(app, streamName)
  565. if err != nil {
  566. // Cannot continue without metadata
  567. return false, err
  568. }
  569. streamStats.MetaPointsCount = streamInfo.TotalPoints
  570. streamStats.MetaPartsCount = streamInfo.PartsCount
  571. if streamInfo.PartsCount == 0 {
  572. // Edge device didn't finish uploading the stream yet
  573. logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
  574. fullyUploaded = false
  575. }
  576. hasSomething := false
  577. // Upload parts
  578. streamObjPath := streamName
  579. options := minio.ListObjectsOptions{
  580. Prefix: streamObjPath,
  581. Recursive: false,
  582. }
  583. logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
  584. // hasSomething := false
  585. hasMetadata := false
  586. var wg sync.WaitGroup
  587. var mt sync.Mutex
  588. pool := tunny.NewFunc(appInitCfg.Main.ParallelUploadThreadsCount, func(payload interface{}) interface{} {
  589. packed := payload.(util.Pair[string, PartUploadArgs])
  590. partName := packed.First
  591. partInfo := packed.Second
  592. objStat := StreamObjectUploadStatistics{
  593. StartTime: time.Now(),
  594. PartName: partName,
  595. }
  596. logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
  597. partInfo.PartName, partInfo.StreamInfo.PartsCount,
  598. partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
  599. err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
  600. if err != nil {
  601. objStat.EndTime = time.Now()
  602. objStat.UpState = "fail"
  603. objStat.Msg = err.Error()
  604. logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
  605. objStat.EndTime.Sub(objStat.StartTime), err)
  606. fullyUploaded = false
  607. } else {
  608. // Mark the part as uploaded
  609. //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
  610. part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
  611. err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
  612. if err != nil {
  613. logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
  614. }
  615. objStat.EndTime = time.Now()
  616. objStat.UpState = "ok"
  617. logger.Infof("Uploaded part `%s` of stream `%s`, took %v", partInfo.PartName, streamName,
  618. objStat.EndTime.Sub(objStat.StartTime))
  619. }
  620. func() {
  621. mt.Lock()
  622. defer mt.Unlock()
  623. streamStats.Objects[partInfo.PartName] = objStat
  624. }()
  625. return nil
  626. })
  627. for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  628. if objInfo.Err != nil {
  629. return false, objInfo.Err
  630. }
  631. if gAppQuitting {
  632. logger.Infof("Quitting, stopping uploading one stream")
  633. return false, nil
  634. }
  635. logger.Tracef("Checking minio file `%s`", objInfo.Key)
  636. if strings.HasSuffix(objInfo.Key, "/") {
  637. continue
  638. }
  639. partName := filepath.Base(objInfo.Key)
  640. if partName == "metadata.json" {
  641. hasMetadata = true
  642. continue
  643. }
  644. hasSomething = true
  645. objStat := StreamObjectUploadStatistics{
  646. StartTime: time.Now(),
  647. PartName: partName,
  648. }
  649. if part_already_uploaded(app, streamName, partName) {
  650. objStat.EndTime = time.Now()
  651. objStat.UpState = "repeated"
  652. func() {
  653. mt.Lock()
  654. defer mt.Unlock()
  655. streamStats.Objects[objInfo.Key] = objStat
  656. }()
  657. logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
  658. continue
  659. }
  660. if fullyUploaded {
  661. fullyUploaded = false
  662. logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
  663. }
  664. // Do the parts upload
  665. partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
  666. wg.Add(1)
  667. go func() {
  668. defer wg.Done()
  669. pool.Process(util.Pair[string, PartUploadArgs]{First: partName, Second: partInfo})
  670. }()
  671. }
  672. wg.Wait()
  673. pool.Close()
  674. if !hasMetadata {
  675. logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
  676. fullyUploaded = false
  677. }
  678. if !hasSomething {
  679. if streamInfo.PartsCount != 0 {
  680. logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
  681. streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
  682. } else {
  683. logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
  684. }
  685. fullyUploaded = false
  686. }
  687. return fullyUploaded, nil
  688. }
  689. func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
  690. partName := filepath.Base(partObjPath)
  691. if part_already_uploaded(app, streamName, partName) {
  692. return nil
  693. }
  694. dryRun := false
  695. if !dryRun {
  696. // Get the part data from MinIO
  697. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
  698. minio.GetObjectOptions{})
  699. if err != nil {
  700. return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
  701. }
  702. defer obj.Close()
  703. // Read the part data
  704. var partDataBuf = new(bytes.Buffer)
  705. _, err = partDataBuf.ReadFrom(obj)
  706. if err != nil {
  707. return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
  708. }
  709. // Process the part data
  710. partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
  711. if err != nil {
  712. return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
  713. }
  714. // Use regex to extract the part index from the part name
  715. partIndex := int64(0)
  716. {
  717. re := regexp.MustCompile(`part_(\d+)\.zst`)
  718. matches := re.FindStringSubmatch(partObjPath)
  719. if len(matches) != 2 {
  720. return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
  721. }
  722. partIndex, err = strconv.ParseInt(matches[1], 10, 64)
  723. if err != nil {
  724. return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
  725. }
  726. // Check if the part index is correct
  727. if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
  728. return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
  729. }
  730. // Check if the part data size is correct
  731. if streamInfo.PartsCount != 0 {
  732. left := int64(len(partData))
  733. if partIndex < streamInfo.PartsCount-1 {
  734. right := streamInfo.PointsPerPart * 8
  735. if left != right {
  736. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  737. }
  738. } else if partIndex == streamInfo.PartsCount-1 {
  739. right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
  740. if right == 0 {
  741. right = streamInfo.PointsPerPart * 8
  742. }
  743. if left != right {
  744. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  745. }
  746. } else {
  747. return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
  748. }
  749. }
  750. }
  751. partPointsCount := len(partData) / 8
  752. // Insert the part data into ClickHouse
  753. customInsertFn := func() error {
  754. ctx := context.Background()
  755. sql := fmt.Sprintf("INSERT INTO %s (metric_name, point_name, tags, value, nanoseconds) "+
  756. "SELECT %s, %s, %s, col1, col2 FROM input('col1 Float64, col2 Int64') FORMAT NATIVE",
  757. appInitCfg.Stck.Table,
  758. util.ToSqlLiteral(streamInfo.MetricName), util.ToSqlLiteral(streamInfo.Name), "[]")
  759. c, err := ch.Dial(ctx, ch.Options{Address: appInitCfg.Stck.Host})
  760. if err != nil {
  761. return fmt.Errorf("failed to connect to stck: %w", err)
  762. }
  763. defer c.Close()
  764. /*
  765. ┌─name────────┬─type────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
  766. │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
  767. │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
  768. │ tags │ Map(String, String) │ │ │ Point tags │ │ │
  769. │ value │ Float64 │ │ │ Point value │ │ │
  770. │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
  771. └─────────────┴─────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
  772. */
  773. logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  774. colValue := make(proto.ColFloat64, 0, partPointsCount)
  775. colNanoseconds := make(proto.ColInt64, 0, partPointsCount)
  776. for i := 0; i < partPointsCount; i++ {
  777. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  778. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  779. nanoseconds := streamInfo.TimestampOffset * 1e6
  780. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  781. nanoseconds += int64(i) * streamInfo.Interval
  782. colValue = append(colValue, value)
  783. colNanoseconds = append(colNanoseconds, nanoseconds)
  784. }
  785. input := proto.Input{
  786. {Name: "col1", Data: &colValue},
  787. {Name: "col2", Data: &colNanoseconds},
  788. }
  789. err = c.Do(ctx, ch.Query{
  790. Body: sql,
  791. Input: input,
  792. // Settings: []ch.Setting{
  793. // ch.SettingInt("max_insert_threads", 2),
  794. // },
  795. })
  796. if err != nil {
  797. return fmt.Errorf("failed to insert part into stck: %w", err)
  798. }
  799. logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  800. return nil
  801. }
  802. err = customInsertFn()
  803. if err != nil {
  804. return fmt.Errorf("failed to insert part `%s` data into stck: %w", partObjPath, err)
  805. }
  806. // We made progress, reset the fail counter
  807. mutexObjFailCounter.Lock()
  808. objFailCounter[streamName] = 0
  809. mutexObjFailCounter.Unlock()
  810. }
  811. return nil
  812. }
  813. func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
  814. var cfg AppConfig
  815. err := db.AutoMigrate(&AppConfigDbEntry{})
  816. if err != nil {
  817. return cfg, err
  818. }
  819. //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
  820. var dbEntry AppConfigDbEntry
  821. result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
  822. if result.Error != nil {
  823. //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
  824. // dbEntry.Value = `[]`
  825. //} else {
  826. // return cfg, result.Error
  827. //}
  828. return cfg, result.Error
  829. }
  830. err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
  831. if err != nil {
  832. return cfg, err
  833. }
  834. return cfg, nil
  835. }
  836. func object_already_uploaded(app AppCtx, key string) bool {
  837. var record UploadRecord
  838. result := app.db.First(&record, "key", key)
  839. return result.Error == nil
  840. }
  841. func object_is_blacklisted(key string) bool {
  842. for _, regexPattern := range gAppCfg.ImportBlacklist {
  843. // TODO: Cache compiled regex patterns
  844. if matched, _ := regexp.MatchString(regexPattern, key); matched {
  845. return true
  846. }
  847. }
  848. return false
  849. }
  850. func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
  851. var record PartUploadRecord
  852. result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
  853. return result.Error == nil
  854. }
  855. func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
  856. // Get the stream metadata from MinIO
  857. metadataObjPath := streamName + "metadata.json"
  858. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
  859. metadataObjPath, minio.GetObjectOptions{})
  860. if err != nil {
  861. return nil, fmt.Errorf("failed to get stream metadata: %w", err)
  862. }
  863. defer obj.Close()
  864. var streamInfo StreamMetadata
  865. err = json.NewDecoder(obj).Decode(&streamInfo)
  866. if err != nil {
  867. return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
  868. }
  869. return &streamInfo, nil
  870. }