|
@@ -126,6 +126,37 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
|
ctx := context.Background()
|
|
|
bucket := appConfig.MinIoConfig.Bucket
|
|
|
|
|
|
+ fullyUploaded = true
|
|
|
+
|
|
|
+ // NOTE: We need to check metadata file before obtaining the list of files
|
|
|
+ // to prevent TOCTOU issues.
|
|
|
+ if fullyUploaded {
|
|
|
+ // Check for JSON metadata file. If its `total_points` is 0,
|
|
|
+ // then the producer has not yet finished writing the stream, so we
|
|
|
+ // should not delete the folder.
|
|
|
+
|
|
|
+ metaFilePath := filepath.Join(localPath, "metadata.json")
|
|
|
+ // Read metadata file
|
|
|
+ metaFile, err := os.Open(metaFilePath)
|
|
|
+ if err != nil {
|
|
|
+ return false, fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
|
|
|
+ }
|
|
|
+ defer metaFile.Close()
|
|
|
+
|
|
|
+ var metadata struct {
|
|
|
+ TotalPoints int `json:"total_points"`
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
|
|
|
+ return false, fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if metadata.TotalPoints == 0 {
|
|
|
+ logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
|
|
|
+ fullyUploaded = false
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// Enumerate all files in the folder
|
|
|
entries, err := os.ReadDir(localPath)
|
|
|
if err != nil {
|
|
@@ -139,8 +170,6 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
|
return false, fmt.Errorf("create folder `%s` error: %w", objPath, err)
|
|
|
}
|
|
|
|
|
|
- fullyUploaded = true
|
|
|
-
|
|
|
// Upload files
|
|
|
// NOTE: We overwrite the existing files in the minio bucket
|
|
|
for _, entry := range entries {
|
|
@@ -175,33 +204,6 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if fullyUploaded {
|
|
|
- // Finally check for JSON metadata file. If its `total_points` is 0,
|
|
|
- // then the producer has not yet finished writing the stream, so we
|
|
|
- // should not delete the folder.
|
|
|
-
|
|
|
- metaFilePath := filepath.Join(localPath, "metadata.json")
|
|
|
- // Read metadata file
|
|
|
- metaFile, err := os.Open(metaFilePath)
|
|
|
- if err != nil {
|
|
|
- return false, fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
|
|
|
- }
|
|
|
- defer metaFile.Close()
|
|
|
-
|
|
|
- var metadata struct {
|
|
|
- TotalPoints int `json:"total_points"`
|
|
|
- }
|
|
|
-
|
|
|
- if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
|
|
|
- return false, fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
|
|
|
- }
|
|
|
-
|
|
|
- if metadata.TotalPoints == 0 {
|
|
|
- logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
|
|
|
- fullyUploaded = false
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
return fullyUploaded, nil
|
|
|
}
|
|
|
|