|
@@ -128,9 +128,7 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
|
|
|
|
fullyUploaded = true
|
|
fullyUploaded = true
|
|
|
|
|
|
- // NOTE: We need to check metadata file before obtaining the list of files
|
|
|
|
- // to prevent TOCTOU issues.
|
|
|
|
- if fullyUploaded {
|
|
|
|
|
|
+ checkMetadataFile := func(localPath string) error {
|
|
// Check for JSON metadata file. If its `total_points` is 0,
|
|
// Check for JSON metadata file. If its `total_points` is 0,
|
|
// then the producer has not yet finished writing the stream, so we
|
|
// then the producer has not yet finished writing the stream, so we
|
|
// should not delete the folder.
|
|
// should not delete the folder.
|
|
@@ -139,7 +137,7 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
// Read metadata file
|
|
// Read metadata file
|
|
metaFile, err := os.Open(metaFilePath)
|
|
metaFile, err := os.Open(metaFilePath)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return false, fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
|
|
|
|
|
|
+ return fmt.Errorf("open metadata file `%s` error: %w", metaFilePath, err)
|
|
}
|
|
}
|
|
defer metaFile.Close()
|
|
defer metaFile.Close()
|
|
|
|
|
|
@@ -148,13 +146,24 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
}
|
|
}
|
|
|
|
|
|
if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
|
|
if err := json.NewDecoder(metaFile).Decode(&metadata); err != nil {
|
|
- return false, fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
|
|
|
|
|
|
+ return fmt.Errorf("decode metadata file `%s` error: %w", metaFilePath, err)
|
|
}
|
|
}
|
|
|
|
|
|
if metadata.TotalPoints == 0 {
|
|
if metadata.TotalPoints == 0 {
|
|
logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
|
|
logger.Infof("metadata file `%s` indicates not fully uploaded because total_points is 0", metaFilePath)
|
|
fullyUploaded = false
|
|
fullyUploaded = false
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // NOTE: We need to check metadata file before obtaining the list of files
|
|
|
|
+ // to prevent TOCTOU issues.
|
|
|
|
+ if fullyUploaded {
|
|
|
|
+ err := checkMetadataFile(localPath)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return false, err
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
// Enumerate all files in the folder
|
|
// Enumerate all files in the folder
|
|
@@ -180,10 +189,11 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ innerLocalPath := filepath.Join(localPath, entry.Name())
|
|
|
|
+
|
|
if entry.IsDir() {
|
|
if entry.IsDir() {
|
|
// Upload files in the folder
|
|
// Upload files in the folder
|
|
// logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
|
|
// logger.Warnf("stream folder contains subfolder `%s`, ignoring", entry.Name())
|
|
- innerLocalPath := filepath.Join(localPath, entry.Name())
|
|
|
|
innerObjPath := objPath + "/" + entry.Name()
|
|
innerObjPath := objPath + "/" + entry.Name()
|
|
fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
|
|
fullyUploadedInner, err := uploadOneStreamFolder(innerLocalPath, innerObjPath, minioClient)
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -195,12 +205,20 @@ func uploadOneStreamFolder(localPath string, objPath string, minioClient *minio.
|
|
} else {
|
|
} else {
|
|
// Upload the file
|
|
// Upload the file
|
|
_, err := minioClient.FPutObject(ctx, bucket,
|
|
_, err := minioClient.FPutObject(ctx, bucket,
|
|
- objFolderPath+entry.Name(), filepath.Join(localPath, entry.Name()),
|
|
|
|
|
|
+ objFolderPath+entry.Name(), innerLocalPath,
|
|
minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
|
minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
|
if err != nil {
|
|
if err != nil {
|
|
return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
|
|
return false, fmt.Errorf("upload file `%s` error: %w", entry.Name(), err)
|
|
}
|
|
}
|
|
logger.Infof("uploaded file `%s`", entry.Name())
|
|
logger.Infof("uploaded file `%s`", entry.Name())
|
|
|
|
+
|
|
|
|
+ // Remove *.zst files early
|
|
|
|
+ if strings.HasSuffix(entry.Name(), ".zst") {
|
|
|
|
+ err := os.Remove(innerLocalPath)
|
|
|
|
+ if err != nil {
|
|
|
|
+ logger.Errorf("remove file `%s` error: %v", entry.Name(), err)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|