main.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/json"
  7. "example/minio-into-stck/util"
  8. "fmt"
  9. "os"
  10. "os/signal"
  11. "path/filepath"
  12. "sync"
  13. "syscall"
  14. "io"
  15. // "log"
  16. "math"
  17. // "os"
  18. "regexp"
  19. "strconv"
  20. "strings"
  21. "time"
  22. "github.com/ClickHouse/ch-go"
  23. "github.com/ClickHouse/ch-go/proto"
  24. "github.com/ClickHouse/clickhouse-go/v2"
  25. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  26. "github.com/fsnotify/fsnotify"
  27. "github.com/minio/minio-go/v7"
  28. "github.com/minio/minio-go/v7/pkg/credentials"
  29. "github.com/minio/minio-go/v7/pkg/notification"
  30. "github.com/natefinch/lumberjack"
  31. "github.com/sirupsen/logrus"
  32. "github.com/spf13/viper"
  33. "gorm.io/driver/mysql"
  34. "gorm.io/gorm"
  35. gorm_logger "gorm.io/gorm/logger"
  36. )
// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	// Mysql holds connection settings for the MySQL metadata/bookkeeping database.
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	// Minio holds credentials and the source bucket for stream objects.
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	// Stck holds the ClickHouse ("stck") endpoint and destination table.
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
	// Main holds retry/delay tuning knobs for the upload worker.
	Main struct {
		UploadRetryMaxTimes        int `mapstructure:"uploadRetryMaxTimes"`        // give up on a stream after this many failed attempts
		FailedRetryDelaySeconds    int `mapstructure:"failedRetryDelaySeconds"`    // delay before re-queuing a failed stream
		NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"` // debounce between bucket notification and upload
	} `mapstructure:"main"`
}
// AppConfig is the runtime configuration loaded from the database
// (see load_app_cfg_from_db), as opposed to the file-based AppInitConfig.
type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}
// AppConfigDbEntry is a single key/value configuration row persisted in MySQL.
type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}
// TODO: create the row first when inserting, then update it with the insert duration once done.
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"` // stream object prefix, e.g. `<date>/<stream_name>/`
	CreatedAt time.Time
}
// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"` // base name of the part object, e.g. `part_42.zst`
	CreatedAt  time.Time
}
// StreamMetadata mirrors the `metadata.json` file stored next to a stream's
// parts in MinIO; it describes how the raw part bytes map to time-series points.
type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"` // base timestamp in milliseconds (converted to ns on insert)
	Interval        int64  `json:"interval"`         // spacing between consecutive points
	PartsCount      int64  `json:"parts_count"`      // 0 while the edge device is still uploading
	PointsPerPart   int64  `json:"points_per_part"`
	TotalPoints     int64  `json:"total_points"`
}
// programmaticQuitChan lets internal code request a graceful shutdown (main selects on it).
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)

// gAppQuitting signals workers to stop.
// NOTE(review): read/written from multiple goroutines without synchronization — confirm/consider atomic.Bool.
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup

// buildtime is injected at link time (e.g. via -ldflags "-X main.buildtime=...").
var buildtime string

