main.go 39 KB


  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/json"
  7. "example/minio-into-stck/util"
  8. "fmt"
  9. "os"
  10. "os/exec"
  11. "os/signal"
  12. "path/filepath"
  13. "stck/stck-nsq-msg"
  14. smsg "stck/stck-nsq-msg/msg"
  15. "sync"
  16. "syscall"
  17. "io"
  18. // "log"
  19. "math"
  20. // "os"
  21. "regexp"
  22. "strconv"
  23. "strings"
  24. "time"
  25. "github.com/ClickHouse/clickhouse-go/v2"
  26. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  27. "github.com/fsnotify/fsnotify"
  28. "github.com/minio/minio-go/v7"
  29. "github.com/minio/minio-go/v7/pkg/credentials"
  30. "github.com/minio/minio-go/v7/pkg/notification"
  31. "github.com/natefinch/lumberjack"
  32. "github.com/nsqio/go-nsq"
  33. "github.com/sirupsen/logrus"
  34. "github.com/spf13/viper"
  35. "gorm.io/driver/mysql"
  36. "gorm.io/gorm"
  37. gorm_logger "gorm.io/gorm/logger"
  38. )
  // AppInitConfig is the bootstrap configuration of the application. It is
  // unmarshalled from the config file (see the `mapstructure` tags) and is
  // required before any external service can be contacted.
  type AppInitConfig struct {
  	// Mysql holds the credentials and address used to build the MySQL DSN.
  	Mysql struct {
  		User     string `mapstructure:"user"`
  		Password string `mapstructure:"password"`
  		Host     string `mapstructure:"host"`
  		Database string `mapstructure:"database"`
  	} `mapstructure:"mysql"`

  	// Minio holds the MinIO endpoint, credentials and the bucket that is
  	// listed and watched for new objects.
  	Minio struct {
  		AccessKey string `mapstructure:"accessKey"`
  		Secret    string `mapstructure:"secret"`
  		Bucket    string `mapstructure:"bucket"`
  		Host      string `mapstructure:"host"`
  	} `mapstructure:"minio"`

  	// Stck describes the target store; Host is used as the ClickHouse
  	// address. NOTE(review): Table is presumably the destination table name;
  	// confirm against the insert code.
  	Stck struct {
  		Host  string `mapstructure:"host"`
  		Table string `mapstructure:"table"`
  	} `mapstructure:"stck"`

  	// Nsq holds the NSQ endpoints. TcpAddr is used for both the producer and
  	// the consumer connection.
  	Nsq struct {
  		TcpAddr         string `mapstructure:"tcpAddr"`
  		LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
  	} `mapstructure:"nsq"`

  	// Main holds the tuning knobs of the upload worker.
  	Main struct {
  		// UploadRetryMaxTimes is the attempt count after which a stream is given up.
  		UploadRetryMaxTimes int `mapstructure:"uploadRetryMaxTimes"`
  		// FailedRetryDelaySeconds is the delay before a failed or incomplete stream is re-queued.
  		FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
  		// NotifyToUploadDelaySeconds is the delay between a bucket notification and queueing the stream.
  		NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"`
  	} `mapstructure:"main"`
  }
  // AppConfig is the runtime configuration of the application; main_worker
  // loads it from the database at start-up.
  type AppConfig struct {
  	// ImportBlacklist lists name regex patterns that are excluded from import.
  	ImportBlacklist []string
  }
  // AppConfigDbEntry is one key/value row of configuration stored through gorm;
  // Key is the primary key.
  type AppConfigDbEntry struct {
  	Key   string `gorm:"primaryKey"`
  	Value string
  }
  // TODO: create the row first when inserting, then update it with the insert
  // duration once the insert has finished.

  // UploadRecord represents a record of an uploaded stream (a major upload
  // task). Key is the stream name and the primary key.
  type UploadRecord struct {
  	Key       string `gorm:"primaryKey"`
  	CreatedAt time.Time
  }
  // PartUploadRecord represents a record of an uploaded part of a stream (a
  // minor upload task). StreamName and PartName together form the primary key.
  type PartUploadRecord struct {
  	StreamName string `gorm:"primaryKey"`
  	PartName   string `gorm:"primaryKey"`
  	CreatedAt  time.Time
  }
  // StreamMetadata is the JSON-decoded description of one stream.
  // NOTE(review): presumably the content of the stream's `metadata.json`
  // object; confirm against get_stream_metadata.
  type StreamMetadata struct {
  	MetricName      string `json:"metric_name"`
  	Name            string `json:"name"`
  	TimestampOffset int64  `json:"timestamp_offset"`
  	Interval        int64  `json:"interval"`
  	// PartsCount is the number of parts reported by the metadata; a value of
  	// 0 is treated by the uploader as "stream not finished yet".
  	PartsCount    int64 `json:"parts_count"`
  	PointsPerPart int64 `json:"points_per_part"`
  	TotalPoints   int64 `json:"total_points"`
  }
  96. var gLocalIpAddress string
  97. var gMachineID string
  98. var gNsqProducer *nsq.Producer
  99. var gNsqConsumer *nsq.Consumer
  100. var gAppStartTime time.Time = time.Now()
  101. var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
  102. var gAppQuitting = false
  103. var gAppExitWaitGroup sync.WaitGroup
  104. var buildtime string
  105. func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
  106. m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
  107. gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
  108. return &m
  109. }
  110. func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
  111. payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
  112. if err != nil {
  113. return fmt.Errorf("marshal message error: %w", err)
  114. }
  115. err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
  116. if err != nil {
  117. return fmt.Errorf("publish message error: %w", err)
  118. }
  119. return nil
  120. }
  121. func initNsq() {
  122. ip, err := stcknsqmsg.GetLocalIP()
  123. if err != nil {
  124. logger.Warnf("GetLocalIP error: %s", err)
  125. } else {
  126. gLocalIpAddress = ip.String()
  127. logger.Infof("Local IP: %s", gLocalIpAddress)
  128. }
  129. gMachineID = stcknsqmsg.MakeUniqueMachineID()
  130. logger.Infof("Machine ID: %s", gMachineID)
  131. fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)
  132. // Connect to NSQ
  133. nsqConfig := nsq.NewConfig()
  134. gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
  135. if err != nil {
  136. logger.Fatalf("NSQ Producer init error: %s", err)
  137. }
  138. gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
  139. if err != nil {
  140. logger.Fatalf("NSQ Consumer init error: %s", err)
  141. }
  142. gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
  143. logger.Debugf("NSQ Consumer received message: %s", message.Body)
  144. // Parse the message
  145. recvTime := message.Timestamp
  146. msg, err := stcknsqmsg.FromString(string(message.Body))
  147. if err != nil {
  148. logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
  149. return nil
  150. }
  151. // Process the message
  152. switch data := msg.Data.(type) {
  153. case *smsg.DevicePingMsg:
  154. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  155. break
  156. }
  157. // Write pong
  158. pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
  159. err := PublishMessage(&pongMsg)
  160. if err != nil {
  161. logger.Errorf("send pong error: %s", err)
  162. }
  163. case *smsg.RequestDeviceExecuteShellScriptMsg:
  164. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  165. break
  166. }
  167. // Execute the shell script
  168. resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
  169. result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
  170. cmd := exec.Command("bash", "-c", data.Script)
  171. var out bytes.Buffer
  172. cmd.Stdout = &out
  173. cmd.Stderr = &out
  174. err := cmd.Run()
  175. return out.String(), err
  176. }(data)
  177. if err != nil {
  178. errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
  179. // Write error message
  180. resp.Status = -1
  181. resp.Msg = errMsg
  182. } else {
  183. // Write output
  184. resp.Status = 0
  185. resp.Msg = result
  186. }
  187. err = PublishMessage(&resp)
  188. if err != nil {
  189. logger.Errorf("send action done error: %s", err)
  190. }
  191. case *smsg.RequestDeviceUpdateMsg:
  192. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  193. break
  194. }
  195. if data.ServiceName != "minio-into-stck" {
  196. break
  197. }
  198. resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
  199. result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
  200. // Download the update file to /tmp
  201. downloadPath := "/tmp/minio-into-stck-updater"
  202. updateScriptPath := filepath.Join(downloadPath, "update.sh")
  203. var out bytes.Buffer
  204. err := os.RemoveAll(downloadPath)
  205. if err != nil {
  206. return "", err
  207. }
  208. err = os.MkdirAll(downloadPath, 0777)
  209. if err != nil {
  210. return "", err
  211. }
  212. updateScriptContent := fmt.Sprintf(`#!/bin/bash
  213. set -e
  214. cd %s
  215. wget --tries=3 -nv -O installer.tar.gz %s
  216. tar -xzf installer.tar.gz
  217. cd minio-into-stck-installer
  218. ./replacing-update.sh
  219. `, downloadPath, data.ServiceBinaryURL)
  220. err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
  221. if err != nil {
  222. return "", err
  223. }
  224. // Execute the update script
  225. cmd := exec.Command("bash", "-c", updateScriptPath)
  226. cmd.Stdout = &out
  227. cmd.Stderr = &out
  228. err = cmd.Run()
  229. return out.String(), err
  230. }(data)
  231. if err != nil {
  232. errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
  233. // Write error message
  234. resp.Status = -1
  235. resp.Msg = errMsg
  236. } else {
  237. // Write output
  238. resp.Status = 0
  239. resp.Msg = "executed update process successfully\n\noutput:\n" + result
  240. }
  241. err = PublishMessage(&resp)
  242. if err != nil {
  243. logger.Errorf("send action done error: %s", err)
  244. }
  245. case *smsg.ForceDeviceRebootMsg:
  246. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  247. break
  248. }
  249. programmaticQuitChan <- struct{}{}
  250. case *smsg.ForceDeviceSendHeartbeatMsg:
  251. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  252. break
  253. }
  254. // Send heartbeat
  255. heartBeatMsg := MakeHeartbeatMsg()
  256. err := PublishMessage(heartBeatMsg)
  257. if err != nil {
  258. logger.Errorf("send heartbeat error: %s", err)
  259. }
  260. case *smsg.RequestDeviceUploadLogsToMinioMsg:
  261. if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
  262. break
  263. }
  264. // Upload logs to MinIO
  265. resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
  266. minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
  267. minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
  268. Creds: minioCreds,
  269. Secure: false,
  270. })
  271. if err != nil {
  272. logger.Errorf("MinIO Client init error: %s", err)
  273. resp.Status = -1
  274. resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
  275. } else {
  276. if util.FileExists("./logs") {
  277. err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
  278. filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
  279. if err != nil {
  280. logger.Errorf("upload logs to MinIO error: %s", err)
  281. resp.Status = -1
  282. resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
  283. }
  284. }
  285. if util.FileExists("./log") {
  286. err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
  287. filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
  288. if err != nil {
  289. logger.Errorf("upload log to MinIO error: %s", err)
  290. resp.Status = -1
  291. resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
  292. }
  293. }
  294. }
  295. err = PublishMessage(&resp)
  296. if err != nil {
  297. logger.Errorf("send action done error: %s", err)
  298. }
  299. default:
  300. logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
  301. }
  302. // Notify NSQ that the message is processed successfully
  303. return nil
  304. }), 1)
  305. // err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
  306. err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
  307. if err != nil {
  308. logger.Fatalf("NSQ Consumer connect error: %s", err)
  309. }
  310. }
  311. var appInitCfg *AppInitConfig = &AppInitConfig{}
  312. func initLoadConfig() {
  313. viper.SetDefault("main.uploadRetryMaxTimes", 20)
  314. viper.SetDefault("main.failedRetryDelaySeconds", 5)
  315. viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
  316. viper.SetConfigFile("./config/application.yaml")
  317. viper.WatchConfig()
  318. viper.OnConfigChange(func(e fsnotify.Event) {
  319. logger.Infoln("Config file changed:", e.Name)
  320. var newAppInitConfig AppInitConfig
  321. err := viper.Unmarshal(&newAppInitConfig)
  322. if err != nil {
  323. logger.Infoln("Failed to unmarshal config:", err)
  324. return
  325. }
  326. appInitCfg = &newAppInitConfig
  327. })
  328. err := viper.ReadInConfig()
  329. if err != nil {
  330. logger.Fatalf("Failed to read config file: %v", err)
  331. }
  332. err = viper.Unmarshal(appInitCfg)
  333. if err != nil {
  334. logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
  335. fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
  336. os.Exit(1)
  337. }
  338. }
  339. func initLog() *logrus.Logger {
  340. // 主日志文件
  341. log := &lumberjack.Logger{
  342. Filename: "./log/minio-into-stck.log", // 日志文件的位置
  343. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  344. MaxBackups: 5, // 保留的最大旧文件数量
  345. MaxAge: 28, // 保留旧文件的最大天数
  346. Compress: true, // 是否压缩/归档旧文件
  347. LocalTime: true, // 使用本地时间创建时间戳
  348. }
  349. // 错误日志文件
  350. errorLog := &lumberjack.Logger{
  351. Filename: "./log/minio-into-stck.error.log", // 错误日志文件的位置
  352. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  353. MaxBackups: 5, // 保留的最大旧文件数量
  354. MaxAge: 28, // 保留旧文件的最大天数
  355. Compress: true, // 是否压缩/归档旧文件
  356. LocalTime: true, // 使用本地时间创建时间戳
  357. }
  358. // 统计日志文件
  359. statLog := &lumberjack.Logger{
  360. Filename: "./log/minio-into-stck.stat.log", // 统计日志文件的位置
  361. MaxSize: 50, // 文件最大尺寸(以MB为单位)
  362. MaxBackups: 5, // 保留的最大旧文件数量
  363. MaxAge: 28, // 保留旧文件的最大天数
  364. Compress: true, // 是否压缩/归档旧文件
  365. LocalTime: true, // 使用本地时间创建时间戳
  366. }
  367. logger := logrus.New()
  368. if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
  369. logger.SetLevel(logrus.TraceLevel)
  370. } else {
  371. logger.SetLevel(logrus.DebugLevel)
  372. }
  373. logger.Out = log
  374. // logger.Out = io.MultiWriter(os.Stdout, log)
  375. // 设置错误级别日志输出到额外的文件
  376. logger.AddHook(&ErrorHook{
  377. // Writer: errorLog,
  378. Writer: io.MultiWriter(os.Stderr, errorLog),
  379. LogLevels: []logrus.Level{
  380. logrus.ErrorLevel,
  381. logrus.FatalLevel,
  382. logrus.PanicLevel,
  383. },
  384. })
  385. statLogger = logrus.New()
  386. statLogger.SetLevel(logrus.InfoLevel)
  387. statLogger.Out = statLog
  388. return logger
  389. }
  390. // ErrorHook 用于将错误级别的日志输出到额外的文件
  391. type ErrorHook struct {
  392. Writer io.Writer
  393. LogLevels []logrus.Level
  394. }
  395. func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
  396. line, err := entry.String()
  397. if err != nil {
  398. return err
  399. }
  400. _, err = hook.Writer.Write([]byte(line))
  401. return err
  402. }
  403. func (hook *ErrorHook) Levels() []logrus.Level {
  404. return hook.LogLevels
  405. }
  406. var logger *logrus.Logger
  407. var statLogger *logrus.Logger
  408. var mutexObjFailCounter = &sync.Mutex{}
  409. var objFailCounter map[string]int = make(map[string]int)
  410. func main() {
  411. var err error
  412. fmt.Println("Starting minio-into-stck, build time:", buildtime)
  413. logger = initLog()
  414. logger.Warnln("Logger initialized.")
  415. // Load configuration from file
  416. initLoadConfig()
  417. // 初始化 NSQ
  418. initNsq()
  419. var db *gorm.DB
  420. var minioClient *minio.Client
  421. var ckConn driver.Conn
  422. // Connect to MySQL
  423. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
  424. appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
  425. db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
  426. if err != nil {
  427. logger.Fatalf("Failed to connect to MySQL: %v", err)
  428. }
  429. // Disable logging
  430. db.Logger = gorm_logger.Discard
  431. // Get the underlying sql.DB object to close the connection later
  432. sqlDB, err := db.DB()
  433. if err != nil {
  434. logger.Fatalf("Failed to get MySQL DB: %v", err)
  435. }
  436. defer sqlDB.Close()
  437. // Ping the database to check if the connection is successful
  438. err = sqlDB.Ping()
  439. if err != nil {
  440. logger.Infof("ping db error: %v", err)
  441. return
  442. }
  443. logger.Infoln("Database connection successful")
  444. // Perform auto migration
  445. err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
  446. if err != nil {
  447. logger.Infof("auto migrate error: %v", err)
  448. return
  449. }
  450. logger.Infoln("Auto migration completed")
  451. // Connect to MinIO
  452. minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
  453. Creds: credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
  454. Secure: false,
  455. })
  456. if err == nil {
  457. bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
  458. if err != nil {
  459. logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
  460. }
  461. if !bucketExists {
  462. logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
  463. }
  464. }
  465. if err != nil {
  466. logger.Fatalf("Failed to connect to MinIO: %v", err)
  467. }
  468. logger.Infoln("Connected to MinIO")
  469. // Connect to ClickHouse
  470. ckConn, err = clickhouse.Open(&clickhouse.Options{
  471. Addr: []string{appInitCfg.Stck.Host},
  472. })
  473. if err == nil {
  474. err = ckConn.Ping(context.Background())
  475. }
  476. if err != nil {
  477. logger.Fatalf("Failed to connect to ClickHouse: %v", err)
  478. }
  479. logger.Infoln("Connected to ClickHouse")
  480. // OK! Everything is ready now.
  481. // objUploadChan := make(chan string, 1024*256)
  482. objUploadChan := util.NewDChan[string](1024 * 16)
  483. // Start the main work
  484. logger.Infoln("Starting main worker...")
  485. gAppExitWaitGroup.Add(1)
  486. go func() {
  487. defer gAppExitWaitGroup.Done()
  488. main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
  489. }()
  490. // Wait on signal.
  491. signalChan := make(chan os.Signal, 1)
  492. signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
  493. select {
  494. case <-signalChan:
  495. logger.Infof("received signal, stopping minio-into-stck")
  496. case <-programmaticQuitChan:
  497. logger.Infof("received programmatic quit signal, stopping minio-into-stck")
  498. }
  499. gAppQuitting = true
  500. // HACK: Notify the main worker to quit
  501. objUploadChan.In() <- ""
  502. // Close the NSQ producer and consumer
  503. gNsqProducer.Stop()
  504. gNsqConsumer.Stop()
  505. // Wait for the goroutines to exit
  506. gAppExitWaitGroup.Wait()
  507. logger.Infof("minio-into-stck stopped gracefully")
  508. }
  509. type AppCtx struct {
  510. db *gorm.DB
  511. minioClient *minio.Client
  512. ckConn driver.Conn
  513. }
  514. var gAppCfg *AppConfig
  515. type PartUploadArgs struct {
  516. StreamInfo *StreamMetadata
  517. StreamName string
  518. PartName string
  519. }
  520. func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
  521. ctx := context.Background()
  522. // Load config from DB
  523. appCfg, err := load_app_cfg_from_db(app.db)
  524. if err != nil {
  525. logger.Fatalf("Failed to load app config from DB: %v", err)
  526. }
  527. gAppCfg = &appCfg
  528. // Start the notification listener
  529. go func() {
  530. for {
  531. // Register the bucket notification listener
  532. logger.Infoln("Registering bucket notification listener...")
  533. notifys := app.minioClient.ListenBucketNotification(
  534. ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
  535. // Listen OK, start the full upload trigger to upload maybe missed files
  536. go trigger_full_upload(app, objUploadChan.In())
  537. for notifyInfo := range notifys {
  538. for _, record := range notifyInfo.Records {
  539. key := record.S3.Object.Key
  540. logger.Traceln("New object notification:", key)
  541. // Only care when `.zst` / `metadata.json` files are uploaded
  542. // keyParts := strings.Split(key, "/")
  543. // keyLastPart := keyParts[len(keyParts)-1]
  544. keyLastPart := filepath.Base(key)
  545. if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
  546. continue
  547. }
  548. // key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
  549. key = filepath.Dir(key) + "/"
  550. // Remove fail counter so that the object can be retried instead of
  551. // being spuriously ignored
  552. mutexObjFailCounter.Lock()
  553. delete(objFailCounter, key)
  554. mutexObjFailCounter.Unlock()
  555. // Queue the object for upload
  556. objUploadChan.DelayedWrite(key, time.Duration(appInitCfg.Main.NotifyToUploadDelaySeconds)*time.Second)
  557. }
  558. if notifyInfo.Err != nil {
  559. logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
  560. }
  561. }
  562. logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
  563. time.Sleep(5 * time.Second)
  564. // Clear fail counter
  565. mutexObjFailCounter.Lock()
  566. objFailCounter = make(map[string]int)
  567. mutexObjFailCounter.Unlock()
  568. }
  569. }()
  570. // Start the main loop (streams upload worker)
  571. for objToUpload := range objUploadChan.Out() {
  572. objUploadChan.MarkElemReadDone(objToUpload)
  573. if gAppQuitting {
  574. logger.Infof("Quitting, stopping main worker")
  575. return
  576. }
  577. if objToUpload == "" {
  578. continue
  579. }
  580. logger.Infoln("Checking stream object:", objToUpload)
  581. if object_is_blacklisted(objToUpload) {
  582. logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
  583. continue
  584. }
  585. mutexObjFailCounter.Lock()
  586. if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
  587. logger.Warnf("Retried upload stream `%s` for too many %d times, give up", objToUpload, objFailCounter[objToUpload])
  588. delete(objFailCounter, objToUpload)
  589. mutexObjFailCounter.Unlock()
  590. continue
  591. }
  592. objFailCounter[objToUpload]++
  593. mutexObjFailCounter.Unlock()
  594. fullyUploaded, err := upload_one_stream(app, objToUpload)
  595. if err != nil {
  596. // Queue the object for retry
  597. logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
  598. objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
  599. objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
  600. continue
  601. }
  602. if fullyUploaded {
  603. // Mark the stream as fully uploaded
  604. //err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
  605. err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
  606. if err != nil {
  607. logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
  608. } else {
  609. // We can now remove the stream parts from the parts table
  610. err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
  611. if err != nil {
  612. logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
  613. }
  614. logger.Infof("Marked stream %s as fully uploaded", objToUpload)
  615. }
  616. // Remove entry from the fail counter
  617. mutexObjFailCounter.Lock()
  618. delete(objFailCounter, objToUpload)
  619. mutexObjFailCounter.Unlock()
  620. } else {
  621. // Queue the object for retry
  622. logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
  623. objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
  624. objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
  625. }
  626. }
  627. }
  628. func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
  629. // // Upload all files in the directory
  630. // options := minio.ListObjectsOptions{
  631. // Recursive: false,
  632. // }
  633. // objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
  634. // // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
  635. // for objInfo := range objectsCh {
  636. // if objInfo.Err != nil {
  637. // logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
  638. // continue
  639. // }
  640. // key := objInfo.Key
  641. // if strings.HasSuffix(key, "/") {
  642. // // Is a directory (<date>), go deeper into it
  643. // options := minio.ListObjectsOptions{
  644. // Prefix: key,
  645. // Recursive: false,
  646. // }
  647. // for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  648. // if objInfo.Err != nil {
  649. // logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
  650. // continue
  651. // }
  652. // key := objInfo.Key
  653. // if strings.HasSuffix(key, "/") {
  654. // // Is a directory, should be a stream then
  655. // uploaded := object_already_uploaded(app, key)
  656. // if !uploaded {
  657. // objToUploadChan <- key
  658. // }
  659. // }
  660. // }
  661. // }
  662. // }
  663. // WARN: Slow!!!
  664. // options := minio.ListObjectsOptions{
  665. // Recursive: true,
  666. // }
  667. options := minio.ListObjectsOptions{
  668. Recursive: false,
  669. }
  670. objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
  671. // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
  672. for objInfo := range objectsCh {
  673. if objInfo.Err != nil {
  674. logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
  675. continue
  676. }
  677. key := objInfo.Key
  678. // // If it's a directory with `metadata.json` file, it should be a stream
  679. // if strings.HasSuffix(key, "/metadata.json") {
  680. // streamName := strings.TrimSuffix(key, "metadata.json")
  681. // uploaded := object_already_uploaded(app, streamName)
  682. // if !uploaded {
  683. // objToUploadChan <- streamName
  684. // }
  685. // }
  686. // Scan through all subdirectories
  687. if strings.HasSuffix(key, "/") {
  688. exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
  689. if err != nil {
  690. logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
  691. continue
  692. }
  693. if exists {
  694. // Go ahead and upload the stream
  695. uploaded := object_already_uploaded(app, key)
  696. if !uploaded {
  697. objToUploadChan <- key
  698. }
  699. } else {
  700. // Check inner directories
  701. options := minio.ListObjectsOptions{
  702. Prefix: key,
  703. Recursive: false,
  704. }
  705. for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  706. if objInfo.Err != nil {
  707. logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
  708. continue
  709. }
  710. key := objInfo.Key
  711. if strings.HasSuffix(key, "/") {
  712. // Is a directory, should be a stream then
  713. uploaded := object_already_uploaded(app, key)
  714. if !uploaded {
  715. objToUploadChan <- key
  716. }
  717. }
  718. }
  719. }
  720. }
  721. }
  722. logger.Infoln("Full upload trigger finished")
  723. }
  724. func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
  725. type StreamObjectUploadStatistics struct {
  726. StartTime time.Time
  727. EndTime time.Time
  728. PartName string
  729. UpState string // "repeated" / "ok" / "fail"
  730. Msg string // "error message"
  731. }
  732. type StreamUploadStatistics struct {
  733. StartTime time.Time
  734. EndTime time.Time
  735. StreamName string
  736. MetaPointsCount int64
  737. MetaPartsCount int64
  738. Objects map[string]StreamObjectUploadStatistics
  739. Msg string
  740. }
  741. streamStats := StreamUploadStatistics{
  742. StartTime: time.Now(),
  743. StreamName: streamName,
  744. Objects: make(map[string]StreamObjectUploadStatistics),
  745. }
  746. logger.Infof("Going to upload stream `%s`", streamName)
  747. if object_already_uploaded(app, streamName) {
  748. logger.Infof("Stream `%s` is already uploaded", streamName)
  749. return true, nil
  750. }
  751. defer func() {
  752. streamStats.EndTime = time.Now()
  753. repeatedCount := int64(0)
  754. okCount := int64(0)
  755. failCount := int64(0)
  756. totalCount := int64(0)
  757. objDetailsMsg := ""
  758. for _, obj := range streamStats.Objects {
  759. totalCount++
  760. if obj.UpState == "repeated" {
  761. repeatedCount++
  762. objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
  763. obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  764. } else if obj.UpState == "ok" {
  765. okCount++
  766. objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
  767. obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  768. } else {
  769. failCount++
  770. objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
  771. obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
  772. }
  773. }
  774. statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
  775. "repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
  776. streamName, streamStats.EndTime.Sub(streamStats.StartTime),
  777. util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
  778. fullyUploaded, streamStats.MetaPartsCount,
  779. repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
  780. // Send upload status changed message
  781. msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
  782. streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
  783. okCount, failCount, repeatedCount)
  784. err := PublishMessage(&msg)
  785. if err != nil {
  786. logger.Errorf("send stream insert to stck status changed message error: %s", err)
  787. }
  788. }()
  789. fullyUploaded = true
  790. // Get stream metadata
  791. streamInfo, err := get_stream_metadata(app, streamName)
  792. if err != nil {
  793. // Cannot continue without metadata
  794. return false, err
  795. }
  796. streamStats.MetaPointsCount = streamInfo.TotalPoints
  797. streamStats.MetaPartsCount = streamInfo.PartsCount
  798. if streamInfo.PartsCount == 0 {
  799. // Edge device didn't finish uploading the stream yet
  800. logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
  801. fullyUploaded = false
  802. }
  803. hasSomething := false
  804. // Upload parts
  805. streamObjPath := streamName
  806. options := minio.ListObjectsOptions{
  807. Prefix: streamObjPath,
  808. Recursive: false,
  809. }
  810. logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
  811. // hasSomething := false
  812. hasMetadata := false
  813. for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  814. if objInfo.Err != nil {
  815. return false, objInfo.Err
  816. }
  817. if gAppQuitting {
  818. logger.Infof("Quitting, stopping uploading one stream")
  819. return false, nil
  820. }
  821. logger.Tracef("Checking minio file `%s`", objInfo.Key)
  822. if strings.HasSuffix(objInfo.Key, "/") {
  823. continue
  824. }
  825. partName := filepath.Base(objInfo.Key)
  826. if partName == "metadata.json" {
  827. hasMetadata = true
  828. continue
  829. }
  830. hasSomething = true
  831. objStat := StreamObjectUploadStatistics{
  832. StartTime: time.Now(),
  833. PartName: partName,
  834. }
  835. if part_already_uploaded(app, streamName, partName) {
  836. objStat.EndTime = time.Now()
  837. objStat.UpState = "repeated"
  838. streamStats.Objects[objInfo.Key] = objStat
  839. logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
  840. continue
  841. }
  842. if fullyUploaded {
  843. fullyUploaded = false
  844. logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
  845. }
  846. // Do the parts upload
  847. partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
  848. logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
  849. partInfo.PartName, partInfo.StreamInfo.PartsCount,
  850. partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
  851. err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
  852. if err != nil {
  853. objStat.EndTime = time.Now()
  854. objStat.UpState = "fail"
  855. objStat.Msg = err.Error()
  856. logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
  857. objStat.EndTime.Sub(objStat.StartTime), err)
  858. fullyUploaded = false
  859. } else {
  860. // Mark the part as uploaded
  861. //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
  862. part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
  863. err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
  864. if err != nil {
  865. logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
  866. }
  867. objStat.EndTime = time.Now()
  868. objStat.UpState = "ok"
  869. logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
  870. objStat.EndTime.Sub(objStat.StartTime))
  871. }
  872. streamStats.Objects[objInfo.Key] = objStat
  873. partNum, err := util.ExtractNumberFromString(partName)
  874. if err != nil {
  875. // Not a part file? Skip
  876. continue
  877. }
  878. status := "success"
  879. if objStat.UpState != "ok" {
  880. status = "failed"
  881. }
  882. msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
  883. streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
  884. err = PublishMessage(&msg)
  885. if err != nil {
  886. logger.Errorf("send part insert to stck status changed message error: %s", err)
  887. }
  888. }
  889. if !hasMetadata {
  890. logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
  891. fullyUploaded = false
  892. }
  893. if !hasSomething {
  894. if streamInfo.PartsCount != 0 {
  895. logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
  896. streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
  897. } else {
  898. logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
  899. }
  900. fullyUploaded = false
  901. }
  902. return fullyUploaded, nil
  903. }
  904. func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
  905. partName := filepath.Base(partObjPath)
  906. if part_already_uploaded(app, streamName, partName) {
  907. return nil
  908. }
  909. dryRun := false
  910. if !dryRun {
  911. // Get the part data from MinIO
  912. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
  913. minio.GetObjectOptions{})
  914. if err != nil {
  915. return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
  916. }
  917. defer obj.Close()
  918. // Read the part data
  919. var partDataBuf = new(bytes.Buffer)
  920. _, err = partDataBuf.ReadFrom(obj)
  921. if err != nil {
  922. return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
  923. }
  924. // Process the part data
  925. partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
  926. if err != nil {
  927. return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
  928. }
  929. // Use regex to extract the part index from the part name
  930. partIndex := int64(0)
  931. {
  932. re := regexp.MustCompile(`part_(\d+)\.zst`)
  933. matches := re.FindStringSubmatch(partObjPath)
  934. if len(matches) != 2 {
  935. return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
  936. }
  937. partIndex, err = strconv.ParseInt(matches[1], 10, 64)
  938. if err != nil {
  939. return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
  940. }
  941. // Check if the part index is correct
  942. if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
  943. return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
  944. }
  945. // Check if the part data size is correct
  946. if streamInfo.PartsCount != 0 {
  947. left := int64(len(partData))
  948. if partIndex < streamInfo.PartsCount-1 {
  949. right := streamInfo.PointsPerPart * 8
  950. if left != right {
  951. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  952. }
  953. } else if partIndex == streamInfo.PartsCount-1 {
  954. right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
  955. if right == 0 {
  956. right = streamInfo.PointsPerPart * 8
  957. }
  958. if left != right {
  959. return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
  960. }
  961. } else {
  962. return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
  963. }
  964. }
  965. }
  966. partPointsCount := len(partData) / 8
  967. // Insert the part data into ClickHouse
  968. batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
  969. if err != nil {
  970. return fmt.Errorf("failed to insert part data into stck: %w", err)
  971. }
  972. /*
  973. ┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
  974. │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
  975. │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
  976. │ tags │ Map(LowCardinality(String), LowCardinality(String)) │ │ │ Point tags │ │ │
  977. │ value │ Float64 │ │ │ Point value │ │ │
  978. │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
  979. └─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
  980. */
  981. logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  982. for i := 0; i < partPointsCount; i++ {
  983. metricName := streamInfo.MetricName
  984. pointName := streamInfo.Name
  985. tags := map[string]string{}
  986. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  987. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  988. nanoseconds := streamInfo.TimestampOffset * 1e6
  989. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  990. nanoseconds += int64(i) * streamInfo.Interval
  991. err := batch.Append(metricName, pointName, tags, value, nanoseconds)
  992. if err != nil {
  993. return err
  994. }
  995. }
  996. if batch.Rows() != partPointsCount {
  997. logger.Errorf("Batch rows mismatch: %d / %d???", batch.Rows(), partPointsCount)
  998. }
  999. err = batch.Send()
  1000. if err != nil {
  1001. return err
  1002. }
  1003. if !batch.IsSent() {
  1004. logger.Errorln("Batch not sent???")
  1005. }
  1006. logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
  1007. // We made progress, reset the fail counter
  1008. mutexObjFailCounter.Lock()
  1009. objFailCounter[streamName] = 0
  1010. mutexObjFailCounter.Unlock()
  1011. }
  1012. return nil
  1013. }
  1014. func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
  1015. var cfg AppConfig
  1016. err := db.AutoMigrate(&AppConfigDbEntry{})
  1017. if err != nil {
  1018. return cfg, err
  1019. }
  1020. //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
  1021. var dbEntry AppConfigDbEntry
  1022. result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
  1023. if result.Error != nil {
  1024. //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
  1025. // dbEntry.Value = `[]`
  1026. //} else {
  1027. // return cfg, result.Error
  1028. //}
  1029. return cfg, result.Error
  1030. }
  1031. err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
  1032. if err != nil {
  1033. return cfg, err
  1034. }
  1035. return cfg, nil
  1036. }
  1037. func object_already_uploaded(app AppCtx, key string) bool {
  1038. var record UploadRecord
  1039. result := app.db.First(&record, "key", key)
  1040. return result.Error == nil
  1041. }
  1042. func object_is_blacklisted(key string) bool {
  1043. for _, regexPattern := range gAppCfg.ImportBlacklist {
  1044. // TODO: Cache compiled regex patterns
  1045. if matched, _ := regexp.MatchString(regexPattern, key); matched {
  1046. return true
  1047. }
  1048. }
  1049. return false
  1050. }
  1051. func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
  1052. var record PartUploadRecord
  1053. result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
  1054. return result.Error == nil
  1055. }
  1056. func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
  1057. // Get the stream metadata from MinIO
  1058. metadataObjPath := streamName + "metadata.json"
  1059. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
  1060. metadataObjPath, minio.GetObjectOptions{})
  1061. if err != nil {
  1062. return nil, fmt.Errorf("failed to get stream metadata: %w", err)
  1063. }
  1064. defer obj.Close()
  1065. var streamInfo StreamMetadata
  1066. err = json.NewDecoder(obj).Decode(&streamInfo)
  1067. if err != nil {
  1068. return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
  1069. }
  1070. return &streamInfo, nil
  1071. }