// main.go
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/json"
  7. "example/minio-into-stck/util"
  8. "fmt"
  9. "log"
  10. "math"
  11. "regexp"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/ClickHouse/clickhouse-go/v2"
  16. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  17. "github.com/fsnotify/fsnotify"
  18. "github.com/minio/minio-go/v7"
  19. "github.com/minio/minio-go/v7/pkg/credentials"
  20. "github.com/minio/minio-go/v7/pkg/notification"
  21. "gorm.io/driver/mysql"
  22. "gorm.io/gorm"
  23. "gorm.io/gorm/logger"
  24. // "github.com/natefinch/lumberjack"
  25. // "github.com/sirupsen/logrus"
  26. "github.com/spf13/viper"
  27. )
// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	// Mysql holds connection settings for the bookkeeping database
	// (upload/part records and runtime app config).
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	// Minio holds credentials and the bucket of the source object store
	// where edge devices upload stream parts.
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	// Stck holds the ClickHouse endpoint and destination table name.
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
}
// AppConfig is the runtime configuration, loaded from the database
// (see load_app_cfg_from_db).
type AppConfig struct {
	// ImportBlacklist is a list of name regex patterns to exclude from import.
	ImportBlacklist []string
}
// AppConfigDbEntry is a single key/value row of the app-config table;
// values are JSON-encoded strings (e.g. the ImportBlacklist array).
type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
// A row exists only once every part of the stream has been uploaded.
type UploadRecord struct {
	Key       string `gorm:"primaryKey"` // stream name (top-level folder in the bucket)
	CreatedAt time.Time
}
// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
// Rows are deleted once the whole stream is marked uploaded.
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"` // full object key of the part
	CreatedAt  time.Time
}
// StreamMetadata mirrors the metadata.json object stored alongside each
// stream in MinIO; it describes how the stream is split into parts.
type StreamMetadata struct {
	// Name is used as the point name when inserting into ClickHouse.
	Name string `json:"name"`
	// TimestampOffset is the stream start time in milliseconds
	// (upload_one_part multiplies it by 1e6 to get nanoseconds).
	TimestampOffset int64 `json:"timestamp_offset"`
	// Interval is the spacing between consecutive points; upload_one_part
	// adds it to nanosecond timestamps unscaled — presumably nanoseconds,
	// TODO confirm.
	Interval      int64 `json:"interval"`
	PartsCount    int   `json:"parts_count"`
	PointsPerPart int   `json:"points_per_part"`
	TotalPoints   int   `json:"total_points"`
}
  74. var appInitCfg *AppInitConfig = &AppInitConfig{}
  75. func initLoadConfig() {
  76. viper.SetConfigFile("./config/application.yaml")
  77. viper.WatchConfig()
  78. viper.OnConfigChange(func(e fsnotify.Event) {
  79. log.Println("Config file changed:", e.Name)
  80. var newAppInitConfig AppInitConfig
  81. err := viper.Unmarshal(&newAppInitConfig)
  82. if err != nil {
  83. log.Println("Failed to unmarshal config:", err)
  84. }
  85. appInitCfg = &newAppInitConfig
  86. })
  87. err := viper.ReadInConfig()
  88. if err != nil {
  89. log.Fatalf("Failed to read config file: %v", err)
  90. }
  91. err = viper.Unmarshal(appInitCfg)
  92. if err != nil {
  93. log.Printf("Failed to unmarshal config: %v\n", err)
  94. }
  95. }
  96. func main() {
  97. var err error
  98. log.Println("Starting application...")
  99. // Load configuration from file
  100. initLoadConfig()
  101. // Connect to MySQL
  102. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
  103. appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
  104. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  105. if err != nil {
  106. log.Fatalf("Failed to connect to MySQL: %v", err)
  107. }
  108. // Disable logging
  109. db.Logger = logger.Discard
  110. // Get the underlying sql.DB object to close the connection later
  111. sqlDB, err := db.DB()
  112. if err != nil {
  113. log.Fatalf("Failed to get MySQL DB: %v", err)
  114. }
  115. defer sqlDB.Close()
  116. // Ping the database to check if the connection is successful
  117. err = sqlDB.Ping()
  118. if err != nil {
  119. log.Printf("ping db error: %v", err)
  120. return
  121. }
  122. log.Println("Database connection successful")
  123. // Perform auto migration
  124. err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
  125. if err != nil {
  126. log.Printf("auto migrate error: %v", err)
  127. return
  128. }
  129. log.Println("Auto migration completed")
  130. // Connect to MinIO
  131. minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
  132. Creds: credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
  133. Secure: false,
  134. })
  135. if err == nil {
  136. bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
  137. if err != nil {
  138. log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
  139. }
  140. if !bucketExists {
  141. log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
  142. }
  143. }
  144. if err != nil {
  145. log.Fatalf("Failed to connect to MinIO: %v", err)
  146. }
  147. log.Println("Connected to MinIO")
  148. // Connect to ClickHouse
  149. ckConn, err := clickhouse.Open(&clickhouse.Options{
  150. Addr: []string{appInitCfg.Stck.Host},
  151. })
  152. if err == nil {
  153. err = ckConn.Ping(context.Background())
  154. }
  155. if err != nil {
  156. log.Fatalf("Failed to connect to ClickHouse: %v", err)
  157. }
  158. log.Println("Connected to ClickHouse")
  159. // Start the main work
  160. log.Println("Starting main worker...")
  161. main_worker(AppCtx{db, minioClient, ckConn})
  162. }