// appInitCfg is the live file-based configuration; replaced wholesale on config hot-reload.
var appInitCfg *AppInitConfig = &AppInitConfig{}
  95. func initLoadConfig() {
  96. viper.SetDefault("main.uploadRetryMaxTimes", 20)
  97. viper.SetDefault("main.failedRetryDelaySeconds", 5)
  98. viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
  99. viper.SetConfigFile("./config/application.yaml")
  100. viper.WatchConfig()
  101. viper.OnConfigChange(func(e fsnotify.Event) {
  102. logger.Infoln("Config file changed:", e.Name)
  103. var newAppInitConfig AppInitConfig
  104. err := viper.Unmarshal(&newAppInitConfig)
  105. if err != nil {
  106. logger.Infoln("Failed to unmarshal config:", err)
  107. return
  108. }
  109. appInitCfg = &newAppInitConfig
  110. })
  111. err := viper.ReadInConfig()
  112. if err != nil {
  113. logger.Fatalf("Failed to read config file: %v", err)
  114. }
  115. err = viper.Unmarshal(appInitCfg)
  116. if err != nil {
  117. logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
  118. fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
  119. os.Exit(1)
  120. }
  121. }
  122. func initLog() *logrus.Logger {
  123. // 主日志文件
  124. log := &lumberjack.Logger{
  125. Filename: "./log/minio-into-stck.log", // 日志文件的位置
  126. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  127. MaxBackups: 5, // 保留的最大旧文件数量
  128. MaxAge: 28, // 保留旧文件的最大天数
  129. Compress: true, // 是否压缩/归档旧文件
  130. LocalTime: true, // 使用本地时间创建时间戳
  131. }
  132. // 错误日志文件
  133. errorLog := &lumberjack.Logger{
  134. Filename: "./log/minio-into-stck.error.log", // 错误日志文件的位置
  135. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  136. MaxBackups: 5, // 保留的最大旧文件数量
  137. MaxAge: 28, // 保留旧文件的最大天数
  138. Compress: true, // 是否压缩/归档旧文件
  139. LocalTime: true, // 使用本地时间创建时间戳
  140. }
  141. // 统计日志文件
  142. statLog := &lumberjack.Logger{
  143. Filename: "./log/minio-into-stck.stat.log", // 统计日志文件的位置
  144. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  145. MaxBackups: 5, // 保留的最大旧文件数量
  146. MaxAge: 28, // 保留旧文件的最大天数
  147. Compress: true, // 是否压缩/归档旧文件
  148. LocalTime: true, // 使用本地时间创建时间戳
  149. }
  150. logger := logrus.New()
  151. if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
  152. logger.SetLevel(logrus.TraceLevel)
  153. } else {
  154. logger.SetLevel(logrus.DebugLevel)
  155. }
  156. logger.Out = log
  157. // logger.Out = io.MultiWriter(os.Stdout, log)
  158. // 设置错误级别日志输出到额外的文件
  159. logger.AddHook(&ErrorHook{
  160. // Writer: errorLog,
  161. Writer: io.MultiWriter(os.Stderr, errorLog),
  162. LogLevels: []logrus.Level{
  163. logrus.ErrorLevel,
  164. logrus.FatalLevel,
  165. logrus.PanicLevel,
  166. },
  167. })
  168. statLogger = logrus.New()
  169. statLogger.SetLevel(logrus.InfoLevel)
  170. statLogger.Out = statLog
  171. return logger
  172. }
// ErrorHook is a logrus hook that mirrors entries of the configured levels
// (here: error and above) to an additional writer.
type ErrorHook struct {
	Writer    io.Writer      // destination for mirrored entries
	LogLevels []logrus.Level // levels this hook fires on
}
  178. func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
  179. line, err := entry.String()
  180. if err != nil {
  181. return err
  182. }
  183. _, err = hook.Writer.Write([]byte(line))
  184. return err
  185. }
// Levels reports which log levels trigger this hook.
// It implements the logrus.Hook interface.
func (hook *ErrorHook) Levels() []logrus.Level {
	return hook.LogLevels
}
// logger is the main application logger; statLogger records upload statistics only.
var logger *logrus.Logger
var statLogger *logrus.Logger

// objFailCounter counts consecutive upload failures per stream key, guarded by
// mutexObjFailCounter; streams exceeding UploadRetryMaxTimes are dropped.
var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)
// main wires up logging, configuration, MySQL (GORM), MinIO and ClickHouse,
// runs schema auto-migration, launches the background upload worker, and then
// blocks until SIGINT/SIGTERM or a programmatic quit request before shutting
// down gracefully.
func main() {
	var err error
	fmt.Println("Starting minio-into-stck, build time:", buildtime)
	logger = initLog()
	logger.Warnln("Logger initialized.")
	// Load configuration from file
	initLoadConfig()
	var db *gorm.DB
	var minioClient *minio.Client
	var ckConn driver.Conn
	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = gorm_logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		logger.Infof("ping db error: %v", err)
		return
	}
	logger.Infoln("Database connection successful")
	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		logger.Infof("auto migrate error: %v", err)
		return
	}
	logger.Infoln("Auto migration completed")
	// Connect to MinIO
	minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		// NOTE(review): `err` here is shadowed by `:=`; that is harmless only
		// because failures in this block are fatal in place.
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		logger.Fatalf("Failed to connect to MinIO: %v", err)
	}
	logger.Infoln("Connected to MinIO")
	// Connect to ClickHouse and verify the connection with a ping.
	ckConn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		logger.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	logger.Infoln("Connected to ClickHouse")
	// OK! Everything is ready now.
	// objUploadChan := make(chan string, 1024*256)
	objUploadChan := util.NewDChan[string](1024 * 16)
	// Start the main work
	logger.Infoln("Starting main worker...")
	gAppExitWaitGroup.Add(1)
	go func() {
		defer gAppExitWaitGroup.Done()
		main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
	}()
	// Wait on signal.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-signalChan:
		logger.Infof("received signal, stopping minio-into-stck")
	case <-programmaticQuitChan:
		logger.Infof("received programmatic quit signal, stopping minio-into-stck")
	}
	// NOTE(review): gAppQuitting is written here and read by workers without
	// synchronization — TODO confirm or switch to atomic.Bool.
	gAppQuitting = true
	// HACK: Notify the main worker to quit by feeding it an empty sentinel key.
	objUploadChan.In() <- ""
	// Wait for the goroutines to exit
	gAppExitWaitGroup.Wait()
	logger.Infof("minio-into-stck stopped gracefully")
}
// AppCtx bundles the three long-lived service handles passed to the workers.
type AppCtx struct {
	db          *gorm.DB     // MySQL bookkeeping database
	minioClient *minio.Client
	ckConn      driver.Conn // ClickHouse connection
}
// gAppCfg is the DB-backed runtime config, set once by main_worker at startup.
var gAppCfg *AppConfig

