main.go

package main

import (
    "bytes"
    "context"
    "encoding/binary"
    "encoding/json"
    "example/minio-into-stck/util"
    "fmt"
    "os"
    "os/exec"
    "os/signal"
    "path/filepath"
    "stck/stck-nsq-msg"
    smsg "stck/stck-nsq-msg/msg"
    "sync"
    "syscall"

    "io"
    // "log"
    "math"
    // "os"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/ClickHouse/ch-go"
    "github.com/ClickHouse/ch-go/proto"
    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    "github.com/Jeffail/tunny"
    "github.com/fsnotify/fsnotify"
    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
    "github.com/minio/minio-go/v7/pkg/notification"
    "github.com/natefinch/lumberjack"
    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
    gorm_logger "gorm.io/gorm/logger"
)
// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
    Mysql struct {
        User     string `mapstructure:"user"`
        Password string `mapstructure:"password"`
        Host     string `mapstructure:"host"`
        Database string `mapstructure:"database"`
    } `mapstructure:"mysql"`
    Minio struct {
        AccessKey string `mapstructure:"accessKey"`
        Secret    string `mapstructure:"secret"`
        Bucket    string `mapstructure:"bucket"`
        Host      string `mapstructure:"host"`
    } `mapstructure:"minio"`
    Stck struct {
        Host  string `mapstructure:"host"`
        Table string `mapstructure:"table"`
    } `mapstructure:"stck"`
    Nsq struct {
        TcpAddr         string `mapstructure:"tcpAddr"`
        LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
    } `mapstructure:"nsq"`
    Main struct {
        UploadRetryMaxTimes        int `mapstructure:"uploadRetryMaxTimes"`
        FailedRetryDelaySeconds    int `mapstructure:"failedRetryDelaySeconds"`
        NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"`
        ParallelUploadThreadsCount int `mapstructure:"parallelUploadThreadsCount"`
    } `mapstructure:"main"`
}
type AppConfig struct {
    // List of name regex patterns to exclude from import.
    ImportBlacklist []string
}

type AppConfigDbEntry struct {
    Key   string `gorm:"primaryKey"`
    Value string
}
// TODO: When inserting, create the row first, then update it with the insert duration once the insert finishes.
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
    Key       string `gorm:"primaryKey"`
    CreatedAt time.Time
}

// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
    StreamName string `gorm:"primaryKey"`
    PartName   string `gorm:"primaryKey"`
    CreatedAt  time.Time
}

type StreamMetadata struct {
    MetricName      string `json:"metric_name"`
    Name            string `json:"name"`
    TimestampOffset int64  `json:"timestamp_offset"`
    Interval        int64  `json:"interval"`
    PartsCount      int64  `json:"parts_count"`
    PointsPerPart   int64  `json:"points_per_part"`
    TotalPoints     int64  `json:"total_points"`
}
var gLocalIpAddress string
var gMachineID string
var gNsqProducer *nsq.Producer
var gNsqConsumer *nsq.Consumer
var gAppStartTime time.Time = time.Now()
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup

var buildtime string
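// MakeHeartbeatMsg builds the device heartbeat message carrying the machine ID, app start time,
// local IP address, service name and version.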
func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
    m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
        gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
    return &m
}
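// PublishMessage serializes an NSQ message variant and publishes it to the server notify topic.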
func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
    payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
    if err != nil {
        return fmt.Errorf("marshal message error: %w", err)
    }
    err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
    if err != nil {
        return fmt.Errorf("publish message error: %w", err)
    }
    return nil
}
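// initNsq connects the NSQ producer and consumer, then installs a handler that reacts to
// remote-control messages addressed to this machine: ping/pong, shell script execution,
// self-update, forced reboot, forced heartbeat, and uploading local logs to MinIO.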
func initNsq() {
    ip, err := stcknsqmsg.GetLocalIP()
    if err != nil {
        logger.Warnf("GetLocalIP error: %s", err)
    } else {
        gLocalIpAddress = ip.String()
        logger.Infof("Local IP: %s", gLocalIpAddress)
    }
    gMachineID = stcknsqmsg.MakeUniqueMachineID()
    logger.Infof("Machine ID: %s", gMachineID)
    fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)

    // Connect to NSQ
    nsqConfig := nsq.NewConfig()
    gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
    if err != nil {
        logger.Fatalf("NSQ Producer init error: %s", err)
    }
    gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
    if err != nil {
        logger.Fatalf("NSQ Consumer init error: %s", err)
    }
    gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
        logger.Debugf("NSQ Consumer received message: %s", message.Body)
        // Parse the message
        recvTime := message.Timestamp
        msg, err := stcknsqmsg.FromString(string(message.Body))
        if err != nil {
            logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
            return nil
        }
        // Process the message
        switch data := msg.Data.(type) {
        case *smsg.DevicePingMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Write pong
            pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
            err := PublishMessage(&pongMsg)
            if err != nil {
                logger.Errorf("send pong error: %s", err)
            }
        case *smsg.RequestDeviceExecuteShellScriptMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Execute the shell script
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
            result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
                cmd := exec.Command("bash", "-c", data.Script)
                var out bytes.Buffer
                cmd.Stdout = &out
                cmd.Stderr = &out
                err := cmd.Run()
                return out.String(), err
            }(data)
            if err != nil {
                errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
                // Write error message
                resp.Status = -1
                resp.Msg = errMsg
            } else {
                // Write output
                resp.Status = 0
                resp.Msg = result
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        case *smsg.RequestDeviceUpdateMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            if data.ServiceName != "minio-into-stck" {
                break
            }
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
            result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
                // Download the update file to /tmp
                downloadPath := "/tmp/minio-into-stck-updater"
                updateScriptPath := filepath.Join(downloadPath, "update.sh")
                var out bytes.Buffer
                err := os.RemoveAll(downloadPath)
                if err != nil {
                    return "", err
                }
                err = os.MkdirAll(downloadPath, 0777)
                if err != nil {
                    return "", err
                }
                updateScriptContent := fmt.Sprintf(`#!/bin/bash
set -e
cd %s
wget --tries=3 -nv -O installer.tar.gz %s
tar -xzf installer.tar.gz
cd minio-into-stck-installer
./replacing-update.sh
`, downloadPath, data.ServiceBinaryURL)
                err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
                if err != nil {
                    return "", err
                }
                // Execute the update script
                cmd := exec.Command("bash", "-c", updateScriptPath)
                cmd.Stdout = &out
                cmd.Stderr = &out
                err = cmd.Run()
                return out.String(), err
            }(data)
            if err != nil {
                errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
                // Write error message
                resp.Status = -1
                resp.Msg = errMsg
            } else {
                // Write output
                resp.Status = 0
                resp.Msg = "executed update process successfully\n\noutput:\n" + result
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        case *smsg.ForceDeviceRebootMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            programmaticQuitChan <- struct{}{}
        case *smsg.ForceDeviceSendHeartbeatMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Send heartbeat
            heartBeatMsg := MakeHeartbeatMsg()
            err := PublishMessage(heartBeatMsg)
            if err != nil {
                logger.Errorf("send heartbeat error: %s", err)
            }
        case *smsg.RequestDeviceUploadLogsToMinioMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Upload logs to MinIO
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
            minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
            minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
                Creds:  minioCreds,
                Secure: false,
            })
            if err != nil {
                logger.Errorf("MinIO Client init error: %s", err)
                resp.Status = -1
                resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
            } else {
                if util.FileExists("./logs") {
                    err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
                        filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
                    if err != nil {
                        logger.Errorf("upload logs to MinIO error: %s", err)
                        resp.Status = -1
                        resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
                    }
                }
                if util.FileExists("./log") {
                    err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
                        filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
                    if err != nil {
                        logger.Errorf("upload log to MinIO error: %s", err)
                        resp.Status = -1
                        resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
                    }
                }
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        default:
            logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
        }
        // Notify NSQ that the message is processed successfully
        return nil
    }), 1)

    // err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
    err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
    if err != nil {
        logger.Fatalf("NSQ Consumer connect error: %s", err)
    }
}
var appInitCfg *AppInitConfig = &AppInitConfig{}
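// initLoadConfig loads ./config/application.yaml via viper, applies defaults for the retry/delay
// settings, and hot-reloads the configuration whenever the file changes on disk.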
func initLoadConfig() {
    viper.SetDefault("main.uploadRetryMaxTimes", 20)
    viper.SetDefault("main.failedRetryDelaySeconds", 5)
    viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
    viper.SetDefault("main.parallelUploadThreadsCount", 2)
    viper.SetConfigFile("./config/application.yaml")
    viper.WatchConfig()
    viper.OnConfigChange(func(e fsnotify.Event) {
        logger.Infoln("Config file changed:", e.Name)
        var newAppInitConfig AppInitConfig
        err := viper.Unmarshal(&newAppInitConfig)
        if err != nil {
            logger.Infoln("Failed to unmarshal config:", err)
            return
        }
        appInitCfg = &newAppInitConfig
    })
    err := viper.ReadInConfig()
    if err != nil {
        logger.Fatalf("Failed to read config file: %v", err)
    }
    err = viper.Unmarshal(appInitCfg)
    if err != nil {
        logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
        fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
        os.Exit(1)
    }
}
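// initLog sets up three rotating log files via lumberjack: the main log, an error-only log
// (also mirrored to stderr through ErrorHook), and a statistics log used by statLogger.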
func initLog() *logrus.Logger {
    // Main log file
    log := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.log", // Log file location
        MaxSize:    50,                          // Maximum size of a single file, in MB
        MaxBackups: 5,                           // Maximum number of old files to keep
        MaxAge:     28,                          // Maximum number of days to keep old files
        Compress:   true,                        // Whether to compress/archive old files
        LocalTime:  true,                        // Use local time for timestamps
    }
    // Error log file
    errorLog := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.error.log", // Error log file location
        MaxSize:    50,                                // Maximum size of a single file, in MB
        MaxBackups: 5,                                 // Maximum number of old files to keep
        MaxAge:     28,                                // Maximum number of days to keep old files
        Compress:   true,                              // Whether to compress/archive old files
        LocalTime:  true,                              // Use local time for timestamps
    }
    // Statistics log file
    statLog := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.stat.log", // Statistics log file location
        MaxSize:    50,                               // Maximum size of a single file, in MB
        MaxBackups: 5,                                // Maximum number of old files to keep
        MaxAge:     28,                               // Maximum number of days to keep old files
        Compress:   true,                             // Whether to compress/archive old files
        LocalTime:  true,                             // Use local time for timestamps
    }
    logger := logrus.New()
    if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
        logger.SetLevel(logrus.TraceLevel)
    } else {
        logger.SetLevel(logrus.DebugLevel)
    }
    logger.Out = log
    // logger.Out = io.MultiWriter(os.Stdout, log)
    // Route error-level logs to an additional file
    logger.AddHook(&ErrorHook{
        // Writer: errorLog,
        Writer: io.MultiWriter(os.Stderr, errorLog),
        LogLevels: []logrus.Level{
            logrus.ErrorLevel,
            logrus.FatalLevel,
            logrus.PanicLevel,
        },
    })
    statLogger = logrus.New()
    statLogger.SetLevel(logrus.InfoLevel)
    statLogger.Out = statLog
    return logger
}
// ErrorHook routes error-level (and above) log entries to an additional writer.
type ErrorHook struct {
    Writer    io.Writer
    LogLevels []logrus.Level
}

func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
    line, err := entry.String()
    if err != nil {
        return err
    }
    _, err = hook.Writer.Write([]byte(line))
    return err
}

func (hook *ErrorHook) Levels() []logrus.Level {
    return hook.LogLevels
}
var logger *logrus.Logger
var statLogger *logrus.Logger
var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)
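// main wires everything together: logging, configuration, NSQ, MySQL (via GORM), MinIO and
// ClickHouse, then runs main_worker until a termination signal or a programmatic quit arrives.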
func main() {
    var err error
    fmt.Println("Starting minio-into-stck, build time:", buildtime)

    logger = initLog()
    logger.Warnln("Logger initialized.")

    // Load configuration from file
    initLoadConfig()
    // Initialize NSQ
    initNsq()

    var db *gorm.DB
    var minioClient *minio.Client
    var ckConn driver.Conn

    // Connect to MySQL
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
    db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatalf("Failed to connect to MySQL: %v", err)
    }
    // Disable logging
    db.Logger = gorm_logger.Discard
    // Get the underlying sql.DB object to close the connection later
    sqlDB, err := db.DB()
    if err != nil {
        logger.Fatalf("Failed to get MySQL DB: %v", err)
    }
    defer sqlDB.Close()
    // Ping the database to check if the connection is successful
    err = sqlDB.Ping()
    if err != nil {
        logger.Infof("ping db error: %v", err)
        return
    }
    logger.Infoln("Database connection successful")
    // Perform auto migration
    err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
    if err != nil {
        logger.Infof("auto migrate error: %v", err)
        return
    }
    logger.Infoln("Auto migration completed")

    // Connect to MinIO
    minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
        Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
        Secure: false,
    })
    if err == nil {
        bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
        if err != nil {
            logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
        }
        if !bucketExists {
            logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
        }
    }
    if err != nil {
        logger.Fatalf("Failed to connect to MinIO: %v", err)
    }
    logger.Infoln("Connected to MinIO")

    // Connect to ClickHouse
    ckConn, err = clickhouse.Open(&clickhouse.Options{
        Addr: []string{appInitCfg.Stck.Host},
    })
    if err == nil {
        err = ckConn.Ping(context.Background())
    }
    if err != nil {
        logger.Fatalf("Failed to connect to ClickHouse: %v", err)
    }
    logger.Infoln("Connected to ClickHouse")

    // OK! Everything is ready now.
    // objUploadChan := make(chan string, 1024*256)
    objUploadChan := util.NewDChan[string](1024 * 16)

    // Start the main work
    logger.Infoln("Starting main worker...")
    gAppExitWaitGroup.Add(1)
    go func() {
        defer gAppExitWaitGroup.Done()
        main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
    }()

    // Wait on signal.
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-signalChan:
        logger.Infof("received signal, stopping minio-into-stck")
    case <-programmaticQuitChan:
        logger.Infof("received programmatic quit signal, stopping minio-into-stck")
    }
    gAppQuitting = true
    // HACK: Notify the main worker to quit
    objUploadChan.In() <- ""
    // Close the NSQ producer and consumer
    gNsqProducer.Stop()
    gNsqConsumer.Stop()
    // Wait for the goroutines to exit
    gAppExitWaitGroup.Wait()
    logger.Infof("minio-into-stck stopped gracefully")
}
type AppCtx struct {
    db          *gorm.DB
    minioClient *minio.Client
    ckConn      driver.Conn
}

var gAppCfg *AppConfig

type PartUploadArgs struct {
    StreamInfo *StreamMetadata
    StreamName string
    PartName   string
}
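// main_worker is the core loop. It registers a MinIO bucket-notification listener (re-running a
// full bucket scan each time the listener starts) to discover stream directories, then uploads
// each queued stream into ClickHouse, tracking the blacklist, per-stream failure counts, and
// retry scheduling through the delayed channel.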
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
    ctx := context.Background()
    // Load config from DB
    appCfg, err := load_app_cfg_from_db(app.db)
    if err != nil {
        logger.Fatalf("Failed to load app config from DB: %v", err)
    }
    gAppCfg = &appCfg
    // Start the notification listener
    go func() {
        for {
            // Register the bucket notification listener
            logger.Infoln("Registering bucket notification listener...")
            notifys := app.minioClient.ListenBucketNotification(
                ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
            // Listen OK, start the full upload trigger to upload maybe missed files
            go trigger_full_upload(app, objUploadChan.In())
            for notifyInfo := range notifys {
                for _, record := range notifyInfo.Records {
                    key := record.S3.Object.Key
                    logger.Traceln("New object notification:", key)
                    // Only care when `.zst` / `metadata.json` files are uploaded
                    // keyParts := strings.Split(key, "/")
                    // keyLastPart := keyParts[len(keyParts)-1]
                    keyLastPart := filepath.Base(key)
                    if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
                        continue
                    }
                    // key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
                    key = filepath.Dir(key) + "/"
                    // Remove fail counter so that the object can be retried instead of
                    // being spuriously ignored
                    mutexObjFailCounter.Lock()
                    delete(objFailCounter, key)
                    mutexObjFailCounter.Unlock()
                    // Queue the object for upload
                    objUploadChan.DelayedWrite(key, time.Duration(appInitCfg.Main.NotifyToUploadDelaySeconds)*time.Second)
                }
                if notifyInfo.Err != nil {
                    logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
                }
            }
            logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
            time.Sleep(5 * time.Second)
            // Clear fail counter
            mutexObjFailCounter.Lock()
            objFailCounter = make(map[string]int)
            mutexObjFailCounter.Unlock()
        }
    }()
    // Start the main loop (streams upload worker)
    for objToUpload := range objUploadChan.Out() {
        objUploadChan.MarkElemReadDone(objToUpload)
        if gAppQuitting {
            logger.Infof("Quitting, stopping main worker")
            return
        }
        if objToUpload == "" {
            continue
        }
        logger.Infoln("Checking stream object:", objToUpload)
        if object_is_blacklisted(objToUpload) {
            logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
            continue
        }
        mutexObjFailCounter.Lock()
        if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
            logger.Warnf("Upload of stream `%s` has been retried too many times (%d), giving up", objToUpload, objFailCounter[objToUpload])
            delete(objFailCounter, objToUpload)
            mutexObjFailCounter.Unlock()
            continue
        }
        objFailCounter[objToUpload]++
        mutexObjFailCounter.Unlock()
        fullyUploaded, err := upload_one_stream(app, objToUpload)
        if err != nil {
            // Queue the object for retry
            logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
                objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
            objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
            continue
        }
        if fullyUploaded {
            // Mark the stream as fully uploaded
            //err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
            err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
            if err != nil {
                logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
            } else {
                // We can now remove the stream parts from the parts table
                err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
                if err != nil {
                    logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
                }
                logger.Infof("Marked stream %s as fully uploaded", objToUpload)
            }
            // Remove entry from the fail counter
            mutexObjFailCounter.Lock()
            delete(objFailCounter, objToUpload)
            mutexObjFailCounter.Unlock()
        } else {
            // Queue the object for retry
            logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
                objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
            objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
        }
    }
}
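// trigger_full_upload walks the bucket once to queue streams that notifications may have missed.
// Streams are laid out as `<date>/<stream_name>/`; a top-level directory that directly contains
// metadata.json is queued as a stream, otherwise its immediate subdirectories are queued.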
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
    // // Upload all files in the directory
    // options := minio.ListObjectsOptions{
    //     Recursive: false,
    // }
    // objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
    // // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
    // for objInfo := range objectsCh {
    //     if objInfo.Err != nil {
    //         logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
    //         continue
    //     }
    //     key := objInfo.Key
    //     if strings.HasSuffix(key, "/") {
    //         // Is a directory (<date>), go deeper into it
    //         options := minio.ListObjectsOptions{
    //             Prefix:    key,
    //             Recursive: false,
    //         }
    //         for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
    //             if objInfo.Err != nil {
    //                 logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
    //                 continue
    //             }
    //             key := objInfo.Key
    //             if strings.HasSuffix(key, "/") {
    //                 // Is a directory, should be a stream then
    //                 uploaded := object_already_uploaded(app, key)
    //                 if !uploaded {
    //                     objToUploadChan <- key
    //                 }
    //             }
    //         }
    //     }
    // }

    // WARN: Slow!!!
    // options := minio.ListObjectsOptions{
    //     Recursive: true,
    // }
    options := minio.ListObjectsOptions{
        Recursive: false,
    }
    objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
    // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
    for objInfo := range objectsCh {
        if objInfo.Err != nil {
            logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
            continue
        }
        key := objInfo.Key
        // // If it's a directory with `metadata.json` file, it should be a stream
        // if strings.HasSuffix(key, "/metadata.json") {
        //     streamName := strings.TrimSuffix(key, "metadata.json")
        //     uploaded := object_already_uploaded(app, streamName)
        //     if !uploaded {
        //         objToUploadChan <- streamName
        //     }
        // }
        // Scan through all subdirectories
        if strings.HasSuffix(key, "/") {
            exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
            if err != nil {
                logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
                continue
            }
            if exists {
                // Go ahead and upload the stream
                uploaded := object_already_uploaded(app, key)
                if !uploaded {
                    objToUploadChan <- key
                }
            } else {
                // Check inner directories
                options := minio.ListObjectsOptions{
                    Prefix:    key,
                    Recursive: false,
                }
                for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
                    if objInfo.Err != nil {
                        logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
                        continue
                    }
                    key := objInfo.Key
                    if strings.HasSuffix(key, "/") {
                        // Is a directory, should be a stream then
                        uploaded := object_already_uploaded(app, key)
                        if !uploaded {
                            objToUploadChan <- key
                        }
                    }
                }
            }
        }
    }
    logger.Infoln("Full upload trigger finished")
}
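// upload_one_stream uploads every part object of a single stream directory. It reads the stream's
// metadata.json, fans the part uploads out over a fixed-size worker pool, records per-part results
// for the statistics log and NSQ status messages, and reports whether the stream can be considered
// fully uploaded.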
func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
    type StreamObjectUploadStatistics struct {
        StartTime time.Time
        EndTime   time.Time
        PartName  string
        UpState   string // "repeated" / "ok" / "fail"
        Msg       string // "error message"
    }
    type StreamUploadStatistics struct {
        StartTime       time.Time
        EndTime         time.Time
        StreamName      string
        MetaPointsCount int64
        MetaPartsCount  int64
        Objects         map[string]StreamObjectUploadStatistics
        Msg             string
    }
    streamStats := StreamUploadStatistics{
        StartTime:  time.Now(),
        StreamName: streamName,
        Objects:    make(map[string]StreamObjectUploadStatistics),
    }
    logger.Infof("Going to upload stream `%s`", streamName)
    if object_already_uploaded(app, streamName) {
        logger.Infof("Stream `%s` is already uploaded", streamName)
        return true, nil
    }
    defer func() {
        streamStats.EndTime = time.Now()
        repeatedCount := int64(0)
        okCount := int64(0)
        failCount := int64(0)
        totalCount := int64(0)
        objDetailsMsg := ""
        for _, obj := range streamStats.Objects {
            totalCount++
            if obj.UpState == "repeated" {
                repeatedCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
                    obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            } else if obj.UpState == "ok" {
                okCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
                    obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            } else {
                failCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
                    obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            }
        }
        statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
            "repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
            streamName, streamStats.EndTime.Sub(streamStats.StartTime),
            util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
            fullyUploaded, streamStats.MetaPartsCount,
            repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
        // Send upload status changed message
        msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
            streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
            okCount, failCount, repeatedCount)
        err := PublishMessage(&msg)
        if err != nil {
            logger.Errorf("send stream insert to stck status changed message error: %s", err)
        }
    }()
    fullyUploaded = true
    // Get stream metadata
    streamInfo, err := get_stream_metadata(app, streamName)
    if err != nil {
        // Cannot continue without metadata
        return false, err
    }
    streamStats.MetaPointsCount = streamInfo.TotalPoints
    streamStats.MetaPartsCount = streamInfo.PartsCount
    if streamInfo.PartsCount == 0 {
        // Edge device didn't finish uploading the stream yet
        logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
        fullyUploaded = false
    }
    hasSomething := false
    // Upload parts
    streamObjPath := streamName
    options := minio.ListObjectsOptions{
        Prefix:    streamObjPath,
        Recursive: false,
    }
    logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
    // hasSomething := false
    hasMetadata := false
    var wg sync.WaitGroup
    var mt sync.Mutex
    pool := tunny.NewFunc(appInitCfg.Main.ParallelUploadThreadsCount, func(payload interface{}) interface{} {
        packed := payload.(util.Pair[string, PartUploadArgs])
        partName := packed.First
        partInfo := packed.Second
        objStat := StreamObjectUploadStatistics{
            StartTime: time.Now(),
            PartName:  partName,
        }
        logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
            partInfo.PartName, partInfo.StreamInfo.PartsCount,
            partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
        err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
        if err != nil {
            objStat.EndTime = time.Now()
            objStat.UpState = "fail"
            objStat.Msg = err.Error()
            logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
                objStat.EndTime.Sub(objStat.StartTime), err)
            // fullyUploaded is shared with the listing goroutine; guard it with the same mutex.
            mt.Lock()
            fullyUploaded = false
            mt.Unlock()
        } else {
            // Mark the part as uploaded
            //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
            part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
            err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
            if err != nil {
                logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
            }
            objStat.EndTime = time.Now()
            objStat.UpState = "ok"
            logger.Infof("Uploaded part `%s` of stream `%s`, took %v", partInfo.PartName, streamName,
                objStat.EndTime.Sub(objStat.StartTime))
        }
        func() {
            mt.Lock()
            defer mt.Unlock()
            streamStats.Objects[partInfo.PartName] = objStat
        }()
        partNum, err := util.ExtractNumberFromString(partName)
        if err != nil {
            // Not a part file? Skip
            return nil
        }
        status := "success"
        if objStat.UpState != "ok" {
            status = "failed"
        }
        msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
            streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
        err = PublishMessage(&msg)
        if err != nil {
            logger.Errorf("send part insert to stck status changed message error: %s", err)
        }
        return nil
    })
    for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
        if objInfo.Err != nil {
            return false, objInfo.Err
        }
        if gAppQuitting {
            logger.Infof("Quitting, stopping uploading one stream")
            return false, nil
        }
        logger.Tracef("Checking minio file `%s`", objInfo.Key)
        if strings.HasSuffix(objInfo.Key, "/") {
            continue
        }
        partName := filepath.Base(objInfo.Key)
        if partName == "metadata.json" {
            hasMetadata = true
            continue
        }
        hasSomething = true
        objStat := StreamObjectUploadStatistics{
            StartTime: time.Now(),
            PartName:  partName,
        }
        if part_already_uploaded(app, streamName, partName) {
            objStat.EndTime = time.Now()
            objStat.UpState = "repeated"
            func() {
                mt.Lock()
                defer mt.Unlock()
                streamStats.Objects[objInfo.Key] = objStat
            }()
            logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
            continue
        }
        // Guard against concurrent writes from the upload workers.
        mt.Lock()
        if fullyUploaded {
            fullyUploaded = false
            logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
        }
        mt.Unlock()
        // Do the parts upload
        partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
        wg.Add(1)
        go func() {
            defer wg.Done()
            pool.Process(util.Pair[string, PartUploadArgs]{First: partName, Second: partInfo})
        }()
    }
    wg.Wait()
    pool.Close()
    if !hasMetadata {
        logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
        fullyUploaded = false
    }
    if !hasSomething {
        if streamInfo.PartsCount != 0 {
            logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
                streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
        } else {
            logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
        }
        fullyUploaded = false
    }
    return fullyUploaded, nil
}
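// upload_one_part downloads a single `part_<N>.zst` object, zstd-decompresses it into a flat
// array of little-endian float64 samples (8 bytes per point), validates the part index and size
// against the stream metadata, reconstructs each point's timestamp as TimestampOffset (ms,
// converted to ns) plus the point's index times Interval, and bulk-inserts the points into the
// configured ClickHouse table via ch-go's native-format input.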
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
    partName := filepath.Base(partObjPath)
    if part_already_uploaded(app, streamName, partName) {
        return nil
    }
    dryRun := false
    if !dryRun {
        // Get the part data from MinIO
        obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
            minio.GetObjectOptions{})
        if err != nil {
            return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
        }
        defer obj.Close()
        // Read the part data
        var partDataBuf = new(bytes.Buffer)
        _, err = partDataBuf.ReadFrom(obj)
        if err != nil {
            return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
        }
        // Process the part data
        partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
        if err != nil {
            return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
        }
        // Use regex to extract the part index from the part name
        partIndex := int64(0)
        {
            re := regexp.MustCompile(`part_(\d+)\.zst`)
            matches := re.FindStringSubmatch(partObjPath)
            if len(matches) != 2 {
                return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
            }
            partIndex, err = strconv.ParseInt(matches[1], 10, 64)
            if err != nil {
                return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
            }
            // Check if the part index is correct
            if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
                return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
            }
            // Check if the part data size is correct
            if streamInfo.PartsCount != 0 {
                left := int64(len(partData))
                if partIndex < streamInfo.PartsCount-1 {
                    right := streamInfo.PointsPerPart * 8
                    if left != right {
                        return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
                    }
                } else if partIndex == streamInfo.PartsCount-1 {
                    right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
                    if right == 0 {
                        right = streamInfo.PointsPerPart * 8
                    }
                    if left != right {
                        return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
                    }
                } else {
                    return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
                }
            }
        }
        partPointsCount := len(partData) / 8
        // Insert the part data into ClickHouse
        customInsertFn := func() error {
            ctx := context.Background()
            sql := fmt.Sprintf("INSERT INTO %s (metric_name, point_name, tags, value, nanoseconds) "+
                "SELECT %s, %s, %s, col1, col2 FROM input('col1 Float64, col2 Int64') FORMAT NATIVE",
                appInitCfg.Stck.Table,
                util.ToSqlLiteral(streamInfo.MetricName), util.ToSqlLiteral(streamInfo.Name), "[]")
            c, err := ch.Dial(ctx, ch.Options{Address: appInitCfg.Stck.Host})
            if err != nil {
                return fmt.Errorf("failed to connect to stck: %w", err)
            }
            defer c.Close()
            /*
                ┌─name────────┬─type───────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
                │ metric_name │ LowCardinality(String) │              │                    │ Metric name               │                  │                │
                │ point_name  │ LowCardinality(String) │              │                    │ Point name                │                  │                │
                │ tags        │ Map(String, String)    │              │                    │ Point tags                │                  │                │
                │ value       │ Float64                │              │                    │ Point value               │                  │                │
                │ nanoseconds │ Int64                  │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
                └─────────────┴────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
            */
            logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
            colValue := make(proto.ColFloat64, 0, partPointsCount)
            colNanoseconds := make(proto.ColInt64, 0, partPointsCount)
            for i := 0; i < partPointsCount; i++ {
                value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
                // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
                nanoseconds := streamInfo.TimestampOffset * 1e6
                nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
                nanoseconds += int64(i) * streamInfo.Interval
                colValue = append(colValue, value)
                colNanoseconds = append(colNanoseconds, nanoseconds)
            }
            input := proto.Input{
                {Name: "col1", Data: &colValue},
                {Name: "col2", Data: &colNanoseconds},
            }
            err = c.Do(ctx, ch.Query{
                Body:  sql,
                Input: input,
                // Settings: []ch.Setting{
                //     ch.SettingInt("max_insert_threads", 2),
                // },
            })
            if err != nil {
                return fmt.Errorf("failed to insert part into stck: %w", err)
            }
            logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
            return nil
        }
        err = customInsertFn()
        if err != nil {
            return fmt.Errorf("failed to insert part `%s` data into stck: %w", partObjPath, err)
        }
        // We made progress, reset the fail counter
        mutexObjFailCounter.Lock()
        objFailCounter[streamName] = 0
        mutexObjFailCounter.Unlock()
    }
    return nil
}
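// load_app_cfg_from_db reads the runtime-tunable configuration (currently only the import
// blacklist) from the AppConfigDbEntry table, creating a default empty entry on first run.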
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
    var cfg AppConfig
    err := db.AutoMigrate(&AppConfigDbEntry{})
    if err != nil {
        return cfg, err
    }
    //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
    var dbEntry AppConfigDbEntry
    result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
    if result.Error != nil {
        //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
        //    dbEntry.Value = `[]`
        //} else {
        //    return cfg, result.Error
        //}
        return cfg, result.Error
    }
    err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
    if err != nil {
        return cfg, err
    }
    return cfg, nil
}
func object_already_uploaded(app AppCtx, key string) bool {
    var record UploadRecord
    result := app.db.First(&record, "`key` = ?", key)
    return result.Error == nil
}
func object_is_blacklisted(key string) bool {
    for _, regexPattern := range gAppCfg.ImportBlacklist {
        // TODO: Cache compiled regex patterns
        if matched, _ := regexp.MatchString(regexPattern, key); matched {
            return true
        }
    }
    return false
}
func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
    var record PartUploadRecord
    result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
    return result.Error == nil
}
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
    // Get the stream metadata from MinIO
    metadataObjPath := streamName + "metadata.json"
    obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
        metadataObjPath, minio.GetObjectOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get stream metadata: %w", err)
    }
    defer obj.Close()
    var streamInfo StreamMetadata
    err = json.NewDecoder(obj).Decode(&streamInfo)
    if err != nil {
        return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
    }
    return &streamInfo, nil
}