main.go

package main

import (
    "bytes"
    "context"
    "encoding/binary"
    "encoding/json"
    "example/minio-into-stck/util"
    "fmt"
    "os"
    "os/exec"
    "os/signal"
    "path/filepath"
    "stck/stck-nsq-msg"
    smsg "stck/stck-nsq-msg/msg"
    "sync"
    "syscall"
    "io"
    // "log"
    "math"
    // "os"
    "regexp"
    "strconv"
    "strings"
    "time"

    "github.com/ClickHouse/ch-go"
    "github.com/ClickHouse/ch-go/proto"
    "github.com/ClickHouse/clickhouse-go/v2"
    "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    "github.com/fsnotify/fsnotify"
    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
    "github.com/minio/minio-go/v7/pkg/notification"
    "github.com/natefinch/lumberjack"
    "github.com/nsqio/go-nsq"
    "github.com/sirupsen/logrus"
    "github.com/spf13/viper"
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
    gorm_logger "gorm.io/gorm/logger"
)

// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
    Mysql struct {
        User     string `mapstructure:"user"`
        Password string `mapstructure:"password"`
        Host     string `mapstructure:"host"`
        Database string `mapstructure:"database"`
    } `mapstructure:"mysql"`
    Minio struct {
        AccessKey string `mapstructure:"accessKey"`
        Secret    string `mapstructure:"secret"`
        Bucket    string `mapstructure:"bucket"`
        Host      string `mapstructure:"host"`
    } `mapstructure:"minio"`
    Stck struct {
        Host  string `mapstructure:"host"`
        Table string `mapstructure:"table"`
    } `mapstructure:"stck"`
    Nsq struct {
        TcpAddr         string `mapstructure:"tcpAddr"`
        LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
    } `mapstructure:"nsq"`
    Main struct {
        UploadRetryMaxTimes        int `mapstructure:"uploadRetryMaxTimes"`
        FailedRetryDelaySeconds    int `mapstructure:"failedRetryDelaySeconds"`
        NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"`
    } `mapstructure:"main"`
}

type AppConfig struct {
    // List of name regex patterns to exclude from import.
    ImportBlacklist []string
}

type AppConfigDbEntry struct {
    Key   string `gorm:"primaryKey"`
    Value string
}

// TODO: Create the record row before inserting, then update it with the insert duration once the insert completes.

// UploadRecord represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
    Key       string `gorm:"primaryKey"`
    CreatedAt time.Time
}

// PartUploadRecord represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
    StreamName string `gorm:"primaryKey"`
    PartName   string `gorm:"primaryKey"`
    CreatedAt  time.Time
}
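
// StreamMetadata mirrors the metadata.json stored alongside each stream in MinIO.
// A stream lives under a `<date>/<stream_name>/` prefix and consists of a metadata.json
// file plus zstd-compressed `part_N.zst` files, each holding PointsPerPart little-endian
// float64 samples (8 bytes per point). TimestampOffset is in milliseconds; Interval is
// applied as nanoseconds when timestamps are reconstructed in upload_one_part.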
type StreamMetadata struct {
    MetricName      string `json:"metric_name"`
    Name            string `json:"name"`
    TimestampOffset int64  `json:"timestamp_offset"`
    Interval        int64  `json:"interval"`
    PartsCount      int64  `json:"parts_count"`
    PointsPerPart   int64  `json:"points_per_part"`
    TotalPoints     int64  `json:"total_points"`
}

var gLocalIpAddress string
var gMachineID string
var gNsqProducer *nsq.Producer
var gNsqConsumer *nsq.Consumer
var gAppStartTime time.Time = time.Now()
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup
var buildtime string

func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
    m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
        gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
    return &m
}

func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
    payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
    if err != nil {
        return fmt.Errorf("marshal message error: %w", err)
    }
    err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
    if err != nil {
        return fmt.Errorf("publish message error: %w", err)
    }
    return nil
}
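
// initNsq determines the local IP and machine ID, creates the NSQ producer used by
// PublishMessage, and starts a consumer on the client-action topic that answers
// device-control messages: ping/pong, remote shell execution, self-update, forced
// reboot, forced heartbeat, and uploading local log folders to MinIO.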
func initNsq() {
    ip, err := stcknsqmsg.GetLocalIP()
    if err != nil {
        logger.Warnf("GetLocalIP error: %s", err)
    } else {
        gLocalIpAddress = ip.String()
        logger.Infof("Local IP: %s", gLocalIpAddress)
    }
    gMachineID = stcknsqmsg.MakeUniqueMachineID()
    logger.Infof("Machine ID: %s", gMachineID)
    fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)

    // Connect to NSQ
    nsqConfig := nsq.NewConfig()
    gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
    if err != nil {
        logger.Fatalf("NSQ Producer init error: %s", err)
    }
    gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
    if err != nil {
        logger.Fatalf("NSQ Consumer init error: %s", err)
    }
    gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
        logger.Debugf("NSQ Consumer received message: %s", message.Body)
        // Parse the message
        recvTime := message.Timestamp
        msg, err := stcknsqmsg.FromString(string(message.Body))
        if err != nil {
            logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
            return nil
        }
        // Process the message
        switch data := msg.Data.(type) {
        case *smsg.DevicePingMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Write pong
            pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
            err := PublishMessage(&pongMsg)
            if err != nil {
                logger.Errorf("send pong error: %s", err)
            }
        case *smsg.RequestDeviceExecuteShellScriptMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Execute the shell script
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
            result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
                cmd := exec.Command("bash", "-c", data.Script)
                var out bytes.Buffer
                cmd.Stdout = &out
                cmd.Stderr = &out
                err := cmd.Run()
                return out.String(), err
            }(data)
            if err != nil {
                errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
                // Write error message
                resp.Status = -1
                resp.Msg = errMsg
            } else {
                // Write output
                resp.Status = 0
                resp.Msg = result
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        case *smsg.RequestDeviceUpdateMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            if data.ServiceName != "minio-into-stck" {
                break
            }
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
            result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
                // Download the update file to /tmp
                downloadPath := "/tmp/minio-into-stck-updater"
                updateScriptPath := filepath.Join(downloadPath, "update.sh")
                var out bytes.Buffer
                err := os.RemoveAll(downloadPath)
                if err != nil {
                    return "", err
                }
                err = os.MkdirAll(downloadPath, 0777)
                if err != nil {
                    return "", err
                }
                updateScriptContent := fmt.Sprintf(`#!/bin/bash
set -e
cd %s
wget --tries=3 -nv -O installer.tar.gz %s
tar -xzf installer.tar.gz
cd minio-into-stck-installer
./replacing-update.sh
`, downloadPath, data.ServiceBinaryURL)
                err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
                if err != nil {
                    return "", err
                }
                // Execute the update script
                cmd := exec.Command("bash", "-c", updateScriptPath)
                cmd.Stdout = &out
                cmd.Stderr = &out
                err = cmd.Run()
                return out.String(), err
            }(data)
            if err != nil {
                errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
                // Write error message
                resp.Status = -1
                resp.Msg = errMsg
            } else {
                // Write output
                resp.Status = 0
                resp.Msg = "executed update process successfully\n\noutput:\n" + result
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        case *smsg.ForceDeviceRebootMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            programmaticQuitChan <- struct{}{}
        case *smsg.ForceDeviceSendHeartbeatMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Send heartbeat
            heartBeatMsg := MakeHeartbeatMsg()
            err := PublishMessage(heartBeatMsg)
            if err != nil {
                logger.Errorf("send heartbeat error: %s", err)
            }
        case *smsg.RequestDeviceUploadLogsToMinioMsg:
            if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
                break
            }
            // Upload logs to MinIO
            resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
            minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
            minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
                Creds:  minioCreds,
                Secure: false,
            })
            if err != nil {
                logger.Errorf("MinIO Client init error: %s", err)
                resp.Status = -1
                resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
            } else {
                if util.FileExists("./logs") {
                    err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
                        filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
                    if err != nil {
                        logger.Errorf("upload logs to MinIO error: %s", err)
                        resp.Status = -1
                        resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
                    }
                }
                if util.FileExists("./log") {
                    err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
                        filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
                    if err != nil {
                        logger.Errorf("upload log to MinIO error: %s", err)
                        resp.Status = -1
                        resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
                    }
                }
            }
            err = PublishMessage(&resp)
            if err != nil {
                logger.Errorf("send action done error: %s", err)
            }
        default:
            logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
        }
        // Notify NSQ that the message is processed successfully
        return nil
    }), 1)

    // err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
    err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
    if err != nil {
        logger.Fatalf("NSQ Consumer connect error: %s", err)
    }
}

var appInitCfg *AppInitConfig = &AppInitConfig{}
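
// initLoadConfig loads ./config/application.yaml with viper, applying defaults for the
// main.* retry/delay settings, and watches the file so that appInitCfg is replaced
// in place whenever the config changes on disk.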
func initLoadConfig() {
    viper.SetDefault("main.uploadRetryMaxTimes", 20)
    viper.SetDefault("main.failedRetryDelaySeconds", 5)
    viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
    viper.SetConfigFile("./config/application.yaml")
    viper.WatchConfig()
    viper.OnConfigChange(func(e fsnotify.Event) {
        logger.Infoln("Config file changed:", e.Name)
        var newAppInitConfig AppInitConfig
        err := viper.Unmarshal(&newAppInitConfig)
        if err != nil {
            logger.Infoln("Failed to unmarshal config:", err)
            return
        }
        appInitCfg = &newAppInitConfig
    })
    err := viper.ReadInConfig()
    if err != nil {
        logger.Fatalf("Failed to read config file: %v", err)
    }
    err = viper.Unmarshal(appInitCfg)
    if err != nil {
        logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
        fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
        os.Exit(1)
    }
}

func initLog() *logrus.Logger {
    // Main log file
    log := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.log", // Log file location
        MaxSize:    50,                          // Maximum file size in MB
        MaxBackups: 5,                           // Maximum number of old files to keep
        MaxAge:     28,                          // Maximum number of days to keep old files
        Compress:   true,                        // Whether to compress/archive old files
        LocalTime:  true,                        // Use local time for timestamps
    }
    // Error log file
    errorLog := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.error.log", // Error log file location
        MaxSize:    50,                                // Maximum file size in MB
        MaxBackups: 5,                                 // Maximum number of old files to keep
        MaxAge:     28,                                // Maximum number of days to keep old files
        Compress:   true,                              // Whether to compress/archive old files
        LocalTime:  true,                              // Use local time for timestamps
    }
    // Statistics log file
    statLog := &lumberjack.Logger{
        Filename:   "./log/minio-into-stck.stat.log", // Statistics log file location
        MaxSize:    50,                               // Maximum file size in MB
        MaxBackups: 5,                                // Maximum number of old files to keep
        MaxAge:     28,                               // Maximum number of days to keep old files
        Compress:   true,                             // Whether to compress/archive old files
        LocalTime:  true,                             // Use local time for timestamps
    }
    logger := logrus.New()
    if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
        logger.SetLevel(logrus.TraceLevel)
    } else {
        logger.SetLevel(logrus.DebugLevel)
    }
    logger.Out = log
    // logger.Out = io.MultiWriter(os.Stdout, log)
    // Send error-level logs to an additional file as well
    logger.AddHook(&ErrorHook{
        // Writer: errorLog,
        Writer: io.MultiWriter(os.Stderr, errorLog),
        LogLevels: []logrus.Level{
            logrus.ErrorLevel,
            logrus.FatalLevel,
            logrus.PanicLevel,
        },
    })
    statLogger = logrus.New()
    statLogger.SetLevel(logrus.InfoLevel)
    statLogger.Out = statLog
    return logger
}

// ErrorHook writes error-level log entries to an additional output.
type ErrorHook struct {
    Writer    io.Writer
    LogLevels []logrus.Level
}

func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
    line, err := entry.String()
    if err != nil {
        return err
    }
    _, err = hook.Writer.Write([]byte(line))
    return err
}

func (hook *ErrorHook) Levels() []logrus.Level {
    return hook.LogLevels
}

var logger *logrus.Logger
var statLogger *logrus.Logger

var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)
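
// main wires everything together: it initializes logging, loads the configuration,
// connects to NSQ, MySQL (gorm), MinIO and ClickHouse, starts main_worker in a
// goroutine, and then blocks until SIGINT/SIGTERM or a programmatic quit request
// before shutting down gracefully.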
func main() {
    var err error
    fmt.Println("Starting minio-into-stck, build time:", buildtime)

    logger = initLog()
    logger.Warnln("Logger initialized.")

    // Load configuration from file
    initLoadConfig()
    // Initialize NSQ
    initNsq()

    var db *gorm.DB
    var minioClient *minio.Client
    var ckConn driver.Conn

    // Connect to MySQL
    dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
        appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
    db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
    if err != nil {
        logger.Fatalf("Failed to connect to MySQL: %v", err)
    }
    // Disable logging
    db.Logger = gorm_logger.Discard
    // Get the underlying sql.DB object to close the connection later
    sqlDB, err := db.DB()
    if err != nil {
        logger.Fatalf("Failed to get MySQL DB: %v", err)
    }
    defer sqlDB.Close()
    // Ping the database to check if the connection is successful
    err = sqlDB.Ping()
    if err != nil {
        logger.Infof("ping db error: %v", err)
        return
    }
    logger.Infoln("Database connection successful")
    // Perform auto migration
    err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
    if err != nil {
        logger.Infof("auto migrate error: %v", err)
        return
    }
    logger.Infoln("Auto migration completed")

    // Connect to MinIO
    minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
        Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
        Secure: false,
    })
    if err == nil {
        bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
        if err != nil {
            logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
        }
        if !bucketExists {
            logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
        }
    }
    if err != nil {
        logger.Fatalf("Failed to connect to MinIO: %v", err)
    }
    logger.Infoln("Connected to MinIO")

    // Connect to ClickHouse
    ckConn, err = clickhouse.Open(&clickhouse.Options{
        Addr: []string{appInitCfg.Stck.Host},
    })
    if err == nil {
        err = ckConn.Ping(context.Background())
    }
    if err != nil {
        logger.Fatalf("Failed to connect to ClickHouse: %v", err)
    }
    logger.Infoln("Connected to ClickHouse")

    // OK! Everything is ready now.
    // objUploadChan := make(chan string, 1024*256)
    objUploadChan := util.NewDChan[string](1024 * 16)
    // Start the main work
    logger.Infoln("Starting main worker...")
    gAppExitWaitGroup.Add(1)
    go func() {
        defer gAppExitWaitGroup.Done()
        main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
    }()

    // Wait on signal.
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-signalChan:
        logger.Infof("received signal, stopping minio-into-stck")
    case <-programmaticQuitChan:
        logger.Infof("received programmatic quit signal, stopping minio-into-stck")
    }
    gAppQuitting = true
    // HACK: Notify the main worker to quit
    objUploadChan.In() <- ""
    // Close the NSQ producer and consumer
    gNsqProducer.Stop()
    gNsqConsumer.Stop()
    // Wait for the goroutines to exit
    gAppExitWaitGroup.Wait()
    logger.Infof("minio-into-stck stopped gracefully")
}

type AppCtx struct {
    db          *gorm.DB
    minioClient *minio.Client
    ckConn      driver.Conn
}

var gAppCfg *AppConfig

type PartUploadArgs struct {
    StreamInfo *StreamMetadata
    StreamName string
    PartName   string
}
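
// main_worker is the upload loop. It loads the import blacklist from MySQL, subscribes
// to MinIO bucket notifications so that newly written streams are queued (after a short
// delay), kicks off a full bucket scan for anything that was missed, and then drains the
// delayed channel: each stream prefix is uploaded with upload_one_stream, requeued after
// failedRetryDelaySeconds on failure, and given up on after uploadRetryMaxTimes attempts.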
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
    ctx := context.Background()
    // Load config from DB
    appCfg, err := load_app_cfg_from_db(app.db)
    if err != nil {
        logger.Fatalf("Failed to load app config from DB: %v", err)
    }
    gAppCfg = &appCfg
    // Start the notification listener
    go func() {
        for {
            // Register the bucket notification listener
            logger.Infoln("Registering bucket notification listener...")
            notifys := app.minioClient.ListenBucketNotification(
                ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
            // Listen OK, start the full upload trigger to upload maybe missed files
            go trigger_full_upload(app, objUploadChan.In())
            for notifyInfo := range notifys {
                for _, record := range notifyInfo.Records {
                    key := record.S3.Object.Key
                    logger.Traceln("New object notification:", key)
                    // Only care when `.zst` / `metadata.json` files are uploaded
                    // keyParts := strings.Split(key, "/")
                    // keyLastPart := keyParts[len(keyParts)-1]
                    keyLastPart := filepath.Base(key)
                    if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
                        continue
                    }
                    // key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
                    key = filepath.Dir(key) + "/"
                    // Remove fail counter so that the object can be retried instead of
                    // being spuriously ignored
                    mutexObjFailCounter.Lock()
                    delete(objFailCounter, key)
                    mutexObjFailCounter.Unlock()
                    // Queue the object for upload
                    objUploadChan.DelayedWrite(key, time.Duration(appInitCfg.Main.NotifyToUploadDelaySeconds)*time.Second)
                }
                if notifyInfo.Err != nil {
                    logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
                }
            }
            logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
            time.Sleep(5 * time.Second)
            // Clear fail counter
            mutexObjFailCounter.Lock()
            objFailCounter = make(map[string]int)
            mutexObjFailCounter.Unlock()
        }
    }()
    // Start the main loop (streams upload worker)
    for objToUpload := range objUploadChan.Out() {
        objUploadChan.MarkElemReadDone(objToUpload)
        if gAppQuitting {
            logger.Infof("Quitting, stopping main worker")
            return
        }
        if objToUpload == "" {
            continue
        }
        logger.Infoln("Checking stream object:", objToUpload)
        if object_is_blacklisted(objToUpload) {
            logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
            continue
        }
        mutexObjFailCounter.Lock()
        if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
            logger.Warnf("Retried upload stream `%s` for too many %d times, give up", objToUpload, objFailCounter[objToUpload])
            delete(objFailCounter, objToUpload)
            mutexObjFailCounter.Unlock()
            continue
        }
        objFailCounter[objToUpload]++
        mutexObjFailCounter.Unlock()
        fullyUploaded, err := upload_one_stream(app, objToUpload)
        if err != nil {
            // Queue the object for retry
            logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
                objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
            objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
            continue
        }
        if fullyUploaded {
            // Mark the stream as fully uploaded
            // err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
            err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
            if err != nil {
                logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
            } else {
                // We can now remove the stream parts from the parts table
                err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
                if err != nil {
                    logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
                }
                logger.Infof("Marked stream %s as fully uploaded", objToUpload)
            }
            // Remove entry from the fail counter
            mutexObjFailCounter.Lock()
            delete(objFailCounter, objToUpload)
            mutexObjFailCounter.Unlock()
        } else {
            // Queue the object for retry
            logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
                objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
            objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
        }
    }
}
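
// trigger_full_upload walks the bucket once to pick up streams that were written while
// this service was not listening. It lists the top-level `<date>/` prefixes, treats any
// directory that directly contains a metadata.json as a stream, otherwise descends one
// more level, and queues every stream that has no UploadRecord yet.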
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
    // // Upload all files in the directory
    // options := minio.ListObjectsOptions{
    //     Recursive: false,
    // }
    // objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
    // // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
    // for objInfo := range objectsCh {
    //     if objInfo.Err != nil {
    //         logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
    //         continue
    //     }
    //     key := objInfo.Key
    //     if strings.HasSuffix(key, "/") {
    //         // Is a directory (<date>), go deeper into it
    //         options := minio.ListObjectsOptions{
    //             Prefix:    key,
    //             Recursive: false,
    //         }
    //         for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
    //             if objInfo.Err != nil {
    //                 logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
    //                 continue
    //             }
    //             key := objInfo.Key
    //             if strings.HasSuffix(key, "/") {
    //                 // Is a directory, should be a stream then
    //                 uploaded := object_already_uploaded(app, key)
    //                 if !uploaded {
    //                     objToUploadChan <- key
    //                 }
    //             }
    //         }
    //     }
    // }

    // WARN: Slow!!!
    // options := minio.ListObjectsOptions{
    //     Recursive: true,
    // }
    options := minio.ListObjectsOptions{
        Recursive: false,
    }
    objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
    // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
    for objInfo := range objectsCh {
        if objInfo.Err != nil {
            logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
            continue
        }
        key := objInfo.Key
        // // If it's a directory with `metadata.json` file, it should be a stream
        // if strings.HasSuffix(key, "/metadata.json") {
        //     streamName := strings.TrimSuffix(key, "metadata.json")
        //     uploaded := object_already_uploaded(app, streamName)
        //     if !uploaded {
        //         objToUploadChan <- streamName
        //     }
        // }

        // Scan through all subdirectories
        if strings.HasSuffix(key, "/") {
            exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
            if err != nil {
                logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
                continue
            }
            if exists {
                // Go ahead and upload the stream
                uploaded := object_already_uploaded(app, key)
                if !uploaded {
                    objToUploadChan <- key
                }
            } else {
                // Check inner directories
                options := minio.ListObjectsOptions{
                    Prefix:    key,
                    Recursive: false,
                }
                for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
                    if objInfo.Err != nil {
                        logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
                        continue
                    }
                    key := objInfo.Key
                    if strings.HasSuffix(key, "/") {
                        // Is a directory, should be a stream then
                        uploaded := object_already_uploaded(app, key)
                        if !uploaded {
                            objToUploadChan <- key
                        }
                    }
                }
            }
        }
    }
    logger.Infoln("Full upload trigger finished")
}
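
// upload_one_stream uploads every part object under the given stream prefix. It fetches
// the stream's metadata.json, skips parts already recorded in PartUploadRecord, inserts
// the rest into ClickHouse via upload_one_part, records per-part results, and publishes
// NSQ status messages for each part and for the stream as a whole. It returns whether
// the stream appears to be completely ingested; if not, the caller requeues it.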
func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
    type StreamObjectUploadStatistics struct {
        StartTime time.Time
        EndTime   time.Time
        PartName  string
        UpState   string // "repeated" / "ok" / "fail"
        Msg       string // "error message"
    }
    type StreamUploadStatistics struct {
        StartTime       time.Time
        EndTime         time.Time
        StreamName      string
        MetaPointsCount int64
        MetaPartsCount  int64
        Objects         map[string]StreamObjectUploadStatistics
        Msg             string
    }
    streamStats := StreamUploadStatistics{
        StartTime:  time.Now(),
        StreamName: streamName,
        Objects:    make(map[string]StreamObjectUploadStatistics),
    }
    logger.Infof("Going to upload stream `%s`", streamName)
    if object_already_uploaded(app, streamName) {
        logger.Infof("Stream `%s` is already uploaded", streamName)
        return true, nil
    }

    defer func() {
        streamStats.EndTime = time.Now()
        repeatedCount := int64(0)
        okCount := int64(0)
        failCount := int64(0)
        totalCount := int64(0)
        objDetailsMsg := ""
        for _, obj := range streamStats.Objects {
            totalCount++
            if obj.UpState == "repeated" {
                repeatedCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
                    obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            } else if obj.UpState == "ok" {
                okCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
                    obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            } else {
                failCount++
                objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
                    obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
            }
        }
        statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
            "repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
            streamName, streamStats.EndTime.Sub(streamStats.StartTime),
            util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
            fullyUploaded, streamStats.MetaPartsCount,
            repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
        // Send upload status changed message
        msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
            streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
            okCount, failCount, repeatedCount)
        err := PublishMessage(&msg)
        if err != nil {
            logger.Errorf("send stream insert to stck status changed message error: %s", err)
        }
    }()

    fullyUploaded = true
    // Get stream metadata
    streamInfo, err := get_stream_metadata(app, streamName)
    if err != nil {
        // Cannot continue without metadata
        return false, err
    }
    streamStats.MetaPointsCount = streamInfo.TotalPoints
    streamStats.MetaPartsCount = streamInfo.PartsCount
    if streamInfo.PartsCount == 0 {
        // Edge device didn't finish uploading the stream yet
        logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
        fullyUploaded = false
    }

    hasSomething := false
    // Upload parts
    streamObjPath := streamName
    options := minio.ListObjectsOptions{
        Prefix:    streamObjPath,
        Recursive: false,
    }
    logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
    // hasSomething := false
    hasMetadata := false
    for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
        if objInfo.Err != nil {
            return false, objInfo.Err
        }
        if gAppQuitting {
            logger.Infof("Quitting, stopping uploading one stream")
            return false, nil
        }
        logger.Tracef("Checking minio file `%s`", objInfo.Key)
        if strings.HasSuffix(objInfo.Key, "/") {
            continue
        }
        partName := filepath.Base(objInfo.Key)
        if partName == "metadata.json" {
            hasMetadata = true
            continue
        }
        hasSomething = true
        objStat := StreamObjectUploadStatistics{
            StartTime: time.Now(),
            PartName:  partName,
        }
        if part_already_uploaded(app, streamName, partName) {
            objStat.EndTime = time.Now()
            objStat.UpState = "repeated"
            streamStats.Objects[objInfo.Key] = objStat
            logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
            continue
        }
        if fullyUploaded {
            fullyUploaded = false
            logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
        }
        // Do the parts upload
        partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
        logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
            partInfo.PartName, partInfo.StreamInfo.PartsCount,
            partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
        err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
        if err != nil {
            objStat.EndTime = time.Now()
            objStat.UpState = "fail"
            objStat.Msg = err.Error()
            logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
                objStat.EndTime.Sub(objStat.StartTime), err)
            fullyUploaded = false
        } else {
            // Mark the part as uploaded
            // err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
            part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
            err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
            if err != nil {
                logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
            }
            objStat.EndTime = time.Now()
            objStat.UpState = "ok"
            logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
                objStat.EndTime.Sub(objStat.StartTime))
        }
        streamStats.Objects[objInfo.Key] = objStat

        partNum, err := util.ExtractNumberFromString(partName)
        if err != nil {
            // Not a part file? Skip
            continue
        }
        status := "success"
        if objStat.UpState != "ok" {
            status = "failed"
        }
        msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
            streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
        err = PublishMessage(&msg)
        if err != nil {
            logger.Errorf("send part insert to stck status changed message error: %s", err)
        }
    }
    if !hasMetadata {
        logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
        fullyUploaded = false
    }
    if !hasSomething {
        if streamInfo.PartsCount != 0 {
            logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
                streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
        } else {
            logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
        }
        fullyUploaded = false
    }
    return fullyUploaded, nil
}
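
// upload_one_part ingests a single part_N.zst object: it downloads the object from MinIO,
// decompresses the zstd payload into raw little-endian float64 samples, validates the part
// index and byte size against the stream metadata, reconstructs each point's nanosecond
// timestamp from TimestampOffset, Interval and the part/point index, and bulk-inserts the
// points into the ClickHouse table via ch-go's columnar input format.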
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
    partName := filepath.Base(partObjPath)
    if part_already_uploaded(app, streamName, partName) {
        return nil
    }
    dryRun := false
    if !dryRun {
        // Get the part data from MinIO
        obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
            minio.GetObjectOptions{})
        if err != nil {
            return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
        }
        defer obj.Close()
        // Read the part data
        var partDataBuf = new(bytes.Buffer)
        _, err = partDataBuf.ReadFrom(obj)
        if err != nil {
            return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
        }
        // Process the part data
        partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
        if err != nil {
            return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
        }
        // Use regex to extract the part index from the part name
        partIndex := int64(0)
        {
            re := regexp.MustCompile(`part_(\d+)\.zst`)
            matches := re.FindStringSubmatch(partObjPath)
            if len(matches) != 2 {
                return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
            }
            partIndex, err = strconv.ParseInt(matches[1], 10, 64)
            if err != nil {
                return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
            }
            // Check if the part index is correct
            if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
                return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
            }
            // Check if the part data size is correct
            if streamInfo.PartsCount != 0 {
                left := int64(len(partData))
                if partIndex < streamInfo.PartsCount-1 {
                    right := streamInfo.PointsPerPart * 8
                    if left != right {
                        return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
                    }
                } else if partIndex == streamInfo.PartsCount-1 {
                    right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
                    if right == 0 {
                        right = streamInfo.PointsPerPart * 8
                    }
                    if left != right {
                        return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
                    }
                } else {
                    return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
                }
            }
        }
        partPointsCount := len(partData) / 8
        // Insert the part data into ClickHouse
        customInsertFn := func() error {
            ctx := context.Background()
            sql := fmt.Sprintf("INSERT INTO %s (metric_name, point_name, tags, value, nanoseconds) "+
                "SELECT %s, %s, %s, col1, col2 FROM input('col1 Float64, col2 Int64') FORMAT NATIVE",
                appInitCfg.Stck.Table,
                util.ToSqlLiteral(streamInfo.MetricName), util.ToSqlLiteral(streamInfo.Name), "[]")
            c, err := ch.Dial(ctx, ch.Options{Address: appInitCfg.Stck.Host})
            if err != nil {
                return fmt.Errorf("failed to connect to stck: %w", err)
            }
            /*
                ┌─name────────┬─type───────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
                │ metric_name │ LowCardinality(String) │              │                    │ Metric name               │                  │                │
                │ point_name  │ LowCardinality(String) │              │                    │ Point name                │                  │                │
                │ tags        │ Map(String, String)    │              │                    │ Point tags                │                  │                │
                │ value       │ Float64                │              │                    │ Point value               │                  │                │
                │ nanoseconds │ Int64                  │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
                └─────────────┴────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
            */
            logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
            colValue := make(proto.ColFloat64, 0, partPointsCount)
            colNanoseconds := make(proto.ColInt64, 0, partPointsCount)
            for i := 0; i < partPointsCount; i++ {
                value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
                // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
                nanoseconds := streamInfo.TimestampOffset * 1e6
                nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
                nanoseconds += int64(i) * streamInfo.Interval
                colValue = append(colValue, value)
                colNanoseconds = append(colNanoseconds, nanoseconds)
            }
            input := proto.Input{
                {Name: "col1", Data: &colValue},
                {Name: "col2", Data: &colNanoseconds},
            }
            err = c.Do(ctx, ch.Query{
                Body:  sql,
                Input: input,
            })
            if err != nil {
                return fmt.Errorf("failed to insert part into stck: %w", err)
            }
            logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
            return nil
        }
        err = customInsertFn()
        if err != nil {
            return fmt.Errorf("failed to insert part `%s` data into stck: %w", partObjPath, err)
        }
        // We made progress, reset the fail counter
        mutexObjFailCounter.Lock()
        objFailCounter[streamName] = 0
        mutexObjFailCounter.Unlock()
    }
    return nil
}
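
// load_app_cfg_from_db reads the runtime configuration stored in MySQL. Currently that is
// only ImportBlacklist, kept as a JSON array in the AppConfigDbEntry row with key
// "ImportBlacklist"; the row is created with an empty list if it does not exist yet.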
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
    var cfg AppConfig
    err := db.AutoMigrate(&AppConfigDbEntry{})
    if err != nil {
        return cfg, err
    }
    // db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
    var dbEntry AppConfigDbEntry
    result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
    if result.Error != nil {
        // if errors.Is(result.Error, gorm.ErrRecordNotFound) {
        //     dbEntry.Value = `[]`
        // } else {
        //     return cfg, result.Error
        // }
        return cfg, result.Error
    }
    err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
    if err != nil {
        return cfg, err
    }
    return cfg, nil
}

func object_already_uploaded(app AppCtx, key string) bool {
    var record UploadRecord
    result := app.db.First(&record, "key", key)
    return result.Error == nil
}

func object_is_blacklisted(key string) bool {
    for _, regexPattern := range gAppCfg.ImportBlacklist {
        // TODO: Cache compiled regex patterns
        if matched, _ := regexp.MatchString(regexPattern, key); matched {
            return true
        }
    }
    return false
}

func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
    var record PartUploadRecord
    result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
    return result.Error == nil
}
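
// get_stream_metadata downloads and decodes `<streamName>metadata.json` from the MinIO
// bucket; streamName is expected to already end with "/".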
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
    // Get the stream metadata from MinIO
    metadataObjPath := streamName + "metadata.json"
    obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
        metadataObjPath, minio.GetObjectOptions{})
    if err != nil {
        return nil, fmt.Errorf("failed to get stream metadata: %w", err)
    }
    defer obj.Close()
    var streamInfo StreamMetadata
    err = json.NewDecoder(obj).Decode(&streamInfo)
    if err != nil {
        return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
    }
    return &streamInfo, nil
}