// PartUploadArgs carries everything needed to upload one part of a stream.
type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string // stream prefix, e.g. `<date>/<stream_name>/`
	PartName   string // full object key of the part
}
// main_worker is the long-running upload orchestrator. It loads the DB-backed
// config, spawns a goroutine that listens for MinIO object-created
// notifications (re-registering on failure and kicking off a full bucket scan
// each time), and then consumes stream keys from objUploadChan: each key is
// checked against the blacklist and the per-stream fail counter, uploaded via
// upload_one_stream, and either marked done in MySQL or re-queued after a delay.
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
	ctx := context.Background()
	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		logger.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg
	// Start the notification listener
	go func() {
		for {
			// Register the bucket notification listener
			logger.Infoln("Registering bucket notification listener...")
			notifys := app.minioClient.ListenBucketNotification(
				ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
			// Listen OK, start the full upload trigger to upload maybe missed files
			go trigger_full_upload(app, objUploadChan.In())
			for notifyInfo := range notifys {
				for _, record := range notifyInfo.Records {
					key := record.S3.Object.Key
					logger.Debugln("New object notification:", key)
					// Only care when `.zst` / `metadata.json` files are uploaded
					// keyParts := strings.Split(key, "/")
					// keyLastPart := keyParts[len(keyParts)-1]
					keyLastPart := filepath.Base(key)
					if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
						continue
					}
					// Reduce the object key to its stream directory prefix.
					// key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
					key = filepath.Dir(key) + "/"
					// Remove fail counter so that the object can be retried instead of
					// being spuriously ignored
					mutexObjFailCounter.Lock()
					delete(objFailCounter, key)
					mutexObjFailCounter.Unlock()
					// Queue the object for upload (debounced by the configured delay)
					objUploadChan.DelayedWrite(key, time.Duration(appInitCfg.Main.NotifyToUploadDelaySeconds)*time.Second)
				}
				if notifyInfo.Err != nil {
					logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
				}
			}
			logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
			time.Sleep(5 * time.Second)
			// Clear fail counter so everything is retried fresh after a reconnect
			mutexObjFailCounter.Lock()
			objFailCounter = make(map[string]int)
			mutexObjFailCounter.Unlock()
		}
	}()
	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan.Out() {
		objUploadChan.MarkElemReadDone(objToUpload)
		if gAppQuitting {
			logger.Infof("Quitting, stopping main worker")
			return
		}
		// "" is the shutdown sentinel pushed by main; skip it here (the
		// gAppQuitting check above handles the actual exit).
		if objToUpload == "" {
			continue
		}
		logger.Infoln("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
			continue
		}
		// Enforce the per-stream retry budget before attempting an upload.
		mutexObjFailCounter.Lock()
		if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
			logger.Warnf("Retried upload stream `%s` for too many %d times, give up", objToUpload, objFailCounter[objToUpload])
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
			continue
		}
		objFailCounter[objToUpload]++
		mutexObjFailCounter.Unlock()
		fullyUploaded, err := upload_one_stream(app, objToUpload)
		if err != nil {
			// Queue the object for retry
			logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
				objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded (idempotent FirstOrCreate)
			//err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
				}
				logger.Infof("Marked stream %s as fully uploaded", objToUpload)
			}
			// Remove entry from the fail counter
			mutexObjFailCounter.Lock()
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
		} else {
			// Queue the object for retry
			logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
				objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
		}
	}
}
  406. func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
  407. // // Upload all files in the directory
  408. // options := minio.ListObjectsOptions{
  409. // Recursive: false,
  410. // }
  411. // objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
  412. // // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
  413. // for objInfo := range objectsCh {
  414. // if objInfo.Err != nil {
  415. // logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
  416. // continue
  417. // }
  418. // key := objInfo.Key
  419. // if strings.HasSuffix(key, "/") {
  420. // // Is a directory (<date>), go deeper into it
  421. // options := minio.ListObjectsOptions{
  422. // Prefix: key,
  423. // Recursive: false,
  424. // }
  425. // for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  426. // if objInfo.Err != nil {
  427. // logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
  428. // continue
  429. // }
  430. // key := objInfo.Key
  431. // if strings.HasSuffix(key, "/") {
  432. // // Is a directory, should be a stream then
  433. // uploaded := object_already_uploaded(app, key)
  434. // if !uploaded {
  435. // objToUploadChan <- key
  436. // }
  437. // }
  438. // }
  439. // }
  440. // }
  441. // WARN: Slow!!!
  442. // options := minio.ListObjectsOptions{
  443. // Recursive: true,
  444. // }
  445. options := minio.ListObjectsOptions{
  446. Recursive: false,
  447. }
  448. objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
  449. // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
  450. for objInfo := range objectsCh {
  451. if objInfo.Err != nil {
  452. logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
  453. continue
  454. }
  455. key := objInfo.Key
  456. // // If it's a directory with `metadata.json` file, it should be a stream
  457. // if strings.HasSuffix(key, "/metadata.json") {
  458. // streamName := strings.TrimSuffix(key, "metadata.json")
  459. // uploaded := object_already_uploaded(app, streamName)
  460. // if !uploaded {
  461. // objToUploadChan <- streamName
  462. // }
  463. // }
  464. // Scan through all subdirectories
  465. if strings.HasSuffix(key, "/") {
  466. exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
  467. if err != nil {
  468. logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
  469. continue
  470. }
  471. if exists {
  472. // Go ahead and upload the stream
  473. uploaded := object_already_uploaded(app, key)
  474. if !uploaded {
  475. objToUploadChan <- key
  476. }
  477. } else {
  478. // Check inner directories
  479. options := minio.ListObjectsOptions{
  480. Prefix: key,
  481. Recursive: false,
  482. }
  483. for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  484. if objInfo.Err != nil {
  485. logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
  486. continue
  487. }
  488. key := objInfo.Key
  489. if strings.HasSuffix(key, "/") {
  490. // Is a directory, should be a stream then
  491. uploaded := object_already_uploaded(app, key)
  492. if !uploaded {
  493. objToUploadChan <- key
  494. }
  495. }
  496. }
  497. }
  498. }
  499. }
  500. logger.Infoln("Full upload trigger finished")
  501. }
