// main.go

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"example/minio-into-stck/util"
	"fmt"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"stck/stck-nsq-msg"
	smsg "stck/stck-nsq-msg/msg"
	"sync"
	"syscall"

	"io"
	// "log"
	"math"
	// "os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"github.com/natefinch/lumberjack"
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	gorm_logger "gorm.io/gorm/logger"
)

// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
	Nsq struct {
		TcpAddr         string `mapstructure:"tcpAddr"`
		LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
	} `mapstructure:"nsq"`
	Main struct {
		UploadRetryMaxTimes     int `mapstructure:"uploadRetryMaxTimes"`
		FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
	} `mapstructure:"main"`
}

type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}

// TODO: create the row first when inserting, then update it with the elapsed insert time once the insert finishes.

// UploadRecord represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}

// PartUploadRecord represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}

type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int64  `json:"parts_count"`
	PointsPerPart   int64  `json:"points_per_part"`
	TotalPoints     int64  `json:"total_points"`
}

var gLocalIpAddress string
var gMachineID string
var gNsqProducer *nsq.Producer
var gNsqConsumer *nsq.Consumer
var gAppStartTime time.Time = time.Now()
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
var gAppQuitting = false
var gAppExitWaitGroup sync.WaitGroup
var buildtime string

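// MakeHeartbeatMsg builds a device heartbeat message carrying the machine ID,
// app start time, current time, local IP, service name, and version.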
func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
	m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
		gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
	return &m
}

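// PublishMessage serializes the given message variant and publishes it to the
// server notification topic via the NSQ producer.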
func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
	payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
	if err != nil {
		return fmt.Errorf("marshal message error: %w", err)
	}
	err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
	if err != nil {
		return fmt.Errorf("publish message error: %w", err)
	}
	return nil
}

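// initNsq resolves the local IP and machine ID, connects the NSQ producer and consumer,
// and installs a handler for device control messages (ping, shell execution, self-update,
// reboot, heartbeat, and log-upload requests).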
func initNsq() {
	ip, err := stcknsqmsg.GetLocalIP()
	if err != nil {
		logger.Warnf("GetLocalIP error: %s", err)
	} else {
		gLocalIpAddress = ip.String()
		logger.Infof("Local IP: %s", gLocalIpAddress)
	}
	gMachineID = stcknsqmsg.MakeUniqueMachineID()
	logger.Infof("Machine ID: %s", gMachineID)
	fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)
	// Connect to NSQ
	nsqConfig := nsq.NewConfig()
	gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Producer init error: %s", err)
	}
	gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Consumer init error: %s", err)
	}
	gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
		logger.Debugf("NSQ Consumer received message: %s", message.Body)
		// Parse the message
		recvTime := message.Timestamp
		msg, err := stcknsqmsg.FromString(string(message.Body))
		if err != nil {
			logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
			return nil
		}
		// Process the message
		switch data := msg.Data.(type) {
		case *smsg.DevicePingMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Write pong
			pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
			err := PublishMessage(&pongMsg)
			if err != nil {
				logger.Errorf("send pong error: %s", err)
			}
		case *smsg.RequestDeviceExecuteShellScriptMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Execute the shell script
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
			result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
				cmd := exec.Command("bash", "-c", data.Script)
				var out bytes.Buffer
				cmd.Stdout = &out
				cmd.Stderr = &out
				err := cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
				// Write error message
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Write output
				resp.Status = 0
				resp.Msg = result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.RequestDeviceUpdateMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			if data.ServiceName != "minio-into-stck" {
				break
			}
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
				// Download the update file to /tmp
				downloadPath := "/tmp/minio-into-stck-updater"
				updateScriptPath := filepath.Join(downloadPath, "update.sh")
				var out bytes.Buffer
				err := os.RemoveAll(downloadPath)
				if err != nil {
					return "", err
				}
				err = os.MkdirAll(downloadPath, 0777)
				if err != nil {
					return "", err
				}
				updateScriptContent := fmt.Sprintf(`#!/bin/bash
set -e
cd %s
wget --tries=3 -nv -O installer.tar.gz %s
tar -xzf installer.tar.gz
cd minio-into-stck-installer
./replacing-update.sh
`, downloadPath, data.ServiceBinaryURL)
				err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
				if err != nil {
					return "", err
				}
				// Execute the update script
				cmd := exec.Command("bash", "-c", updateScriptPath)
				cmd.Stdout = &out
				cmd.Stderr = &out
				err = cmd.Run()
				return out.String(), err
			}(data)
			if err != nil {
				errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
				// Write error message
				resp.Status = -1
				resp.Msg = errMsg
			} else {
				// Write output
				resp.Status = 0
				resp.Msg = "executed update process successfully\n\noutput:\n" + result
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		case *smsg.ForceDeviceRebootMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			programmaticQuitChan <- struct{}{}
		case *smsg.ForceDeviceSendHeartbeatMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Send heartbeat
			heartBeatMsg := MakeHeartbeatMsg()
			err := PublishMessage(heartBeatMsg)
			if err != nil {
				logger.Errorf("send heartbeat error: %s", err)
			}
		case *smsg.RequestDeviceUploadLogsToMinioMsg:
			if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
				break
			}
			// Upload logs to MinIO
			resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
			minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
			minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
				Creds:  minioCreds,
				Secure: false,
			})
			if err != nil {
				logger.Errorf("MinIO Client init error: %s", err)
				resp.Status = -1
				resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
			} else {
				if util.FileExists("./logs") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
					if err != nil {
						logger.Errorf("upload logs to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
					}
				}
				if util.FileExists("./log") {
					err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
						filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
					if err != nil {
						logger.Errorf("upload log to MinIO error: %s", err)
						resp.Status = -1
						resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
					}
				}
			}
			err = PublishMessage(&resp)
			if err != nil {
				logger.Errorf("send action done error: %s", err)
			}
		default:
			logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
		}
		// Notify NSQ that the message is processed successfully
		return nil
	}), 1)
	// err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
	err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
	if err != nil {
		logger.Fatalf("NSQ Consumer connect error: %s", err)
	}
}

var appInitCfg *AppInitConfig = &AppInitConfig{}

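// initLoadConfig loads ./config/application.yaml via viper, applies defaults,
// and hot-reloads the in-memory config whenever the file changes.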
func initLoadConfig() {
	viper.SetDefault("main.uploadRetryMaxTimes", 20)
	viper.SetDefault("main.failedRetryDelaySeconds", 5)
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		logger.Infoln("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			logger.Infoln("Failed to unmarshal config:", err)
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		logger.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		logger.Errorf("Failed to unmarshal config: %v, exiting...", err)
		fmt.Printf("Failed to unmarshal config: %v, exiting...\n", err)
		os.Exit(1)
	}
}

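// initLog configures rotating main, error, and stat log files via lumberjack
// and returns the main logrus logger.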
func initLog() *logrus.Logger {
	// Main log file
	log := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.log", // Log file location
		MaxSize:    50,                          // Maximum file size in MB
		MaxBackups: 5,                           // Maximum number of old files to keep
		MaxAge:     28,                          // Maximum number of days to keep old files
		Compress:   true,                        // Whether to compress/archive old files
		LocalTime:  true,                        // Use local time for timestamps
	}
	// Error log file
	errorLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.error.log", // Error log file location
		MaxSize:    50,                                // Maximum file size in MB
		MaxBackups: 5,                                 // Maximum number of old files to keep
		MaxAge:     28,                                // Maximum number of days to keep old files
		Compress:   true,                              // Whether to compress/archive old files
		LocalTime:  true,                              // Use local time for timestamps
	}
	// Stats log file
	statLog := &lumberjack.Logger{
		Filename:   "./log/minio-into-stck.stat.log", // Stats log file location
		MaxSize:    50,                               // Maximum file size in MB
		MaxBackups: 5,                                // Maximum number of old files to keep
		MaxAge:     28,                               // Maximum number of days to keep old files
		Compress:   true,                             // Whether to compress/archive old files
		LocalTime:  true,                             // Use local time for timestamps
	}
	logger := logrus.New()
	if strings.ToLower(os.Getenv("LOG_LEVEL")) == "trace" {
		logger.SetLevel(logrus.TraceLevel)
	} else {
		logger.SetLevel(logrus.DebugLevel)
	}
	logger.Out = log
	// logger.Out = io.MultiWriter(os.Stdout, log)
	// Route error-level logs to an additional file
	logger.AddHook(&ErrorHook{
		Writer: errorLog,
		LogLevels: []logrus.Level{
			logrus.ErrorLevel,
			logrus.FatalLevel,
			logrus.PanicLevel,
		},
	})
	statLogger = logrus.New()
	statLogger.SetLevel(logrus.InfoLevel)
	statLogger.Out = statLog
	return logger
}

// ErrorHook routes error-level log entries to an additional writer.
type ErrorHook struct {
	Writer    io.Writer
	LogLevels []logrus.Level
}

func (hook *ErrorHook) Fire(entry *logrus.Entry) error {
	line, err := entry.String()
	if err != nil {
		return err
	}
	_, err = hook.Writer.Write([]byte(line))
	return err
}

func (hook *ErrorHook) Levels() []logrus.Level {
	return hook.LogLevels
}

var logger *logrus.Logger
var statLogger *logrus.Logger
var mutexObjFailCounter = &sync.Mutex{}
var objFailCounter map[string]int = make(map[string]int)

func main() {
	var err error
	fmt.Println("Starting minio-into-stck, build time:", buildtime)
	logger = initLog()
	logger.Warnln("Logger initialized.")
	// Load configuration from file
	initLoadConfig()
	// Initialize NSQ
	initNsq()
	var db *gorm.DB
	var minioClient *minio.Client
	var ckConn driver.Conn
	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err = gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = gorm_logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		logger.Infof("ping db error: %v", err)
		return
	}
	logger.Infoln("Database connection successful")
	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		logger.Infof("auto migrate error: %v", err)
		return
	}
	logger.Infoln("Auto migration completed")
	// Connect to MinIO
	minioClient, err = minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			logger.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			logger.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		logger.Fatalf("Failed to connect to MinIO: %v", err)
	}
	logger.Infoln("Connected to MinIO")
	// Connect to ClickHouse
	ckConn, err = clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		logger.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	logger.Infoln("Connected to ClickHouse")
	// OK! Everything is ready now.
	// objUploadChan := make(chan string, 1024*256)
	objUploadChan := util.NewDChan[string](1024 * 16)
	// Start the main work
	logger.Infoln("Starting main worker...")
	gAppExitWaitGroup.Add(1)
	go func() {
		defer gAppExitWaitGroup.Done()
		main_worker(AppCtx{db, minioClient, ckConn}, objUploadChan)
	}()
	// Wait on signal.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-signalChan:
		logger.Infof("received signal, stopping minio-into-stck")
	case <-programmaticQuitChan:
		logger.Infof("received programmatic quit signal, stopping minio-into-stck")
	}
	gAppQuitting = true
	// HACK: Notify the main worker to quit
	objUploadChan.In() <- ""
	// Close the NSQ producer and consumer
	gNsqProducer.Stop()
	gNsqConsumer.Stop()
	// Wait for the goroutines to exit
	gAppExitWaitGroup.Wait()
	logger.Infof("minio-into-stck stopped gracefully")
}

type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

var gAppCfg *AppConfig

type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}

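// main_worker loads the runtime config from the DB, starts a bucket-notification listener
// (plus a full-bucket scan for missed streams) that feeds stream prefixes into objUploadChan,
// and uploads each queued stream, scheduling retries and recording completed uploads.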
func main_worker(app AppCtx, objUploadChan *util.DChan[string]) {
	ctx := context.Background()
	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		logger.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg
	// Start the notification listener
	go func() {
		for {
			// Register the bucket notification listener
			logger.Infoln("Registering bucket notification listener...")
			notifys := app.minioClient.ListenBucketNotification(
				ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
			// Listen OK, start the full upload trigger to upload maybe missed files
			go trigger_full_upload(app, objUploadChan.In())
			for notifyInfo := range notifys {
				for _, record := range notifyInfo.Records {
					key := record.S3.Object.Key
					logger.Traceln("New object notification:", key)
					// Only care when `.zst` / `metadata.json` files are uploaded
					// keyParts := strings.Split(key, "/")
					// keyLastPart := keyParts[len(keyParts)-1]
					keyLastPart := filepath.Base(key)
					if keyLastPart != "metadata.json" && !strings.HasSuffix(keyLastPart, ".zst") {
						continue
					}
					// key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
					key = filepath.Dir(key) + "/"
					// Remove fail counter so that the object can be retried instead of
					// being spuriously ignored
					mutexObjFailCounter.Lock()
					delete(objFailCounter, key)
					mutexObjFailCounter.Unlock()
					// Queue the object for upload
					objUploadChan.Write(key)
				}
				if notifyInfo.Err != nil {
					logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
				}
			}
			logger.Warnln("Bucket notification listener stopped unexpectedly, retrying in 5 seconds...")
			time.Sleep(5 * time.Second)
			// Clear fail counter
			mutexObjFailCounter.Lock()
			objFailCounter = make(map[string]int)
			mutexObjFailCounter.Unlock()
		}
	}()

	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan.Out() {
		objUploadChan.MarkElemReadDone(objToUpload)
		if gAppQuitting {
			logger.Infof("Quitting, stopping main worker")
			return
		}
		if objToUpload == "" {
			continue
		}
		logger.Infoln("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			logger.Infof("Object `%s` is blacklisted, skipping", objToUpload)
			continue
		}
		mutexObjFailCounter.Lock()
		if objFailCounter[objToUpload] >= appInitCfg.Main.UploadRetryMaxTimes {
			logger.Warnf("Retried uploading stream `%s` too many times (%d), giving up", objToUpload, objFailCounter[objToUpload])
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
			continue
		}
		objFailCounter[objToUpload]++
		mutexObjFailCounter.Unlock()
		fullyUploaded, err := upload_one_stream(app, objToUpload)
		if err != nil {
			// Queue the object for retry
			logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
				objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			// err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark stream %s as uploaded: %v", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					logger.Warnf("Failed to remove parts of stream %s from parts table: %v", objToUpload, err)
				}
				logger.Infof("Marked stream %s as fully uploaded", objToUpload)
			}
			// Remove entry from the fail counter
			mutexObjFailCounter.Lock()
			delete(objFailCounter, objToUpload)
			mutexObjFailCounter.Unlock()
		} else {
			// Queue the object for retry
			logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
				objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
			objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
		}
	}
}

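// trigger_full_upload lists the bucket's top-level prefixes (and, where needed, their
// subdirectories) and queues every stream directory that has not yet been recorded as uploaded.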
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// // Upload all files in the directory
	// options := minio.ListObjectsOptions{
	// 	Recursive: false,
	// }
	// objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	// // NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
	// for objInfo := range objectsCh {
	// 	if objInfo.Err != nil {
	// 		logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
	// 		continue
	// 	}
	// 	key := objInfo.Key
	// 	if strings.HasSuffix(key, "/") {
	// 		// Is a directory (<date>), go deeper into it
	// 		options := minio.ListObjectsOptions{
	// 			Prefix:    key,
	// 			Recursive: false,
	// 		}
	// 		for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
	// 			if objInfo.Err != nil {
	// 				logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
	// 				continue
	// 			}
	// 			key := objInfo.Key
	// 			if strings.HasSuffix(key, "/") {
	// 				// Is a directory, should be a stream then
	// 				uploaded := object_already_uploaded(app, key)
	// 				if !uploaded {
	// 					objToUploadChan <- key
	// 				}
	// 			}
	// 		}
	// 	}
	// }
	// WARN: Slow!!!
	// options := minio.ListObjectsOptions{
	// 	Recursive: true,
	// }
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	// NOTE: Streams in minio are organized as `<date>/<stream_name>/`.
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			logger.Warnf("Error listing bucket `%s` objects: %v", appInitCfg.Minio.Bucket, objInfo.Err)
			continue
		}
		key := objInfo.Key
		// // If it's a directory with `metadata.json` file, it should be a stream
		// if strings.HasSuffix(key, "/metadata.json") {
		// 	streamName := strings.TrimSuffix(key, "metadata.json")
		// 	uploaded := object_already_uploaded(app, streamName)
		// 	if !uploaded {
		// 		objToUploadChan <- streamName
		// 	}
		// }
		// Scan through all subdirectories
		if strings.HasSuffix(key, "/") {
			exists, err := util.MinioObjectExists(app.minioClient, appInitCfg.Minio.Bucket, key+"metadata.json")
			if err != nil {
				logger.Warnf("Error checking if object `%s` exists: %v", key+"metadata.json", err)
				continue
			}
			if exists {
				// Go ahead and upload the stream
				uploaded := object_already_uploaded(app, key)
				if !uploaded {
					objToUploadChan <- key
				}
			} else {
				// Check inner directories
				options := minio.ListObjectsOptions{
					Prefix:    key,
					Recursive: false,
				}
				for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
					if objInfo.Err != nil {
						logger.Warnf("Error listing bucket `%s` folder `%s` objects: %v", appInitCfg.Minio.Bucket, key, objInfo.Err)
						continue
					}
					key := objInfo.Key
					if strings.HasSuffix(key, "/") {
						// Is a directory, should be a stream then
						uploaded := object_already_uploaded(app, key)
						if !uploaded {
							objToUploadChan <- key
						}
					}
				}
			}
		}
	}
}

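// upload_one_stream uploads all parts of the given stream prefix, collects per-part statistics,
// publishes a stream status message on completion, and reports whether the stream is now fully uploaded.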
func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err error) {
	type StreamObjectUploadStatistics struct {
		StartTime time.Time
		EndTime   time.Time
		PartName  string
		UpState   string // "repeated" / "ok" / "fail"
		Msg       string // error message
	}
	type StreamUploadStatistics struct {
		StartTime       time.Time
		EndTime         time.Time
		StreamName      string
		MetaPointsCount int64
		MetaPartsCount  int64
		Objects         map[string]StreamObjectUploadStatistics
		Msg             string
	}
	streamStats := StreamUploadStatistics{
		StartTime:  time.Now(),
		StreamName: streamName,
		Objects:    make(map[string]StreamObjectUploadStatistics),
	}
	logger.Infof("Going to upload stream `%s`", streamName)
	if object_already_uploaded(app, streamName) {
		logger.Infof("Stream `%s` is already uploaded", streamName)
		return true, nil
	}
	defer func() {
		streamStats.EndTime = time.Now()
		repeatedCount := int64(0)
		okCount := int64(0)
		failCount := int64(0)
		totalCount := int64(0)
		objDetailsMsg := ""
		for _, obj := range streamStats.Objects {
			totalCount++
			if obj.UpState == "repeated" {
				repeatedCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: REPEATED (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else if obj.UpState == "ok" {
				okCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: OK (t: %v ~ %v, duration: %v)}",
					obj.PartName, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			} else {
				failCount++
				objDetailsMsg += fmt.Sprintf("\n{%s: FAIL (Reason: %s) (t: %v ~ %v, duration: %v)}",
					obj.PartName, obj.Msg, util.FmtMyTime(obj.StartTime), util.FmtMyTime(obj.EndTime), obj.EndTime.Sub(obj.StartTime))
			}
		}
		statLogger.Infof("Upload stream `%s` took %v (%v ~ %v); is fully uploaded: %v; metadata-reported part count=%d; "+
			"repeated=%d, ok=%d, fail=%d, total=%d; details:%s",
			streamName, streamStats.EndTime.Sub(streamStats.StartTime),
			util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
			fullyUploaded, streamStats.MetaPartsCount,
			repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
		// Send upload status changed message
		msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
			streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
			okCount, failCount, repeatedCount)
		err := PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send stream insert to stck status changed message error: %s", err)
		}
	}()
	fullyUploaded = true
	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		// Cannot continue without metadata
		return false, err
	}
	streamStats.MetaPointsCount = streamInfo.TotalPoints
	streamStats.MetaPartsCount = streamInfo.PartsCount
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		logger.Debugf("Marking stream `%s` as not fully uploaded, reason: parts_count=0", streamName)
		fullyUploaded = false
	}
	hasSomething := false
	// Upload parts
	streamObjPath := streamName
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
	// hasSomething := false
	hasMetadata := false
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if gAppQuitting {
			logger.Infof("Quitting, stopping uploading one stream")
			return false, nil
		}
		logger.Tracef("Checking minio file `%s`", objInfo.Key)
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		partName := filepath.Base(objInfo.Key)
		if partName == "metadata.json" {
			hasMetadata = true
			continue
		}
		hasSomething = true
		objStat := StreamObjectUploadStatistics{
			StartTime: time.Now(),
			PartName:  partName,
		}
		if part_already_uploaded(app, streamName, partName) {
			objStat.EndTime = time.Now()
			objStat.UpState = "repeated"
			streamStats.Objects[objInfo.Key] = objStat
			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
			continue
		}
		if fullyUploaded {
			fullyUploaded = false
			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
		}
		// Do the parts upload
		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
		logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
			partInfo.PartName, partInfo.StreamInfo.PartsCount,
			partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			objStat.EndTime = time.Now()
			objStat.UpState = "fail"
			objStat.Msg = err.Error()
			logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
				objStat.EndTime.Sub(objStat.StartTime), err)
			fullyUploaded = false
		} else {
			// Mark the part as uploaded
			// err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
			part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
			err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
			if err != nil {
				logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
			}
			objStat.EndTime = time.Now()
			objStat.UpState = "ok"
			logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
				objStat.EndTime.Sub(objStat.StartTime))
		}
		streamStats.Objects[objInfo.Key] = objStat
		partNum, err := util.ExtractNumberFromString(partName)
		if err != nil {
			// Not a part file? Skip
			continue
		}
		status := "success"
		if objStat.UpState != "ok" {
			status = "failed"
		}
		msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
			streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
		err = PublishMessage(&msg)
		if err != nil {
			logger.Errorf("send part insert to stck status changed message error: %s", err)
		}
	}
	if !hasMetadata {
		logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
		fullyUploaded = false
	}
	if !hasSomething {
		if streamInfo.PartsCount != 0 {
			logger.Errorf("Stream `%s` has no parts, but claims to have %d parts, %d points???",
				streamName, streamInfo.PartsCount, streamInfo.TotalPoints)
		} else {
			logger.Warnf("Stream `%s` has no parts in minio, will retry later", streamName)
		}
		fullyUploaded = false
	}
	return fullyUploaded, nil
}

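// upload_one_part fetches a single `.zst` part object from MinIO, decompresses it, validates the
// part index and size against the stream metadata, and batch-inserts the decoded points into the
// ClickHouse table.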
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partObjPath string) (err error) {
	partName := filepath.Base(partObjPath)
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partObjPath,
			minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part `%s` data: %w", partObjPath, err)
		}
		defer obj.Close()
		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part `%s` data: %w", partObjPath, err)
		}
		// Process the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part `%s` data: %w", partObjPath, err)
		}
		// Use regex to extract the part index from the part name
		partIndex := int64(0)
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partObjPath)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name `%s`", partObjPath)
			}
			partIndex, err = strconv.ParseInt(matches[1], 10, 64)
			if err != nil {
				return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
			}
			// Check if the part index is correct
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part `%s` index out of bounds: %d / %d", partObjPath, partIndex, streamInfo.PartsCount)
			}
			// Check if the part data size is correct
			if streamInfo.PartsCount != 0 {
				left := int64(len(partData))
				if partIndex < streamInfo.PartsCount-1 {
					right := streamInfo.PointsPerPart * 8
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					right := (streamInfo.TotalPoints % streamInfo.PointsPerPart) * 8
					if right == 0 {
						right = streamInfo.PointsPerPart * 8
					}
					if left != right {
						return fmt.Errorf("part `%s` data size mismatch: %d versus %d", partObjPath, left, right)
					}
				} else {
					return fmt.Errorf("part `%s` index out of bounds: %d", partObjPath, partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8
		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into stck: %w", err)
		}
		/*
			┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
			│ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
			│ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
			│ tags │ Map(LowCardinality(String), LowCardinality(String)) │ │ │ Point tags │ │ │
			│ value │ Float64 │ │ │ Point value │ │ │
			│ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
			└─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
		*/
		logger.Debugf("Going to insert %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
		for i := 0; i < partPointsCount; i++ {
			metricName := streamInfo.MetricName
			pointName := streamInfo.Name
			tags := map[string]string{}
			value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
			// NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
			nanoseconds := streamInfo.TimestampOffset * 1e6
			nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
			nanoseconds += int64(i) * streamInfo.Interval
			err := batch.Append(metricName, pointName, tags, value, nanoseconds)
			if err != nil {
				return err
			}
		}
		if batch.Rows() != partPointsCount {
			logger.Errorf("Batch rows mismatch: %d / %d???", batch.Rows(), partPointsCount)
		}
		err = batch.Send()
		if err != nil {
			return err
		}
		if !batch.IsSent() {
			logger.Errorln("Batch not sent???")
		}
		logger.Debugf("Inserted %d points for part `%s` of stream `%s`", partPointsCount, partObjPath, streamName)
		// We made progress, reset the fail counter
		mutexObjFailCounter.Lock()
		objFailCounter[streamName] = 0
		mutexObjFailCounter.Unlock()
	}
	return nil
}

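// load_app_cfg_from_db reads the runtime configuration (currently the import blacklist)
// from the AppConfigDbEntry table, creating a default empty entry if none exists.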
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	// db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		// if errors.Is(result.Error, gorm.ErrRecordNotFound) {
		// 	dbEntry.Value = `[]`
		// } else {
		// 	return cfg, result.Error
		// }
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}

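// object_already_uploaded reports whether an UploadRecord already exists for the given stream key.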
func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}

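// object_is_blacklisted reports whether the key matches any regex pattern in the configured import blacklist.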
func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}

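// part_already_uploaded reports whether a PartUploadRecord already exists for the given stream and part.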
func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}

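// get_stream_metadata downloads and decodes the stream's metadata.json object from MinIO.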
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()
	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}