main.go

package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/json"
	"example/minio-into-stck/util"
	"fmt"
	"log"
	"math"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
	"github.com/fsnotify/fsnotify"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/minio/minio-go/v7/pkg/notification"
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
	"gorm.io/gorm/logger"

	// "github.com/natefinch/lumberjack"
	// "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)
// AppInitConfig is the initial configuration of the application, loaded from the config file.
type AppInitConfig struct {
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
	Minio struct {
		AccessKey string `mapstructure:"accessKey"`
		Secret    string `mapstructure:"secret"`
		Bucket    string `mapstructure:"bucket"`
		Host      string `mapstructure:"host"`
	} `mapstructure:"minio"`
	Stck struct {
		Host  string `mapstructure:"host"`
		Table string `mapstructure:"table"`
	} `mapstructure:"stck"`
}
type AppConfig struct {
	// List of name regex patterns to exclude from import.
	ImportBlacklist []string
}

type AppConfigDbEntry struct {
	Key   string `gorm:"primaryKey"`
	Value string
}
// TODO: create the row first when inserting, then update it with the insert duration once the insert completes.
// UploadRecord Represents a record of an uploaded stream. (A major upload task.)
type UploadRecord struct {
	Key       string `gorm:"primaryKey"`
	CreatedAt time.Time
}
// PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
type PartUploadRecord struct {
	StreamName string `gorm:"primaryKey"`
	PartName   string `gorm:"primaryKey"`
	CreatedAt  time.Time
}
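// StreamMetadata mirrors the metadata.json object stored alongside each stream in MinIO.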
type StreamMetadata struct {
	Name            string `json:"name"`
	TimestampOffset int64  `json:"timestamp_offset"`
	Interval        int64  `json:"interval"`
	PartsCount      int    `json:"parts_count"`
	PointsPerPart   int    `json:"points_per_part"`
	TotalPoints     int    `json:"total_points"`
}
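// appInitCfg holds the configuration read from ./config/application.yaml; it is refreshed when the file changes.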
var appInitCfg *AppInitConfig = &AppInitConfig{}
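// initLoadConfig reads the config file with viper and re-unmarshals it whenever the file changes.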
func initLoadConfig() {
	viper.SetConfigFile("./config/application.yaml")
	viper.WatchConfig()
	viper.OnConfigChange(func(e fsnotify.Event) {
		log.Println("Config file changed:", e.Name)
		var newAppInitConfig AppInitConfig
		err := viper.Unmarshal(&newAppInitConfig)
		if err != nil {
			log.Println("Failed to unmarshal config:", err)
			return
		}
		appInitCfg = &newAppInitConfig
	})
	err := viper.ReadInConfig()
	if err != nil {
		log.Fatalf("Failed to read config file: %v", err)
	}
	err = viper.Unmarshal(appInitCfg)
	if err != nil {
		log.Printf("Failed to unmarshal config: %v\n", err)
	}
}
func main() {
	var err error
	log.Println("Starting application...")

	// Load configuration from file
	initLoadConfig()

	// Connect to MySQL
	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
		appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
	db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
	if err != nil {
		log.Fatalf("Failed to connect to MySQL: %v", err)
	}
	// Disable logging
	db.Logger = logger.Discard
	// Get the underlying sql.DB object to close the connection later
	sqlDB, err := db.DB()
	if err != nil {
		log.Fatalf("Failed to get MySQL DB: %v", err)
	}
	defer sqlDB.Close()
	// Ping the database to check if the connection is successful
	err = sqlDB.Ping()
	if err != nil {
		log.Printf("ping db error: %v", err)
		return
	}
	log.Println("Database connection successful")

	// Perform auto migration
	err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
	if err != nil {
		log.Printf("auto migrate error: %v", err)
		return
	}
	log.Println("Auto migration completed")

	// Connect to MinIO
	minioClient, err := minio.New(util.ExpandShellStringInfallible(appInitCfg.Minio.Host), &minio.Options{
		Creds:  credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
		Secure: false,
	})
	if err == nil {
		bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
		if err != nil {
			log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
		}
		if !bucketExists {
			log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
		}
	}
	if err != nil {
		log.Fatalf("Failed to connect to MinIO: %v", err)
	}
	log.Println("Connected to MinIO")

	// Connect to ClickHouse
	ckConn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{appInitCfg.Stck.Host},
	})
	if err == nil {
		err = ckConn.Ping(context.Background())
	}
	if err != nil {
		log.Fatalf("Failed to connect to ClickHouse: %v", err)
	}
	log.Println("Connected to ClickHouse")

	// Start the main work
	log.Println("Starting main worker...")
	main_worker(AppCtx{db, minioClient, ckConn})
}
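// AppCtx bundles the shared connections (MySQL via GORM, MinIO, ClickHouse) passed to the workers.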
type AppCtx struct {
	db          *gorm.DB
	minioClient *minio.Client
	ckConn      driver.Conn
}
/** Design notes:
 * The main worker will first load the app config from the database. Then it will pull the
 * list of objects from MinIO to build a local mirrored list (which gets rebuilt on every startup).
 * A bucket notification will be registered to listen for new objects at the same time.
 * For each new (stream) object, the worker will check whether it is blacklisted or already
 * uploaded. If not, it will start the upload process by putting parts of the stream into the parts table.
 * When all parts are uploaded, the worker will mark the stream as uploaded (insert into the uploads table).
 */
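// gAppCfg is the runtime configuration loaded from the database by main_worker.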
var gAppCfg *AppConfig
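// PartUploadArgs describes one part upload task handed to upload_parts_worker.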
type PartUploadArgs struct {
	StreamInfo *StreamMetadata
	StreamName string
	PartName   string
}
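// main_worker listens for bucket notifications, triggers a full scan of existing objects,
// and uploads each stream that is neither blacklisted nor already uploaded, re-queueing
// failed or incomplete streams after a delay.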
func main_worker(app AppCtx) {
	ctx := context.Background()
	objUploadChan := make(chan string)
	partUploadChan := make(chan PartUploadArgs)

	// Load config from DB
	appCfg, err := load_app_cfg_from_db(app.db)
	if err != nil {
		log.Fatalf("Failed to load app config from DB: %v", err)
	}
	gAppCfg = &appCfg

	// Register bucket notification
	notifys := app.minioClient.ListenBucketNotification(
		ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})

	// Start the notification listener
	go func() {
		for notifyInfo := range notifys {
			for _, record := range notifyInfo.Records {
				key := record.S3.Object.Key
				log.Println("New object notification:", key)
				// Only process root folders
				key = strings.Split(key, "/")[0]
				objUploadChan <- key
			}
		}
	}()

	// Start the full upload trigger
	go trigger_full_upload(app, objUploadChan)

	// Start the parts upload worker
	go upload_parts_worker(app, partUploadChan)

	// Start the main loop (streams upload worker)
	for objToUpload := range objUploadChan {
		log.Println("Checking stream object:", objToUpload)
		if object_is_blacklisted(objToUpload) {
			log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
			continue
		}
		fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
		if err != nil {
			// Queue the object for retry
			log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
			go func(obj string) {
				time.Sleep(5 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
			continue
		}
		if fullyUploaded {
			// Mark the stream as fully uploaded
			//err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
			err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
			if err != nil {
				log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
			} else {
				// We can now remove the stream parts from the parts table
				err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
				if err != nil {
					log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
				}
			}
		} else {
			// Queue the object for retry
			log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
			go func(obj string) {
				time.Sleep(60 * time.Second)
				objUploadChan <- obj
			}(objToUpload)
		}
	}
}
// upload_parts_worker receives part upload tasks from partUploadChan, uploads each part,
// and records it in the parts table; failed parts are re-queued after a delay.
func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
	for partInfo := range partUploadChan {
		log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
		err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
		if err != nil {
			log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
			// Queue the part for retry
			go func(part PartUploadArgs) {
				time.Sleep(5 * time.Second)
				partUploadChan <- part
			}(partInfo)
			continue
		}
		// Mark the part as uploaded
		//err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
		part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
		err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
		if err != nil {
			log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
		}
	}
}
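// trigger_full_upload lists the top-level prefixes in the bucket once and queues every
// stream folder it finds for upload.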
func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
	// Upload all files in the directory
	options := minio.ListObjectsOptions{
		Recursive: false,
	}
	objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
	for objInfo := range objectsCh {
		if objInfo.Err != nil {
			log.Printf("Error listing objects: %v\n", objInfo.Err)
			break
		}
		key := objInfo.Key
		if strings.HasSuffix(key, "/") {
			objToUploadChan <- strings.Split(key, "/")[0]
		}
	}
}
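// upload_one_stream reads the stream's metadata, queues every part that has not been
// uploaded yet, and reports whether the whole stream is already complete.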
func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
	if object_already_uploaded(app, streamName) {
		return true, nil
	}
	fullyUploaded = true

	// Get stream metadata
	streamInfo, err := get_stream_metadata(app, streamName)
	if err != nil {
		return false, err
	}
	if streamInfo.PartsCount == 0 {
		// Edge device didn't finish uploading the stream yet
		fullyUploaded = false
	}

	// Upload parts
	streamObjPath := streamName + "/"
	options := minio.ListObjectsOptions{
		Prefix:    streamObjPath,
		Recursive: false,
	}
	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
		if objInfo.Err != nil {
			return false, objInfo.Err
		}
		if strings.HasSuffix(objInfo.Key, "/") {
			continue
		}
		if objInfo.Key == streamObjPath+"metadata.json" {
			continue
		}
		if part_already_uploaded(app, streamName, objInfo.Key) {
			continue
		}
		fullyUploaded = false
		partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
	}
	return fullyUploaded, nil
}
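// upload_one_part downloads one zstd-compressed part from MinIO, validates its index and
// size against the stream metadata, decodes the little-endian float64 samples, and
// batch-inserts them into ClickHouse with timestamps derived from the stream's offset and interval.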
func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
	if part_already_uploaded(app, streamName, partName) {
		return nil
	}
	dryRun := false
	if !dryRun {
		// Get the part data from MinIO
		obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partName, minio.GetObjectOptions{})
		if err != nil {
			return fmt.Errorf("failed to get part data: %w", err)
		}
		defer obj.Close()

		// Read the part data
		var partDataBuf = new(bytes.Buffer)
		_, err = partDataBuf.ReadFrom(obj)
		if err != nil {
			return fmt.Errorf("failed to read part data: %w", err)
		}

		// Process the part data
		partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
		if err != nil {
			return fmt.Errorf("failed to decompress part data: %w", err)
		}

		// Use regex to extract the part index from the part name
		partIndex := 0
		{
			re := regexp.MustCompile(`part_(\d+)\.zst`)
			matches := re.FindStringSubmatch(partName)
			if len(matches) != 2 {
				return fmt.Errorf("failed to extract part index from part name: %s", partName)
			}
			partIndex, err = strconv.Atoi(matches[1])
			if err != nil {
				return fmt.Errorf("failed to convert part index to integer: %w", err)
			}
			// Check if the part index is correct
			if partIndex < 0 || (streamInfo.PartsCount != 0 && partIndex >= streamInfo.PartsCount) {
				return fmt.Errorf("part index out of bounds: %d / %d", partIndex, streamInfo.PartsCount)
			}
			// Check if the part data size is correct
			if streamInfo.PartsCount != 0 {
				if partIndex < streamInfo.PartsCount-1 {
					if len(partData) != streamInfo.PointsPerPart*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else if partIndex == streamInfo.PartsCount-1 {
					if len(partData) != (streamInfo.TotalPoints%streamInfo.PointsPerPart)*8 {
						return fmt.Errorf("part data size mismatch: %d", len(partData))
					}
				} else {
					return fmt.Errorf("part index out of bounds: %d", partIndex)
				}
			}
		}
		partPointsCount := len(partData) / 8

		// Insert the part data into ClickHouse
		batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
		if err != nil {
			return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
		}
		/*
			┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
			│ metric_name │ LowCardinality(String)                              │              │                    │ Metric name               │                  │                │
			│ point_name  │ LowCardinality(String)                              │              │                    │ Point name                │                  │                │
			│ tags        │ Map(LowCardinality(String), LowCardinality(String)) │              │                    │ Point tags                │                  │                │
			│ value       │ Float64                                             │              │                    │ Point value               │                  │                │
			│ nanoseconds │ Int64                                               │              │                    │ Point time in nanoseconds │ DoubleDelta, LZ4 │                │
			└─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
		*/
		for i := 0; i < partPointsCount; i++ {
			metricName := ""
			pointName := streamInfo.Name
			tags := map[string]string{}
			value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
			// NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
			nanoseconds := streamInfo.TimestampOffset * 1e6
			nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
			nanoseconds += int64(i) * streamInfo.Interval
			err := batch.Append(metricName, pointName, tags, value, nanoseconds)
			if err != nil {
				return err
			}
		}
		err = batch.Send()
		if err != nil {
			return err
		}
	}
	return nil
}
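// load_app_cfg_from_db reads the runtime configuration (currently just ImportBlacklist)
// from the AppConfigDbEntry table, creating a default empty entry if none exists.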
func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
	var cfg AppConfig
	err := db.AutoMigrate(&AppConfigDbEntry{})
	if err != nil {
		return cfg, err
	}
	//db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
	var dbEntry AppConfigDbEntry
	result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
	if result.Error != nil {
		//if errors.Is(result.Error, gorm.ErrRecordNotFound) {
		//	dbEntry.Value = `[]`
		//} else {
		//	return cfg, result.Error
		//}
		return cfg, result.Error
	}
	err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
	if err != nil {
		return cfg, err
	}
	return cfg, nil
}
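// object_already_uploaded reports whether the stream already has a record in the uploads table.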
func object_already_uploaded(app AppCtx, key string) bool {
	var record UploadRecord
	result := app.db.First(&record, "key", key)
	return result.Error == nil
}
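// object_is_blacklisted reports whether the object key matches any regex pattern in the ImportBlacklist.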
func object_is_blacklisted(key string) bool {
	for _, regexPattern := range gAppCfg.ImportBlacklist {
		// TODO: Cache compiled regex patterns
		if matched, _ := regexp.MatchString(regexPattern, key); matched {
			return true
		}
	}
	return false
}
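// part_already_uploaded reports whether the part already has a record in the parts table.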
func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
	var record PartUploadRecord
	result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
	return result.Error == nil
}
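// get_stream_metadata downloads and decodes the stream's metadata.json object from the bucket.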
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
	// Get the stream metadata from MinIO
	metadataObjPath := streamName + "/metadata.json"
	obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
		metadataObjPath, minio.GetObjectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get stream metadata: %w", err)
	}
	defer obj.Close()
	var streamInfo StreamMetadata
	err = json.NewDecoder(obj).Decode(&streamInfo)
	if err != nil {
		return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
	}
	return &streamInfo, nil
}