apkipa пре 4 дана
родитељ
комит
94ce6fbe0e
5 измењених фајлова са 89 додато и 40 уклоњено
  1. 1 0
      config/application.yaml
  2. 1 0
      go.mod
  3. 2 0
      go.sum
  4. 80 40
      main.go
  5. 5 0
      util/util.go

+ 1 - 0
config/application.yaml

@@ -19,3 +19,4 @@ main:
   uploadRetryMaxTimes: 20
   failedRetryDelaySeconds: 5
   notifyToUploadDelaySeconds: 1
+  parallelUploadThreadsCount: 2

+ 1 - 0
go.mod

@@ -7,6 +7,7 @@ replace stck/stck-nsq-msg => ./stck-nsq-msg
 require (
 	github.com/ClickHouse/ch-go v0.61.5
 	github.com/ClickHouse/clickhouse-go/v2 v2.30.0
+	github.com/Jeffail/tunny v0.1.4
 	github.com/fsnotify/fsnotify v1.7.0
 	github.com/klauspost/compress v1.17.11
 	github.com/minio/minio-go/v7 v7.0.80

+ 2 - 0
go.sum

@@ -6,6 +6,8 @@ github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h
 github.com/ClickHouse/clickhouse-go/v2 v2.30.0/go.mod h1:i9ZQAojcayW3RsdCb3YR+n+wC2h65eJsZCscZ1Z1wyo=
 github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4=
 github.com/Masterminds/semver/v3 v3.3.1/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
+github.com/Jeffail/tunny v0.1.4 h1:chtpdz+nUtaYQeCKlNBg6GycFF/kGVHOr6A3cmzTJXs=
+github.com/Jeffail/tunny v0.1.4/go.mod h1:P8xAx4XQl0xsuhjX1DtfaMDCSuavzdb2rwbd0lk+fvo=
 github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
 github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

+ 80 - 40
main.go

@@ -29,6 +29,7 @@ import (
 	"github.com/ClickHouse/ch-go/proto"
 	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"github.com/Jeffail/tunny"
 	"github.com/fsnotify/fsnotify"
 	"github.com/minio/minio-go/v7"
 	"github.com/minio/minio-go/v7/pkg/credentials"
@@ -68,6 +69,7 @@ type AppInitConfig struct {
 		UploadRetryMaxTimes        int `mapstructure:"uploadRetryMaxTimes"`
 		FailedRetryDelaySeconds    int `mapstructure:"failedRetryDelaySeconds"`
 		NotifyToUploadDelaySeconds int `mapstructure:"notifyToUploadDelaySeconds"`
+		ParallelUploadThreadsCount int `mapstructure:"parallelUploadThreadsCount"`
 	} `mapstructure:"main"`
 }
 
@@ -337,6 +339,7 @@ func initLoadConfig() {
 	viper.SetDefault("main.uploadRetryMaxTimes", 20)
 	viper.SetDefault("main.failedRetryDelaySeconds", 5)
 	viper.SetDefault("main.notifyToUploadDelaySeconds", 1)
+	viper.SetDefault("main.parallelUploadThreadsCount", 2)
 	viper.SetConfigFile("./config/application.yaml")
 	viper.WatchConfig()
 	viper.OnConfigChange(func(e fsnotify.Event) {
@@ -894,50 +897,19 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
 	logger.Tracef("Listing minio objects in `%s`, bucket `%s`", streamObjPath, appInitCfg.Minio.Bucket)
 	// hasSomething := false
 	hasMetadata := false
-	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
-		if objInfo.Err != nil {
-			return false, objInfo.Err
-		}
-
-		if gAppQuitting {
-			logger.Infof("Quitting, stopping uploading one stream")
-			return false, nil
-		}
-
-		logger.Tracef("Checking minio file `%s`", objInfo.Key)
 
-		if strings.HasSuffix(objInfo.Key, "/") {
-			continue
-		}
-		partName := filepath.Base(objInfo.Key)
-		if partName == "metadata.json" {
-			hasMetadata = true
-			continue
-		}
-
-		hasSomething = true
+	var wg sync.WaitGroup
+	var mt sync.Mutex
+	pool := tunny.NewFunc(appInitCfg.Main.ParallelUploadThreadsCount, func(payload interface{}) interface{} {
+		packed := payload.(util.Pair[string, PartUploadArgs])
+		partName := packed.First
+		partInfo := packed.Second
 
 		objStat := StreamObjectUploadStatistics{
 			StartTime: time.Now(),
 			PartName:  partName,
 		}
 
-		if part_already_uploaded(app, streamName, partName) {
-			objStat.EndTime = time.Now()
-			objStat.UpState = "repeated"
-			streamStats.Objects[objInfo.Key] = objStat
-
-			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
-			continue
-		}
-		if fullyUploaded {
-			fullyUploaded = false
-			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
-		}
-
-		// Do the parts upload
-		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
-
 		logger.Infof("Uploading part `%s` (total %d) of stream `%s`, total_points=%d",
 			partInfo.PartName, partInfo.StreamInfo.PartsCount,
 			partInfo.StreamName, partInfo.StreamInfo.TotalPoints)
@@ -963,15 +935,20 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
 			objStat.EndTime = time.Now()
 			objStat.UpState = "ok"
 
-			logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
+			logger.Infof("Uploaded part `%s` of stream `%s`, took %v", partInfo.PartName, streamName,
 				objStat.EndTime.Sub(objStat.StartTime))
 		}
 
-		streamStats.Objects[objInfo.Key] = objStat
+		func() {
+			mt.Lock()
+			defer mt.Unlock()
+			streamStats.Objects[partInfo.PartName] = objStat
+		}()
+
 		partNum, err := util.ExtractNumberFromString(partName)
 		if err != nil {
 			// Not a part file? Skip
-			continue
+			return nil
 		}
 		status := "success"
 		if objStat.UpState != "ok" {
@@ -983,7 +960,67 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
 		if err != nil {
 			logger.Errorf("send part insert to stck status changed message error: %s", err)
 		}
+
+		return nil
+	})
+
+	for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
+		if objInfo.Err != nil {
+			return false, objInfo.Err
+		}
+
+		if gAppQuitting {
+			logger.Infof("Quitting, stopping uploading one stream")
+			return false, nil
+		}
+
+		logger.Tracef("Checking minio file `%s`", objInfo.Key)
+
+		if strings.HasSuffix(objInfo.Key, "/") {
+			continue
+		}
+		partName := filepath.Base(objInfo.Key)
+		if partName == "metadata.json" {
+			hasMetadata = true
+			continue
+		}
+
+		hasSomething = true
+
+		objStat := StreamObjectUploadStatistics{
+			StartTime: time.Now(),
+			PartName:  partName,
+		}
+
+		if part_already_uploaded(app, streamName, partName) {
+			objStat.EndTime = time.Now()
+			objStat.UpState = "repeated"
+			func() {
+				mt.Lock()
+				defer mt.Unlock()
+				streamStats.Objects[objInfo.Key] = objStat
+			}()
+
+			logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
+			continue
+		}
+		if fullyUploaded {
+			fullyUploaded = false
+			logger.Debugf("Marking stream `%s` as not fully uploaded, reason: part `%s` not uploaded", streamName, objInfo.Key)
+		}
+
+		// Do the parts upload
+		partInfo := PartUploadArgs{StreamInfo: streamInfo, StreamName: streamName, PartName: objInfo.Key}
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			pool.Process(util.Pair[string, PartUploadArgs]{First: partName, Second: partInfo})
+		}()
 	}
+
+	wg.Wait()
+	pool.Close()
+
 	if !hasMetadata {
 		logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
 		fullyUploaded = false
@@ -1111,6 +1148,9 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
 			err = c.Do(ctx, ch.Query{
 				Body:  sql,
 				Input: input,
+				// Settings: []ch.Setting{
+				// 	ch.SettingInt("max_insert_threads", 2),
+				// },
 			})
 			if err != nil {
 				return fmt.Errorf("failed to insert part into stck: %w", err)

+ 5 - 0
util/util.go

@@ -112,3 +112,8 @@ func ExtractNumberFromString(filename string) (int64, error) {
 func ToSqlLiteral(s string) string {
 	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
 }
+
+type Pair[T, U any] struct {
+    First  T
+    Second U
+}