// AppCtx bundles the three long-lived connections shared by all workers.
type AppCtx struct {
	db          *gorm.DB      // bookkeeping database (upload/part records)
	minioClient *minio.Client // source object store
	ckConn      driver.Conn   // ClickHouse destination
}
  168. /** Design notes:
  169. * The main worker will first load the app config from the database. Then it will pull the
  170. * list of objects from MinIO to build a local mirrored list (which gets rebuilt on every startup).
  171. * A bucket notification will be registered to listen for new objects at the same time.
  172. * For each new (stream) object, the worker will check if it's in the blacklist or if it's already
  173. * uploaded. If not, it will start the upload process by putting parts of the stream into parts table.
  174. * When all parts are uploaded, the worker will mark the stream as uploaded (insert into uploads table).
  175. */
// gAppCfg holds the runtime app configuration loaded from the database
// by main_worker.
var gAppCfg *AppConfig

// PartUploadArgs describes one part-upload task: which part of which stream,
// plus the stream metadata needed to validate the part's size and index.
type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}
  182. func main_worker(app AppCtx) {
  183. ctx := context.Background()
  184. objUploadChan := make(chan string)
  185. partUploadChan := make(chan PartUploadArgs)
  186. // Load config from DB
  187. appCfg, err := load_app_cfg_from_db(app.db)
  188. if err != nil {
  189. log.Fatalf("Failed to load app config from DB: %v", err)
  190. }
  191. gAppCfg = &appCfg
  192. // Register bucket notification
  193. notifys := app.minioClient.ListenBucketNotification(
  194. ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
  195. // Start the notification listener
  196. go func() {
  197. for notifyInfo := range notifys {
  198. for _, record := range notifyInfo.Records {
  199. key := record.S3.Object.Key
  200. log.Println("New object notification:", key)
  201. // Only process root folders
  202. key = strings.Split(key, "/")[0]
  203. objUploadChan <- key
  204. }
  205. }
  206. }()
  207. // Start the full upload trigger
  208. go trigger_full_upload(app, objUploadChan)
  209. // Start the parts upload worker
  210. go upload_parts_worker(app, partUploadChan)
  211. // Start the main loop (streams upload worker)
  212. for objToUpload := range objUploadChan {
  213. log.Println("Checking stream object:", objToUpload)
  214. if object_is_blacklisted(objToUpload) {
  215. log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
  216. continue
  217. }
  218. fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
  219. if err != nil {
  220. // Queue the object for retry
  221. log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
  222. go func() {
  223. time.Sleep(5 * time.Second)
  224. objUploadChan <- objToUpload
  225. }()
  226. continue
  227. }
  228. if fullyUploaded {
  229. // Mark the stream as fully uploaded
  230. //err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
  231. err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
  232. if err != nil {
  233. log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
  234. } else {
  235. // We can now remove the stream parts from the parts table
  236. err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
  237. if err != nil {
  238. log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
  239. }
  240. }
  241. } else {
  242. // Queue the object for retry
  243. log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
  244. go func() {
  245. time.Sleep(60 * time.Second)
  246. objUploadChan <- objToUpload
  247. }()
  248. }
  249. }
  250. }
  251. // Polls the parts table for not-yet-uploaded parts and uploads them.
  252. func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
  253. for partInfo := range partUploadChan {
  254. log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
  255. err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
  256. if err != nil {
  257. log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
  258. // Queue the part for retry
  259. go func() {
  260. time.Sleep(5 * time.Second)
  261. partUploadChan <- partInfo
  262. }()
  263. continue
  264. }
  265. // Mark the part as uploaded
  266. //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
  267. part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
  268. err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
  269. if err != nil {
  270. log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
  271. }
  272. }
  273. }
  274. func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
  275. // Upload all files in the directory
  276. options := minio.ListObjectsOptions{
  277. Recursive: false,
  278. }
  279. objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
  280. for objInfo := range objectsCh {
  281. if objInfo.Err != nil {
  282. log.Printf("Error listing objects: %v\n", objInfo.Err)
  283. break
  284. }
  285. key := objInfo.Key
  286. if strings.HasSuffix(key, "/") {
  287. objToUploadChan <- strings.Split(key, "/")[0]
  288. }
  289. }
  290. }
  291. func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
  292. if object_already_uploaded(app, streamName) {
  293. return true, nil
  294. }
  295. fullyUploaded = true
  296. // Get stream metadata
  297. streamInfo, err := get_stream_metadata(app, streamName)
  298. if err != nil {
  299. return false, err
  300. }
  301. if streamInfo.PartsCount == 0 {
  302. // Edge device didn't finish uploading the stream yet
  303. fullyUploaded = false
  304. }
  305. // Upload parts
  306. streamObjPath := streamName + "/"
  307. options := minio.ListObjectsOptions{
  308. Prefix: streamObjPath,
  309. Recursive: false,
  310. }
  311. for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
  312. if objInfo.Err != nil {
  313. return false, objInfo.Err
  314. }
  315. if strings.HasSuffix(objInfo.Key, "/") {
  316. continue
  317. }
  318. if objInfo.Key == streamObjPath+"metadata.json" {
  319. continue
  320. }
  321. if part_already_uploaded(app, streamName, objInfo.Key) {
  322. continue
  323. }
  324. fullyUploaded = false
  325. partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
  326. }
  327. return fullyUploaded, nil
  328. }
  329. func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
  330. if part_already_uploaded(app, streamName, partName) {
  331. return nil
  332. }
  333. dryRun := false
  334. if !dryRun {
  335. // Get the part data from MinIO
  336. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partName, minio.GetObjectOptions{})
  337. if err != nil {
  338. return fmt.Errorf("failed to get part data: %w", err)
  339. }
  340. defer obj.Close()
  341. // Read the part data
  342. var partDataBuf = new(bytes.Buffer)
  343. _, err = partDataBuf.ReadFrom(obj)
  344. if err != nil {
  345. return fmt.Errorf("failed to read part data: %w", err)
  346. }
  347. // Process the part data
  348. partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
  349. if err != nil {
  350. return fmt.Errorf("failed to decompress part data: %w", err)
  351. }
  352. // Use regex to extract the part index from the part name
  353. partIndex := 0
  354. {
  355. re := regexp.MustCompile(`part_(\d+)\.zst`)
  356. matches := re.FindStringSubmatch(partName)
  357. if len(matches) != 2 {
  358. return fmt.Errorf("failed to extract part index from part name: %s", partName)
  359. }
  360. partIndex, err = strconv.Atoi(matches[1])
  361. if err != nil {
  362. return fmt.Errorf("failed to convert part index to integer: %w", err)
  363. }
  364. // Check if the part index is correct
  365. if partIndex < 0 || partIndex >= streamInfo.PartsCount {
  366. return fmt.Errorf("part index out of bounds: %d", partIndex)
  367. }
  368. // Check if the part data size is correct
  369. if partIndex < streamInfo.PartsCount-1 {
  370. if len(partData) != streamInfo.PointsPerPart*8 {
  371. return fmt.Errorf("part data size mismatch: %d", len(partData))
  372. }
  373. } else if partIndex == streamInfo.PartsCount-1 {
  374. if len(partData) != (streamInfo.TotalPoints%streamInfo.PointsPerPart)*8 {
  375. return fmt.Errorf("part data size mismatch: %d", len(partData))
  376. }
  377. } else {
  378. return fmt.Errorf("part index out of bounds: %d", partIndex)
  379. }
  380. }
  381. partPointsCount := len(partData) / 8
  382. // Insert the part data into ClickHouse
  383. batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
  384. if err != nil {
  385. return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
  386. }
  387. /*
  388. ┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
  389. │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
  390. │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
  391. │ tags │ Map(LowCardinality(String), LowCardinality(String)) │ │ │ Point tags │ │ │
  392. │ value │ Float64 │ │ │ Point value │ │ │
  393. │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
  394. └─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
  395. */
  396. for i := 0; i < partPointsCount; i++ {
  397. metricName := ""
  398. pointName := streamInfo.Name
  399. tags := map[string]string{}
  400. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  401. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  402. nanoseconds := streamInfo.TimestampOffset * 1e6
  403. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  404. nanoseconds += int64(i) * streamInfo.Interval
  405. err := batch.Append(metricName, pointName, tags, value, nanoseconds)
  406. if err != nil {
  407. return err
  408. }
  409. }
  410. err = batch.Send()
  411. if err != nil {
  412. return err
  413. }
  414. }
  415. return nil
  416. }
  417. func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
  418. var cfg AppConfig
  419. err := db.AutoMigrate(&AppConfigDbEntry{})
  420. if err != nil {
  421. return cfg, err
  422. }
  423. //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
  424. var dbEntry AppConfigDbEntry
  425. result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
  426. if result.Error != nil {
  427. //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
  428. // dbEntry.Value = `[]`
  429. //} else {
  430. // return cfg, result.Error
  431. //}
  432. return cfg, result.Error
  433. }
  434. err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
  435. if err != nil {
  436. return cfg, err
  437. }
  438. return cfg, nil
  439. }
  440. func object_already_uploaded(app AppCtx, key string) bool {
  441. var record UploadRecord
  442. result := app.db.First(&record, "key", key)
  443. return result.Error == nil
  444. }
  445. func object_is_blacklisted(key string) bool {
  446. for _, regexPattern := range gAppCfg.ImportBlacklist {
  447. // TODO: Cache compiled regex patterns
  448. if matched, _ := regexp.MatchString(regexPattern, key); matched {
  449. return true
  450. }
  451. }
  452. return false
  453. }
  454. func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
  455. var record PartUploadRecord
  456. result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
  457. return result.Error == nil
  458. }
  459. func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
  460. // Get the stream metadata from MinIO
  461. metadataObjPath := streamName + "/metadata.json"
  462. obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
  463. metadataObjPath, minio.GetObjectOptions{})
  464. if err != nil {
  465. return nil, fmt.Errorf("failed to get stream metadata: %w", err)
  466. }
  467. defer obj.Close()
  468. var streamInfo StreamMetadata
  469. err = json.NewDecoder(obj).Decode(&streamInfo)
  470. if err != nil {
  471. return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
  472. }
  473. return &streamInfo, nil
  474. }