main.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/json"
  7. "example/minio-into-ck/util"
  8. "fmt"
  9. "log"
  10. "math"
  11. "os"
  12. "regexp"
  13. "strconv"
  14. "strings"
  15. "time"
  16. "github.com/ClickHouse/clickhouse-go/v2"
  17. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  18. "github.com/minio/minio-go/v7"
  19. "github.com/minio/minio-go/v7/pkg/credentials"
  20. "github.com/minio/minio-go/v7/pkg/notification"
  21. "gorm.io/driver/mysql"
  22. "gorm.io/gorm"
  23. "gorm.io/gorm/logger"
  24. )
  25. var MYSQL_USER = "root"
  26. var MYSQL_PASSWORD = ""
  27. var MYSQL_HOST = "localhost:3306"
  28. var MYSQL_DATABASE = "minio-into-ck-db"
  29. var MINIO_ACCESS_KEY = "25cSXPqzdHrPwJkSIRkM"
  30. var MINIO_SECRET = "FN3AhQaVo7z1wgvce3IWiI1CI68T02OVeSUKCeRf"
  31. var MINIO_BUCKET = "bucket"
  32. var MINIO_HOST = "$(hostname).local:9000"
  33. var CLICKHOUSE_HOST = "localhost:9000"
  34. var CLICKHOUSE_TABLE = "tsdb_cpp"
  35. type AppConfig struct {
  36. // List of name regex patterns to exclude from import.
  37. ImportBlacklist []string
  38. }
  39. type AppConfigDbEntry struct {
  40. Key string `gorm:"primaryKey"`
  41. Value string
  42. }
  43. // UploadRecord Represents a record of an uploaded stream. (A major upload task.)
  44. type UploadRecord struct {
  45. Key string `gorm:"primaryKey"`
  46. CreatedAt time.Time
  47. }
  48. // PartUploadRecord Represents a record of an uploaded part of a stream. (A minor upload task.)
  49. type PartUploadRecord struct {
  50. StreamName string `gorm:"primaryKey"`
  51. PartName string `gorm:"primaryKey"`
  52. CreatedAt time.Time
  53. }
  54. type StreamMetadata struct {
  55. Name string `json:"name"`
  56. TimestampOffset int64 `json:"timestamp_offset"`
  57. Interval int64 `json:"interval"`
  58. PartsCount int `json:"parts_count"`
  59. PointsPerPart int `json:"points_per_part"`
  60. TotalPoints int `json:"total_points"`
  61. }
  62. func main() {
  63. var err error
  64. log.Println("Starting application...")
  65. // Load environment variables
  66. if os.Getenv("MYSQL_USER") != "" {
  67. MYSQL_USER = os.Getenv("MYSQL_USER")
  68. }
  69. if os.Getenv("MYSQL_PASSWORD") != "" {
  70. MYSQL_PASSWORD = os.Getenv("MYSQL_PASSWORD")
  71. }
  72. if os.Getenv("MYSQL_HOST") != "" {
  73. MYSQL_HOST = os.Getenv("MYSQL_HOST")
  74. }
  75. if os.Getenv("MYSQL_DATABASE") != "" {
  76. MYSQL_DATABASE = os.Getenv("MYSQL_DATABASE")
  77. }
  78. if os.Getenv("MINIO_ACCESS_KEY") != "" {
  79. MINIO_ACCESS_KEY = os.Getenv("MINIO_ACCESS_KEY")
  80. }
  81. if os.Getenv("MINIO_SECRET") != "" {
  82. MINIO_SECRET = os.Getenv("MINIO_SECRET")
  83. }
  84. if os.Getenv("MINIO_BUCKET") != "" {
  85. MINIO_BUCKET = os.Getenv("MINIO_BUCKET")
  86. }
  87. if os.Getenv("MINIO_HOST") != "" {
  88. MINIO_HOST = os.Getenv("MINIO_HOST")
  89. }
  90. MINIO_HOST, err = util.ExpandShellString(MINIO_HOST)
  91. if err != nil {
  92. log.Fatalf("Failed to expand shell string: %v", err)
  93. }
  94. if os.Getenv("CLICKHOUSE_HOST") != "" {
  95. CLICKHOUSE_HOST = os.Getenv("CLICKHOUSE_HOST")
  96. }
  97. if os.Getenv("CLICKHOUSE_TABLE") != "" {
  98. CLICKHOUSE_TABLE = os.Getenv("CLICKHOUSE_TABLE")
  99. }
  100. // Connect to MySQL
  101. dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
  102. MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_DATABASE)
  103. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  104. if err != nil {
  105. log.Fatalf("Failed to connect to MySQL: %v", err)
  106. }
  107. // Disable logging
  108. db.Logger = logger.Discard
  109. // Get the underlying sql.DB object to close the connection later
  110. sqlDB, err := db.DB()
  111. if err != nil {
  112. log.Fatalf("Failed to get MySQL DB: %v", err)
  113. }
  114. defer sqlDB.Close()
  115. // Ping the database to check if the connection is successful
  116. err = sqlDB.Ping()
  117. if err != nil {
  118. log.Printf("ping db error: %v", err)
  119. return
  120. }
  121. log.Println("Database connection successful")
  122. // Perform auto migration
  123. err = db.AutoMigrate(&AppConfigDbEntry{}, &UploadRecord{}, &PartUploadRecord{})
  124. if err != nil {
  125. log.Printf("auto migrate error: %v", err)
  126. return
  127. }
  128. log.Println("Auto migration completed")
  129. // Connect to MinIO
  130. minioClient, err := minio.New(MINIO_HOST, &minio.Options{
  131. Creds: credentials.NewStaticV4(MINIO_ACCESS_KEY, MINIO_SECRET, ""),
  132. Secure: false,
  133. })
  134. if err == nil {
  135. bucketExists, err := minioClient.BucketExists(context.Background(), MINIO_BUCKET)
  136. if err != nil {
  137. log.Fatalf("Failed to check if bucket %s exists: %v", MINIO_BUCKET, err)
  138. }
  139. if !bucketExists {
  140. log.Fatalf("Bucket %s does not exist", MINIO_BUCKET)
  141. }
  142. }
  143. if err != nil {
  144. log.Fatalf("Failed to connect to MinIO: %v", err)
  145. }
  146. log.Println("Connected to MinIO")
  147. // Connect to ClickHouse
  148. ckConn, err := clickhouse.Open(&clickhouse.Options{
  149. Addr: []string{CLICKHOUSE_HOST},
  150. })
  151. if err == nil {
  152. err = ckConn.Ping(context.Background())
  153. }
  154. if err != nil {
  155. log.Fatalf("Failed to connect to ClickHouse: %v", err)
  156. }
  157. log.Println("Connected to ClickHouse")
  158. // Start the main work
  159. log.Println("Starting main worker...")
  160. main_worker(AppCtx{db, minioClient, ckConn})
  161. }
  162. type AppCtx struct {
  163. db *gorm.DB
  164. minioClient *minio.Client
  165. ckConn driver.Conn
  166. }
  167. /** Design notes:
  168. * The main worker will first load the app config from the database. Then it will pull the
  169. * list of objects from MinIO to build a local mirrored list (which gets rebuilt on every startup).
  170. * A bucket notification will be registered to listen for new objects at the same time.
  171. * For each new (stream) object, the worker will check if it's in the blacklist or if it's already
  172. * uploaded. If not, it will start the upload process by putting parts of the stream into parts table.
  173. * When all parts are uploaded, the worker will mark the stream as uploaded (insert into uploads table).
  174. */
  175. var gAppCfg *AppConfig
  176. type PartUploadArgs struct {
  177. StreamInfo *StreamMetadata
  178. StreamName string
  179. PartName string
  180. }
  181. func main_worker(app AppCtx) {
  182. ctx := context.Background()
  183. objUploadChan := make(chan string)
  184. partUploadChan := make(chan PartUploadArgs)
  185. // Load config from DB
  186. appCfg, err := load_app_cfg_from_db(app.db)
  187. if err != nil {
  188. log.Fatalf("Failed to load app config from DB: %v", err)
  189. }
  190. gAppCfg = &appCfg
  191. // Register bucket notification
  192. notifys := app.minioClient.ListenBucketNotification(
  193. ctx, MINIO_BUCKET, "", "", []string{string(notification.ObjectCreatedAll)})
  194. // Start the notification listener
  195. go func() {
  196. for notifyInfo := range notifys {
  197. for _, record := range notifyInfo.Records {
  198. key := record.S3.Object.Key
  199. log.Println("New object notification:", key)
  200. // Only process root folders
  201. key = strings.Split(key, "/")[0]
  202. objUploadChan <- key
  203. }
  204. }
  205. }()
  206. // Start the full upload trigger
  207. go trigger_full_upload(app, objUploadChan)
  208. // Start the parts upload worker
  209. go upload_parts_worker(app, partUploadChan)
  210. // Start the main loop (streams upload worker)
  211. for objToUpload := range objUploadChan {
  212. log.Println("Checking stream object:", objToUpload)
  213. if object_is_blacklisted(objToUpload) {
  214. log.Printf("Object %s is blacklisted, skipping\n", objToUpload)
  215. continue
  216. }
  217. fullyUploaded, err := upload_one_stream(app, objToUpload, partUploadChan)
  218. if err != nil {
  219. // Queue the object for retry
  220. log.Printf("Failed to upload stream %s: `%v`, retrying\n", objToUpload, err)
  221. go func() {
  222. time.Sleep(5 * time.Second)
  223. objUploadChan <- objToUpload
  224. }()
  225. continue
  226. }
  227. if fullyUploaded {
  228. // Mark the stream as fully uploaded
  229. //err = app.db.Create(&UploadRecord{Key: objToUpload}).Error
  230. err = app.db.Where(UploadRecord{Key: objToUpload}).FirstOrCreate(&UploadRecord{}).Error
  231. if err != nil {
  232. log.Printf("Failed to mark stream %s as uploaded: %v\n", objToUpload, err)
  233. } else {
  234. // We can now remove the stream parts from the parts table
  235. err = app.db.Where("stream_name = ?", objToUpload).Delete(&PartUploadRecord{}).Error
  236. if err != nil {
  237. log.Printf("Failed to remove parts of stream %s from parts table: %v\n", objToUpload, err)
  238. }
  239. }
  240. } else {
  241. // Queue the object for retry
  242. log.Printf("Stream %s is not fully uploaded, retrying\n", objToUpload)
  243. go func() {
  244. time.Sleep(60 * time.Second)
  245. objUploadChan <- objToUpload
  246. }()
  247. }
  248. }
  249. }
  250. // Polls the parts table for not-yet-uploaded parts and uploads them.
  251. func upload_parts_worker(app AppCtx, partUploadChan chan PartUploadArgs) {
  252. for partInfo := range partUploadChan {
  253. log.Printf("Uploading part %s of stream %s\n", partInfo.PartName, partInfo.StreamName)
  254. err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
  255. if err != nil {
  256. log.Printf("Failed to upload part %s of stream %s: %v\n", partInfo.PartName, partInfo.StreamName, err)
  257. // Queue the part for retry
  258. go func() {
  259. time.Sleep(5 * time.Second)
  260. partUploadChan <- partInfo
  261. }()
  262. continue
  263. }
  264. // Mark the part as uploaded
  265. //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
  266. part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
  267. err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
  268. if err != nil {
  269. log.Printf("Failed to mark part %s of stream %s as uploaded: %v\n", partInfo.PartName, partInfo.StreamName, err)
  270. }
  271. }
  272. }
  273. func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
  274. // Upload all files in the directory
  275. options := minio.ListObjectsOptions{
  276. Recursive: false,
  277. }
  278. objectsCh := app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options)
  279. for objInfo := range objectsCh {
  280. if objInfo.Err != nil {
  281. log.Printf("Error listing objects: %v\n", objInfo.Err)
  282. break
  283. }
  284. key := objInfo.Key
  285. if strings.HasSuffix(key, "/") {
  286. objToUploadChan <- strings.Split(key, "/")[0]
  287. }
  288. }
  289. }
  290. func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUploadArgs) (fullyUploaded bool, err error) {
  291. if object_already_uploaded(app, streamName) {
  292. return true, nil
  293. }
  294. fullyUploaded = true
  295. // Get stream metadata
  296. streamInfo, err := get_stream_metadata(app, streamName)
  297. if err != nil {
  298. return false, err
  299. }
  300. if streamInfo.PartsCount == 0 {
  301. // Edge device didn't finish uploading the stream yet
  302. fullyUploaded = false
  303. }
  304. // Upload parts
  305. streamObjPath := streamName + "/"
  306. options := minio.ListObjectsOptions{
  307. Prefix: streamObjPath,
  308. Recursive: false,
  309. }
  310. for objInfo := range app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options) {
  311. if objInfo.Err != nil {
  312. return false, objInfo.Err
  313. }
  314. if strings.HasSuffix(objInfo.Key, "/") {
  315. continue
  316. }
  317. if objInfo.Key == streamObjPath+"metadata.json" {
  318. continue
  319. }
  320. if part_already_uploaded(app, streamName, objInfo.Key) {
  321. continue
  322. }
  323. fullyUploaded = false
  324. partUploadChan <- PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
  325. }
  326. return fullyUploaded, nil
  327. }
  328. func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string, partName string) (err error) {
  329. if part_already_uploaded(app, streamName, partName) {
  330. return nil
  331. }
  332. dryRun := false
  333. if !dryRun {
  334. // Get the part data from MinIO
  335. obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, partName, minio.GetObjectOptions{})
  336. if err != nil {
  337. return fmt.Errorf("failed to get part data: %w", err)
  338. }
  339. defer obj.Close()
  340. // Read the part data
  341. var partDataBuf = new(bytes.Buffer)
  342. _, err = partDataBuf.ReadFrom(obj)
  343. if err != nil {
  344. return fmt.Errorf("failed to read part data: %w", err)
  345. }
  346. // Process the part data
  347. partData, err := util.DecompressZstdBuffer(partDataBuf.Bytes())
  348. if err != nil {
  349. return fmt.Errorf("failed to decompress part data: %w", err)
  350. }
  351. // Use regex to extract the part index from the part name
  352. partIndex := 0
  353. {
  354. re := regexp.MustCompile(`part_(\d+)\.zst`)
  355. matches := re.FindStringSubmatch(partName)
  356. if len(matches) != 2 {
  357. return fmt.Errorf("failed to extract part index from part name: %s", partName)
  358. }
  359. partIndex, err = strconv.Atoi(matches[1])
  360. if err != nil {
  361. return fmt.Errorf("failed to convert part index to integer: %w", err)
  362. }
  363. // Check if the part index is correct
  364. if partIndex < 0 || partIndex >= streamInfo.PartsCount {
  365. return fmt.Errorf("part index out of bounds: %d", partIndex)
  366. }
  367. // Check if the part data size is correct
  368. if partIndex < streamInfo.PartsCount-1 {
  369. if len(partData) != streamInfo.PointsPerPart*8 {
  370. return fmt.Errorf("part data size mismatch: %d", len(partData))
  371. }
  372. } else if partIndex == streamInfo.PartsCount-1 {
  373. if len(partData) != (streamInfo.TotalPoints%streamInfo.PointsPerPart)*8 {
  374. return fmt.Errorf("part data size mismatch: %d", len(partData))
  375. }
  376. } else {
  377. return fmt.Errorf("part index out of bounds: %d", partIndex)
  378. }
  379. }
  380. partPointsCount := len(partData) / 8
  381. // Insert the part data into ClickHouse
  382. batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+CLICKHOUSE_TABLE)
  383. if err != nil {
  384. return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
  385. }
  386. /*
  387. ┌─name────────┬─type────────────────────────────────────────────────┬─default_type─┬─default_expression─┬─comment───────────────────┬─codec_expression─┬─ttl_expression─┐
  388. │ metric_name │ LowCardinality(String) │ │ │ Metric name │ │ │
  389. │ point_name │ LowCardinality(String) │ │ │ Point name │ │ │
  390. │ tags │ Map(LowCardinality(String), LowCardinality(String)) │ │ │ Point tags │ │ │
  391. │ value │ Float64 │ │ │ Point value │ │ │
  392. │ nanoseconds │ Int64 │ │ │ Point time in nanoseconds │ DoubleDelta, LZ4 │ │
  393. └─────────────┴─────────────────────────────────────────────────────┴──────────────┴────────────────────┴───────────────────────────┴──────────────────┴────────────────┘
  394. */
  395. for i := 0; i < partPointsCount; i++ {
  396. metricName := ""
  397. pointName := streamInfo.Name
  398. tags := map[string]string{}
  399. value := math.Float64frombits(binary.LittleEndian.Uint64(partData[i*8 : (i+1)*8]))
  400. // NOTE: TimestampOffset is in milliseconds, need to convert to nanoseconds
  401. nanoseconds := streamInfo.TimestampOffset * 1e6
  402. nanoseconds += int64(partIndex) * int64(streamInfo.PointsPerPart) * streamInfo.Interval
  403. nanoseconds += int64(i) * streamInfo.Interval
  404. err := batch.Append(metricName, pointName, tags, value, nanoseconds)
  405. if err != nil {
  406. return err
  407. }
  408. }
  409. err = batch.Send()
  410. if err != nil {
  411. return err
  412. }
  413. }
  414. return nil
  415. }
  416. func load_app_cfg_from_db(db *gorm.DB) (AppConfig, error) {
  417. var cfg AppConfig
  418. err := db.AutoMigrate(&AppConfigDbEntry{})
  419. if err != nil {
  420. return cfg, err
  421. }
  422. //db.Create(&AppConfigDbEntry{Key: "ImportBlacklist", Value: `[]`})
  423. var dbEntry AppConfigDbEntry
  424. result := db.Where(AppConfigDbEntry{Key: "ImportBlacklist"}).Attrs(AppConfigDbEntry{Value: `[]`}).FirstOrCreate(&dbEntry)
  425. if result.Error != nil {
  426. //if errors.Is(result.Error, gorm.ErrRecordNotFound) {
  427. // dbEntry.Value = `[]`
  428. //} else {
  429. // return cfg, result.Error
  430. //}
  431. return cfg, result.Error
  432. }
  433. err = json.Unmarshal([]byte(dbEntry.Value), &cfg.ImportBlacklist)
  434. if err != nil {
  435. return cfg, err
  436. }
  437. return cfg, nil
  438. }
  439. func object_already_uploaded(app AppCtx, key string) bool {
  440. var record UploadRecord
  441. result := app.db.First(&record, "key", key)
  442. return result.Error == nil
  443. }
  444. func object_is_blacklisted(key string) bool {
  445. for _, regexPattern := range gAppCfg.ImportBlacklist {
  446. // TODO: Cache compiled regex patterns
  447. if matched, _ := regexp.MatchString(regexPattern, key); matched {
  448. return true
  449. }
  450. }
  451. return false
  452. }
  453. func part_already_uploaded(app AppCtx, streamName string, partName string) bool {
  454. var record PartUploadRecord
  455. result := app.db.First(&record, "stream_name = ? AND part_name = ?", streamName, partName)
  456. return result.Error == nil
  457. }
  458. func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
  459. // Get the stream metadata from MinIO
  460. metadataObjPath := streamName + "/metadata.json"
  461. obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, metadataObjPath, minio.GetObjectOptions{})
  462. if err != nil {
  463. return nil, fmt.Errorf("failed to get stream metadata: %w", err)
  464. }
  465. defer obj.Close()
  466. var streamInfo StreamMetadata
  467. err = json.NewDecoder(obj).Decode(&streamInfo)
  468. if err != nil {
  469. return nil, fmt.Errorf("failed to decode stream metadata: %w", err)
  470. }
  471. return &streamInfo, nil
  472. }