// upload_one_stream uploads every not-yet-uploaded part of the stream rooted
// at streamName (a `.../` object prefix) into ClickHouse. It returns
// fullyUploaded=true only when the metadata file exists, parts_count is
// non-zero, and every listed part is (or already was) uploaded; the caller
// re-queues the stream otherwise. A deferred block writes a per-part summary
// (repeated/ok/fail with timings) to statLogger regardless of outcome.
func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
	// Per-part upload outcome, keyed later by the part's object key.
	type StreamObjectUploadStatistics struct {
		StartTime time.Time
		EndTime   time.Time
		PartName  string
		UpState   string // "repeated" / "ok" / "fail"
		Msg       string // "error message"
	}
	// Whole-stream statistics for the stat log.
	type StreamUploadStatistics struct {
		StartTime       time.Time
		EndTime         time.Time
		StreamName      string
		MetaPointsCount int64 // total_points reported by metadata.json
		MetaPartsCount  int64 // parts_count reported by metadata.json
		Objects         map[string]StreamObjectUploadStatistics
		Msg             string
	}
	streamStats := StreamUploadStatistics{
		StartTime:  time.Now(),
		StreamName: streamName,
		Objects:    make(map[string]StreamObjectUploadStatistics),
	}
	logger.Infof("Going to upload stream `%s`", streamName)
	if object_already_uploaded(app, streamName) {
		logger.Infof("Stream `%s` is already uploaded", streamName)
		return true, nil
	}
	// Emit the statistics line on every exit path. Reads the named return
	// value fullyUploaded, so it reflects the final outcome.
	defer func() {
		streamStats.EndTime = time.Now()
		repeatedCount := int64(0)
		okCount := int64(0)
		failCount := int64(0)
		totalCount := int64(0)
		objDetailsMsg := ""
		for _, obj := range streamStats.Objects {
			totalCount++
			if obj.UpState == "repeated" {
				repeatedCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else if obj.UpState == "ok" {
				okCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else {
				failCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
					obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			}
		}
		statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
			"repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
			streamName, streamStats.EndTime.Sub(streamStats.StartTime),
			util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
			fullyUploaded, streamStats.MetaPartsCount,
			repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
	}()
	fullyUploaded = true
	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		// Cannot continue without metadata
		return false, err
	}
	streamStats.MetaPointsCount = streamInfo.TotalPoints
	streamStats.MetaPartsCount = streamInfo.PartsCount
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
		fullyUploaded = false
	}
	hasSomething := false
	// Upload parts
	streamObjPath := streamName
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
	// hasSomething := false
	hasMetadata := false
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if gAppQuitting {
			logger.Infof("Quitting, stopping uploading one stream")
			return false, nil
		}
		logger.Tracef("Checking minio file `%s`", objInfo.Key)
		// Skip sub-directories; only files are parts.
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		partName := filepath.Base(objInfo.Key)
		if partName == "metadata.json" {
			hasMetadata = true
			continue
		}
		hasSomething = true
		objStat := StreamObjectUploadStatistics{
			StartTime: time.Now(),
			PartName:  partName,
		}
		if part_already_uploaded(app, streamName, partName) {
			objStat.EndTime = time.Now()
			objStat.UpState = "repeated"
			streamStats.Objects[objInfo.Key] = objStat
			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
			continue
		}
		if fullyUploaded {
			fullyUploaded = false
			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
		}
		// Do the parts upload
		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
		logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
			partInfo.PartName, partInfo.StreamInfo.PartsCount,
			partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			objStat.EndTime = time.Now()
			objStat.UpState = "fail"
			objStat.Msg = err.Error()
			logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
				objStat.EndTime.Sub(objStat.StartTime), err)
			fullyUploaded = false
		} else {
			// Mark the part as uploaded (idempotent FirstOrCreate)
			//err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
			part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
			err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
			}
			objStat.EndTime = time.Now()
			objStat.UpState = "ok"
			logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
				objStat.EndTime.Sub(objStat.StartTime))
		}
		streamStats.Objects[objInfo.Key] = objStat
	}
	if !hasMetadata {
		logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
		fullyUploaded = false
	}
	if !hasSomething {
		if streamInfo.PartsCount != 0 {
			logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
				streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
		} else {
			logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
		}
		fullyUploaded = false
	}
	return fullyUploaded, nil
}
  659. func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
  660. partName := filepath.Base(partObjPath)
  661. if part_already_uploaded(app, streamName, partName) {
  662. return nil
  663. }
  664. dryRun := false
  665. if !dryRun {
  666. // Get the part data from MinIO
  667. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
  668. minio.GetObjectOptions{})
  669. if err != nil {
  670. return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
  671. }
  672. defer obj.Close()
  673. // Read the part data
  674. var partDataBuf = new(bytes.Buffer)
  675. _, err = partDataBuf.ReadFrom(obj)
  676. if err != nil {
  677. return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
  678. }
  679. // Process the part data
  680. partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
  681. if err != nil {
  682. return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
  683. }
  684. // Use regex to extract the part index from the part name
  685. partIndex := int64(0)
  686. {
  687. re := regexp.MustCompile(`part_(\d+)\.zst`)
  688. matches := re.FindStringSubmatch(partObjPath)
  689. if len(matches) != 2 {
  690. return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
  691. }
  692. partIndex, err = strconv.ParseInt(matches[1], 10, 64)
  693. if err != nil {
  694. return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
  695. }
  696. // Check if the part index is correct
  697. if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
  698. return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
  699. }
  700. // Check if the part data size is correct
  701. if streamInfo.PartsCount != 0 {
  702. left := int64(len(partData))
  703. if partIndex < streamInfo.PartsCount-1 {
  704. right := streamInfo.PointsPerPart * 8
  705. if left != right {
  706. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  707. }
  708. } else if partIndex == streamInfo.PartsCount-1 {
  709. right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
  710. if right == 0 {
  711. right = streamInfo.PointsPerPart * 8
  712. }
  713. if left != right {
  714. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  715. }
  716. } else {
  717. return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
  718. }
  719. }
  720. }
  721. partPointsCount := len(partData) / 8
  722. // Insert the part data into ClickHouse
  723. customInsertFn := func() error {
  724. ctx := context.Background()
  725. sql := fmt.Sprintf("INSERT INTO %s (metric_name, point_name, tags, value, nanoseconds) "+
  726. "SELECT %s, %s, %s, col1, col2 FROM input('col1 Float64, col2 Int64') FORMAT NATIVE",
  727. appInitCfg.Stck.Table,
  728. util.ToSqlLiteral(streamInfo.MetricName), util.ToSqlLiteral(streamInfo.Name), "[]")
  729. c, err := ch.Dial(ctx, ch.Options{Address: appInitCfg.Stck.Host})
  730. if err != nil {
  731. return fmt.Errorf("failed to connect to stck: %w", err)
  732. }
  733. /*
  734. ┌─name────────┬─type────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
  735. │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
  736. │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
  737. │ tags │ Map(String, String) │ │ │ Point tags │ │ │
  738. │ value │ Float64 │ │ │ Point value │ │ │
  739. │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
  740. └─────────────┴─────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
  741. */
  742. logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  743. colValue := make(proto.ColFloat64, 0, partPointsCount)
  744. colNanoseconds := make(proto.ColInt64, 0, partPointsCount)
  745. for i := 0; i < partPointsCount; i++ {
  746. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  747. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  748. nanoseconds := streamInfo.TimestampOffset * 1e6
  749. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  750. nanoseconds += int64(i) * streamInfo.Interval
  751. colValue = append(colValue, value)
  752. colNanoseconds = append(colNanoseconds, nanoseconds)
  753. }
  754. input := proto.Input{
  755. {Name: "col1", Data: &colValue},
  756. {Name: "col2", Data: &colNanoseconds},
  757. }
  758. err = c.Do(ctx, ch.Query{
  759. Body: sql,
  760. Input: input,
  761. })
  762. if err != nil {
  763. return fmt.Errorf("failed to insert part into stck: %w", err)
  764. }
  765. logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  766. return nil
  767. }
  768. err = customInsertFn()
  769. if err != nil {
  770. return fmt.Errorf("failed to insert part `%s` data into stck: %w", partObjPath, err)
  771. }
  772. // We made progress, reset the fail counter
  773. mutexObjFailCounter.Lock()
  774. objFailCounter[streamName] = 0
  775. mutexObjFailCounter.Unlock()
  776. }
  777. return nil
  778. }
  779. func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
  780. var cfg AppConfig
  781. err := db.AutoMigrate(&AppConfigDbEntry{})
  782. if err != nil {
  783. return cfg, err
  784. }
  785. //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
  786. var dbEntry AppConfigDbEntry
  787. result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
  788. if result.Error != nil {
  789. //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
  790. // dbEntry.Value = `[]`
  791. //} else {
  792. // return cfg, result.Error
  793. //}
  794. return cfg, result.Error
  795. }
  796. err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
  797. if err != nil {
  798. return cfg, err
  799. }
  800. return cfg, nil
  801. }
  802. func object_already_uploaded(app AppCtx, key string) bool {
  803. var record UploadRecord
  804. result := app.db.First(&record, "key", key)
  805. return result.Error == nil
  806. }
  807. func object_is_blacklisted(key string) bool {
  808. for _, regexPattern := range gAppCfg.ImportBlacklist {
  809. // TODO: Cache compiled regex patterns
  810. if matched, _ := regexp.MatchString(regexPattern, key); matched {
  811. return true
  812. }
  813. }
  814. return false
  815. }
  816. func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
  817. var record PartUploadRecord
  818. result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
  819. return result.Error == nil
  820. }
  821. func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
  822. // Get the stream metadata from MinIO
  823. metadataObjPath := streamName + "metadata.json"
  824. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
  825. metadataObjPath, minio.GetObjectOptions{})
  826. if err != nil {
  827. return nil, fmt.Errorf("failed to get stream metadata: %w", err)
  828. }
  829. defer obj.Close()
  830. var streamInfo StreamMetadata
  831. err = json.NewDecoder(obj).Decode(&streamInfo)
  832. if err != nil {
  833. return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
  834. }
  835. return &streamInfo, nil
  836. }