|
@@ -26,6 +26,7 @@ import (
|
|
|
"github.com/ClickHouse/ch-go/proto"
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
+ "github.com/Jeffail/tunny"
|
|
|
"github.com/fsnotify/fsnotify"
|
|
|
"github.com/minio/minio-go/v7"
|
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
@@ -60,6 +61,7 @@ type AppInitConfig struct {
|
|
|
UploadRetryMaxTimes int `mapstructure:"uploadRetryMaxTimes"`
|
|
|
FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
|
|
|
NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"`
|
|
|
+ ParallelUploadThreadsCount int `mapstructure:"parallelUploadThreadsCount"`
|
|
|
} `mapstructure:"main"`
|
|
|
}
|
|
|
|
|
@@ -110,6 +112,7 @@ func initLoadConfig() {
|
|
|
viper.SetDefault("main.uploadRetryMaxTimes", 20)
|
|
|
viper.SetDefault("main.failedRetryDelaySeconds", 5)
|
|
|
viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
|
|
|
+ viper.SetDefault("main.parallelUploadThreadsCount", 2)
|
|
|
viper.SetConfigFile("./config/application.yaml")
|
|
|
viper.WatchConfig()
|
|
|
viper.OnConfigChange(func(e fsnotify.Event) {
|
|
@@ -651,6 +654,57 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
|
|
|
// hasSomething := false
|
|
|
hasMetadata := false
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ var mt sync.Mutex
|
|
|
+ pool := tunny.NewFunc(appInitCfg.Main.ParallelUploadThreadsCount, func(payload interface{}) interface{} {
|
|
|
+ packed := payload.(util.Pair[string, PartUploadArgs])
|
|
|
+ partName := packed.First
|
|
|
+ partInfo := packed.Second
|
|
|
+
|
|
|
+ objStat := StreamObjectUploadStatistics{
|
|
|
+ StartTime: time.Now(),
|
|
|
+ PartName: partName,
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
|
|
|
+ partInfo.PartName, partInfo.StreamInfo.PartsCount,
|
|
|
+ partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
|
|
|
+
|
|
|
+ err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
|
|
|
+ if err != nil {
|
|
|
+ objStat.EndTime = time.Now()
|
|
|
+ objStat.UpState = "fail"
|
|
|
+ objStat.Msg = err.Error()
|
|
|
+
|
|
|
+ logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
|
|
|
+ objStat.EndTime.Sub(objStat.StartTime), err)
|
|
|
+ fullyUploaded = false
|
|
|
+ } else {
|
|
|
+ // Mark the part as uploaded
|
|
|
+ //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
|
|
|
+ part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
|
|
|
+ err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
|
|
|
+ if err != nil {
|
|
|
+ logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ objStat.EndTime = time.Now()
|
|
|
+ objStat.UpState = "ok"
|
|
|
+
|
|
|
+ logger.Infof("Uploaded part `%s` of stream `%s`, took %v", partInfo.PartName, streamName,
|
|
|
+ objStat.EndTime.Sub(objStat.StartTime))
|
|
|
+ }
|
|
|
+
|
|
|
+ func() {
|
|
|
+ mt.Lock()
|
|
|
+ defer mt.Unlock()
|
|
|
+ streamStats.Objects[partInfo.PartName] = objStat
|
|
|
+ }()
|
|
|
+
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+
|
|
|
for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
|
|
|
if objInfo.Err != nil {
|
|
|
return false, objInfo.Err
|
|
@@ -682,7 +736,11 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
if part_already_uploaded(app, streamName, partName) {
|
|
|
objStat.EndTime = time.Now()
|
|
|
objStat.UpState = "repeated"
|
|
|
- streamStats.Objects[objInfo.Key] = objStat
|
|
|
+ func() {
|
|
|
+ mt.Lock()
|
|
|
+ defer mt.Unlock()
|
|
|
+ streamStats.Objects[objInfo.Key] = objStat
|
|
|
+ }()
|
|
|
|
|
|
logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
|
|
|
continue
|
|
@@ -694,38 +752,16 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
|
|
|
// Do the parts upload
|
|
|
partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ pool.Process(util.Pair[string, PartUploadArgs]{First: partName, Second: partInfo})
|
|
|
+ }()
|
|
|
+ }
|
|
|
|
|
|
- logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
|
|
|
- partInfo.PartName, partInfo.StreamInfo.PartsCount,
|
|
|
- partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
|
|
|
-
|
|
|
- err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
|
|
|
- if err != nil {
|
|
|
- objStat.EndTime = time.Now()
|
|
|
- objStat.UpState = "fail"
|
|
|
- objStat.Msg = err.Error()
|
|
|
-
|
|
|
- logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
|
|
|
- objStat.EndTime.Sub(objStat.StartTime), err)
|
|
|
- fullyUploaded = false
|
|
|
- } else {
|
|
|
- // Mark the part as uploaded
|
|
|
- //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
|
|
|
- part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partName}
|
|
|
- err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
|
|
|
- if err != nil {
|
|
|
- logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
|
|
|
- }
|
|
|
-
|
|
|
- objStat.EndTime = time.Now()
|
|
|
- objStat.UpState = "ok"
|
|
|
-
|
|
|
- logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
|
|
|
- objStat.EndTime.Sub(objStat.StartTime))
|
|
|
- }
|
|
|
+ wg.Wait()
|
|
|
+ pool.Close()
|
|
|
|
|
|
- streamStats.Objects[objInfo.Key] = objStat
|
|
|
- }
|
|
|
if !hasMetadata {
|
|
|
logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
|
|
|
fullyUploaded = false
|
|
@@ -853,6 +889,9 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
|
|
|
err = c.Do(ctx, ch.Query{
|
|
|
Body: sql,
|
|
|
Input: input,
|
|
|
+ // Settings: []ch.Setting{
|
|
|
+ // ch.SettingInt("max_insert_threads", 2),
|
|
|
+ // },
|
|
|
})
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to insert part into stck: %w", err)
|