main.go

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"example/minio-into-stck/util"
	"fmt"
	"log"
	"math"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"

	// "github.com/natefinch/lumberjack"
	// "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
}
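
// For reference, a minimal ./config/application.yaml matching the mapstructure tags
// above might look like the sketch below. All host names, credentials, and table
// names are illustrative placeholders, not values taken from this repository.
//
//	mysql:
//	  user: stck
//	  password: secret
//	  host: mysql.local:3306
//	  database: stck
//	minio:
//	  accessKey: minioadmin
//	  secret: minioadmin
//	  bucket: streams
//	  host: minio.local:9000
//	stck:
//	  host: clickhouse.local:9000
//	  table: stck.points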

type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}

// TODO: When inserting, create the row first, then update it with the insert duration once the insert finishes.

// UploadRecord represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}

// PartUploadRecord represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}

type StreamMetadata struct {
	MetricName      string `json:"metric_name"`
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int    `json:"parts_count"`
	PointsPerPart   int    `json:"points_per_part"`
	TotalPoints     int    `json:"total_points"`
}
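
// A <stream>/metadata.json object is expected to decode into the struct above. An
// illustrative example is shown here; the keys come from the json tags, while the
// values are made up for the sketch:
//
//	{
//	  "metric_name": "temperature",
//	  "name": "sensor_01",
//	  "timestamp_offset": 1715000000000,
//	  "interval": 1000000,
//	  "parts_count": 3,
//	  "points_per_part": 100000,
//	  "total_points": 250000
//	}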

var appInitCfg *AppInitConfig = &AppInitConfig{}

func initLoadConfig() {
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		log.Println("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			log.Println("Failed to unmarshal config:", err)
			// Keep the previous configuration if the new one cannot be parsed.
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		log.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		log.Printf("Failed to unmarshal config: %v\n", err)
	}
}

func main() {
	var err error

	log.Println("Starting application...")

	// Load configuration from file
	initLoadConfig()

	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		log.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		log.Printf("ping db error: %v", err)
		return
	}
	log.Println("Database connection successful")

	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		log.Printf("auto migrate error: %v", err)
		return
	}
	log.Println("Auto migration completed")

	// Connect to MinIO
	minioClient, err := minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		log.Fatalf("Failed to connect to MinIO: %v", err)
	}
	log.Println("Connected to MinIO")

	// Connect to ClickHouse
	ckConn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	log.Println("Connected to ClickHouse")

	// Start the main work
	log.Println("Starting main worker...")
	main_worker(AppCtx{db, minioClient, ckConn})
}

type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}

/** Design notes:
 * The main worker will first load the app config from the database. Then it will pull the
 * list of objects from MinIO to build a local mirrored list (which gets rebuilt on every startup).
 * A bucket notification will be registered at the same time to listen for new objects.
 * For each new (stream) object, the worker will check whether it is blacklisted or already
 * uploaded. If not, it will start the upload process by putting parts of the stream into the parts table.
 * When all parts are uploaded, the worker will mark the stream as uploaded (insert into the uploads table).
 */
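
// The bucket layout this worker assumes, reconstructed from the metadata lookup in
// get_stream_metadata and the `part_(\d+)\.zst` pattern in upload_one_part (the exact
// part numbering/padding may differ in practice):
//
//	<bucket>/
//	    <stream name>/
//	        metadata.json   // decodes into StreamMetadata, see above
//	        part_0.zst      // zstd-compressed little-endian float64 points
//	        part_1.zst
//	        ...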

var gAppCfg *AppConfig

type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}

func main_worker(app AppCtx) {
	ctx := context.Background()
	objUploadChan := make(chan string)
	partUploadChan := make(chan PartUploadArgs)

	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		log.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg

	// Register bucket notification
	notifys := app.minioClient.ListenBucketNotification(
		ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})

	// Start the notification listener
	go func() {
		for notifyInfo := range notifys {
			for _, record := range notifyInfo.Records {
				key := record.S3.Object.Key
				log.Println("New object notification:", key)
				// Only process root folders
				key = strings.Split(key, "/")[0]
				objUploadChan <- key
			}
		}
	}()

	// Start the full upload trigger
	go trigger_full_upload(app, objUploadChan)

	// Start the parts upload worker
	go upload_parts_worker(app, partUploadChan)

	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan {
		log.Println("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
			continue
		}
		fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
		if err != nil {
			// Queue the object for retry. The key is passed as a parameter so the
			// goroutine does not capture the loop variable.
			log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
			go func(obj string) {
				time.Sleep(5 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			//err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
				}
				log.Printf("Marked stream %s as fully uploaded\n", objToUpload)
			}
		} else {
			// Queue the object for retry
			log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
			go func(obj string) {
				time.Sleep(60 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
		}
	}
}

// Receives not-yet-uploaded parts from the parts channel, uploads them, and records them in the parts table.
func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
	for partInfo := range partUploadChan {
		log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
			// Queue the part for retry. partInfo is passed as a parameter so the
			// goroutine does not capture the loop variable.
			go func(part PartUploadArgs) {
				time.Sleep(5 * time.Second)
				partUploadChan <- part
			}(partInfo)
			continue
		}
		// Mark the part as uploaded
		//err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
		part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
		err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
		if err != nil {
			log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
		}
	}
}

func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// Upload all files in the directory
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			log.Printf("Error listing objects: %v\n", objInfo.Err)
			break
		}
		key := objInfo.Key
		if strings.HasSuffix(key, "/") {
			objToUploadChan <- strings.Split(key, "/")[0]
		}
	}
}

func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
	log.Printf("Going to upload stream %s\n", streamName)
	if object_already_uploaded(app, streamName) {
		log.Printf("Stream %s is already uploaded\n", streamName)
		return true, nil
	}

	fullyUploaded = true

	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		return false, err
	}
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		fullyUploaded = false
	}

	// Upload parts
	streamObjPath := streamName + "/"
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	hasSomething := false
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		if objInfo.Key == streamObjPath+"metadata.json" {
			continue
		}
		hasSomething = true
		if part_already_uploaded(app, streamName, objInfo.Key) {
			log.Printf("Part %s of stream %s is already uploaded\n", objInfo.Key, streamName)
			continue
		}
		fullyUploaded = false
		partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
	}
	if !hasSomething {
		log.Printf("Stream %s has no parts, but claims to have %d parts???\n", streamName, streamInfo.PartsCount)
		fullyUploaded = false
	}
	return fullyUploaded, nil
}

func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partName, minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part data: %w", err)
		}
		defer obj.Close()
		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part data: %w", err)
		}
		// Process the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part data: %w", err)
		}
		// Use regex to extract the part index from the part name
		partIndex := 0
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partName)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name: %s", partName)
			}
			partIndex, err = strconv.Atoi(matches[1])
			if err != nil {
				return fmt.Errorf("failed to convert part index to integer: %w", err)
			}
			// Check if the part index is correct
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part index out of bounds: %d / %d", partIndex, streamInfo.PartsCount)
			}
			// Check if the part data size is correct
			if streamInfo.PartsCount != 0 {
				if partIndex < streamInfo.PartsCount-1 {
					if len(partData) != streamInfo.PointsPerPart*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					// The last part holds the remaining points; if TotalPoints is an exact
					// multiple of PointsPerPart, the last part is a full one.
					lastPartPoints := streamInfo.TotalPoints % streamInfo.PointsPerPart
					if lastPartPoints == 0 {
						lastPartPoints = streamInfo.PointsPerPart
					}
					if len(partData) != lastPartPoints*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else {
					return fmt.Errorf("part index out of bounds: %d", partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8

		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
		}
		/*
			Target ClickHouse table columns (from DESCRIBE TABLE):
			  metric_name  LowCardinality(String)                               -- Metric name
			  point_name   LowCardinality(String)                               -- Point name
			  tags         Map(LowCardinality(String), LowCardinality(String))  -- Point tags
			  value        Float64                                              -- Point value
			  nanoseconds  Int64  CODEC(DoubleDelta, LZ4)                       -- Point time in nanoseconds
		*/
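		// For reference, a table with the layout above could be created with DDL along
		// these lines. The database/table name, engine, and ordering key are assumptions
		// for illustration only; just the columns, types, and the nanoseconds codec come
		// from the description above.
		//
		//	CREATE TABLE IF NOT EXISTS stck.points (
		//	    metric_name LowCardinality(String) COMMENT 'Metric name',
		//	    point_name  LowCardinality(String) COMMENT 'Point name',
		//	    tags        Map(LowCardinality(String), LowCardinality(String)) COMMENT 'Point tags',
		//	    value       Float64 COMMENT 'Point value',
		//	    nanoseconds Int64 CODEC(DoubleDelta, LZ4) COMMENT 'Point time in nanoseconds'
		//	) ENGINE = MergeTree
		//	ORDER BY (metric_name, point_name, nanoseconds);
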
  410. fmt.Printf("Going to insert %d points for part %s of stream %s\n", partPointsCount, partName, streamName)
  411. for i := 0; i < partPointsCount; i++ {
  412. metricName := streamInfo.MetricName
  413. pointName := streamInfo.Name
  414. tags := map[string]string{}
  415. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  416. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  417. nanoseconds := streamInfo.TimestampOffset * 1e6
  418. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  419. nanoseconds += int64(i) * streamInfo.Interval
  420. err := batch.Append(metricName, pointName, tags, value, nanoseconds)
  421. if err != nil {
  422. return err
  423. }
  424. }
  425. err = batch.Send()
  426. if err != nil {
  427. return err
  428. }
  429. }
  430. return nil
  431. }

func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	//db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		//if errors.Is(result.Error, gorm.ErrRecordNotFound) {
		//	dbEntry.Value = `[]`
		//} else {
		//	return cfg, result.Error
		//}
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}
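
// The ImportBlacklist entry is stored as a JSON array of regular expressions, for
// example `["^test_", "_tmp$"]` (illustrative patterns, not shipped defaults). Each
// pattern is matched against the stream object name in object_is_blacklisted below.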

func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}

func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}

func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}

func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "/metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()
	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}