|
@@ -8,7 +8,13 @@ import (
|
|
|
"example/minio-into-stck/util"
|
|
|
"fmt"
|
|
|
"os"
|
|
|
+ "os/exec"
|
|
|
+ "os/signal"
|
|
|
+ "path/filepath"
|
|
|
+ stcknsqmsg "stck/stck-nsq-msg"
|
|
|
+ smsg "stck/stck-nsq-msg/msg"
|
|
|
"sync"
|
|
|
+ "syscall"
|
|
|
|
|
|
"io"
|
|
|
// "log"
|
|
@@ -26,6 +32,7 @@ import (
|
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
|
"github.com/minio/minio-go/v7/pkg/notification"
|
|
|
"github.com/natefinch/lumberjack"
|
|
|
+ "github.com/nsqio/go-nsq"
|
|
|
"github.com/sirupsen/logrus"
|
|
|
"github.com/spf13/viper"
|
|
|
"gorm.io/driver/mysql"
|
|
@@ -51,6 +58,10 @@ type AppInitConfig struct {
|
|
|
Host string `mapstructure:"host"`
|
|
|
Table string `mapstructure:"table"`
|
|
|
} `mapstructure:"stck"`
|
|
|
+ Nsq struct {
|
|
|
+ TcpAddr string `mapstructure:"tcpAddr"`
|
|
|
+ LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
|
|
|
+ } `mapstructure:"nsq"`
|
|
|
Main struct {
|
|
|
UploadRetryMaxTimes int `mapstructure:"uploadRetryMaxTimes"`
|
|
|
FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
|
|
@@ -87,9 +98,234 @@ type StreamMetadata struct {
|
|
|
Name string `json:"name"`
|
|
|
TimestampOffset int64 `json:"timestamp_offset"`
|
|
|
Interval int64 `json:"interval"`
|
|
|
- PartsCount int `json:"parts_count"`
|
|
|
- PointsPerPart int `json:"points_per_part"`
|
|
|
- TotalPoints int `json:"total_points"`
|
|
|
+ PartsCount int64 `json:"parts_count"`
|
|
|
+ PointsPerPart int64 `json:"points_per_part"`
|
|
|
+ TotalPoints int64 `json:"total_points"`
|
|
|
+}
|
|
|
+
|
|
|
+var gLocalIpAddress string
|
|
|
+var gMachineID string
|
|
|
+var gNsqProducer *nsq.Producer
|
|
|
+var gNsqConsumer *nsq.Consumer
|
|
|
+var gAppStartTime time.Time = time.Now()
|
|
|
+var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
|
|
|
+var gAppQuitting = false // NOTE(review): written by main, read by worker goroutines without sync — data race; consider atomic.Bool
|
|
|
+var gAppExitWaitGroup sync.WaitGroup
|
|
|
+
|
|
|
+var buildtime string
|
|
|
+
|
|
|
+func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
|
|
|
+ m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
|
|
|
+ gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
|
|
|
+ return &m
|
|
|
+}
|
|
|
+
|
|
|
+func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
|
|
|
+ payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("marshal message error: %w", err)
|
|
|
+ }
|
|
|
+ err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("publish message error: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func initNsq() {
|
|
|
+ ip, err := stcknsqmsg.GetLocalIP()
|
|
|
+ if err != nil {
|
|
|
+ logger.Warnf("GetLocalIP error: %s", err)
|
|
|
+ } else {
|
|
|
+ gLocalIpAddress = ip.String()
|
|
|
+ logger.Infof("Local IP: %s", gLocalIpAddress)
|
|
|
+ }
|
|
|
+ gMachineID = stcknsqmsg.MakeUniqueMachineID()
|
|
|
+ logger.Infof("Machine ID: %s", gMachineID)
|
|
|
+
|
|
|
+ fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)
|
|
|
+
|
|
|
+ // Connect to NSQ
|
|
|
+ nsqConfig := nsq.NewConfig()
|
|
|
+ gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Producer init error: %s", err)
|
|
|
+ }
|
|
|
+ gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Consumer init error: %s", err)
|
|
|
+ }
|
|
|
+ gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
|
|
|
+ logger.Debugf("NSQ Consumer received message: %s", message.Body)
|
|
|
+
|
|
|
+ // Parse the message
|
|
|
+ recvTime := message.Timestamp
|
|
|
+ msg, err := stcknsqmsg.FromString(string(message.Body))
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Process the message
|
|
|
+ switch data := msg.Data.(type) {
|
|
|
+ case *smsg.DevicePingMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // Write pong
|
|
|
+ pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
|
|
|
+ err := PublishMessage(&pongMsg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send pong error: %s", err)
|
|
|
+ }
|
|
|
+ case *smsg.RequestDeviceExecuteShellScriptMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // Execute the shell script
|
|
|
+ resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
|
|
|
+ result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
|
|
|
+ cmd := exec.Command("bash", "-c", data.Script)
|
|
|
+ var out bytes.Buffer
|
|
|
+ cmd.Stdout = &out
|
|
|
+ cmd.Stderr = &out
|
|
|
+ err := cmd.Run()
|
|
|
+ return out.String(), err
|
|
|
+ }(data)
|
|
|
+ if err != nil {
|
|
|
+ errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
|
|
|
+ // Write error message
|
|
|
+ resp.Status = -1
|
|
|
+ resp.Msg = errMsg
|
|
|
+ } else {
|
|
|
+ // Write output
|
|
|
+ resp.Status = 0
|
|
|
+ resp.Msg = result
|
|
|
+ }
|
|
|
+ err = PublishMessage(&resp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send action done error: %s", err)
|
|
|
+ }
|
|
|
+ case *smsg.RequestDeviceUpdateMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if data.ServiceName != "minio-into-stck" {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
|
|
|
+ result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
|
|
|
+ // Download the update file to /tmp
|
|
|
+ downloadPath := "/tmp/minio-into-stck-updater"
|
|
|
+ updateScriptPath := filepath.Join(downloadPath, "update.sh")
|
|
|
+ var out bytes.Buffer
|
|
|
+ err := os.RemoveAll(downloadPath)
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+ err = os.MkdirAll(downloadPath, 0777)
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+ updateScriptContent := fmt.Sprintf(`#!/bin/bash
|
|
|
+set -e
|
|
|
+cd %s
|
|
|
+wget --tries=3 -nv -O installer.tar.gz %s
|
|
|
+tar -xzf installer.tar.gz
|
|
|
+cd minio-into-stck-installer
|
|
|
+./replacing-update.sh
|
|
|
+`, downloadPath, data.ServiceBinaryURL)
|
|
|
+ err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
|
|
|
+ if err != nil {
|
|
|
+ return "", err
|
|
|
+ }
|
|
|
+ // Execute the update script
|
|
|
+ cmd := exec.Command("bash", updateScriptPath)
|
|
|
+ cmd.Stdout = &out
|
|
|
+ cmd.Stderr = &out
|
|
|
+ err = cmd.Run()
|
|
|
+ return out.String(), err
|
|
|
+ }(data)
|
|
|
+ if err != nil {
|
|
|
+ errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
|
|
|
+ // Write error message
|
|
|
+ resp.Status = -1
|
|
|
+ resp.Msg = errMsg
|
|
|
+ } else {
|
|
|
+ // Write output
|
|
|
+ resp.Status = 0
|
|
|
+ resp.Msg = "executed update process successfully\n\noutput:\n" + result
|
|
|
+ }
|
|
|
+ err = PublishMessage(&resp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send action done error: %s", err)
|
|
|
+ }
|
|
|
+ case *smsg.ForceDeviceRebootMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ programmaticQuitChan <- struct{}{}
|
|
|
+ case *smsg.ForceDeviceSendHeartbeatMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // Send heartbeat
|
|
|
+ heartBeatMsg := MakeHeartbeatMsg()
|
|
|
+ err := PublishMessage(heartBeatMsg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send heartbeat error: %s", err)
|
|
|
+ }
|
|
|
+ case *smsg.RequestDeviceUploadLogsToMinioMsg:
|
|
|
+ if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ // Upload logs to MinIO
|
|
|
+ resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
|
|
|
+ minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
|
|
|
+ minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
|
|
|
+ Creds: minioCreds,
|
|
|
+ Secure: false,
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("MinIO Client init error: %s", err)
|
|
|
+ resp.Status = -1
|
|
|
+ resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
|
|
|
+ } else {
|
|
|
+ if util.FileExists("./logs") {
|
|
|
+ err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
|
|
|
+ filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("upload logs to MinIO error: %s", err)
|
|
|
+ resp.Status = -1
|
|
|
+ resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if util.FileExists("./log") {
|
|
|
+ err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
|
|
|
+ filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("upload log to MinIO error: %s", err)
|
|
|
+ resp.Status = -1
|
|
|
+ resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ err = PublishMessage(&resp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send action done error: %s", err)
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Notify NSQ that the message is processed successfully
|
|
|
+ return nil
|
|
|
+ }), 1)
|
|
|
+ // err = gNsqConsumer.ConnectToNSQLookupd(appInitCfg.Nsq.LookupdHttpAddr)
|
|
|
+ err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Consumer connect error: %s", err)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
var appInitCfg *AppInitConfig = &AppInitConfig{}
|
|
@@ -202,43 +438,45 @@ var statLogger *logrus.Logger
|
|
|
var mutexObjFailCounter = &sync.Mutex{}
|
|
|
var objFailCounter map[string]int = make(map[string]int)
|
|
|
|
|
|
-var mutexHasObjEmitted = &sync.Mutex{}
|
|
|
-var hasObjEmitted map[string]bool = make(map[string]bool)
|
|
|
-
|
|
|
-func markObjEmitted(obj string) {
|
|
|
- mutexHasObjEmitted.Lock()
|
|
|
- hasObjEmitted[obj] = true
|
|
|
- mutexHasObjEmitted.Unlock()
|
|
|
-}
|
|
|
-
|
|
|
-func unmarkObjEmitted(obj string) {
|
|
|
- mutexHasObjEmitted.Lock()
|
|
|
- delete(hasObjEmitted, obj)
|
|
|
- mutexHasObjEmitted.Unlock()
|
|
|
-}
|
|
|
-
|
|
|
-func hasObjAlreadyEmitted(obj string) bool {
|
|
|
- mutexHasObjEmitted.Lock()
|
|
|
- defer mutexHasObjEmitted.Unlock()
|
|
|
- value, exists := hasObjEmitted[obj]
|
|
|
- return exists && value
|
|
|
-}
|
|
|
-
|
|
|
-func writeToObjChanDeduplicated(obj string, objChan chan<- string) {
|
|
|
- if !hasObjAlreadyEmitted(obj) {
|
|
|
- markObjEmitted(obj)
|
|
|
- objChan <- obj
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-func readFromObjChanDeduplicated(objChan <-chan string) string {
|
|
|
- obj := <-objChan
|
|
|
- unmarkObjEmitted(obj)
|
|
|
- return obj
|
|
|
-}
|
|
|
+// var mutexHasObjEmitted = &sync.Mutex{}
|
|
|
+// var hasObjEmitted map[string]bool = make(map[string]bool)
|
|
|
+
|
|
|
+// func markObjEmitted(obj string) {
|
|
|
+// mutexHasObjEmitted.Lock()
|
|
|
+// hasObjEmitted[obj] = true
|
|
|
+// mutexHasObjEmitted.Unlock()
|
|
|
+// }
|
|
|
+
|
|
|
+// func unmarkObjEmitted(obj string) {
|
|
|
+// mutexHasObjEmitted.Lock()
|
|
|
+// delete(hasObjEmitted, obj)
|
|
|
+// mutexHasObjEmitted.Unlock()
|
|
|
+// }
|
|
|
+
|
|
|
+// func hasObjAlreadyEmitted(obj string) bool {
|
|
|
+// mutexHasObjEmitted.Lock()
|
|
|
+// defer mutexHasObjEmitted.Unlock()
|
|
|
+// value, exists := hasObjEmitted[obj]
|
|
|
+// return exists && value
|
|
|
+// }
|
|
|
+
|
|
|
+// func writeToObjChanDeduplicated(obj string, objChan chan<- string) {
|
|
|
+// if !hasObjAlreadyEmitted(obj) {
|
|
|
+// markObjEmitted(obj)
|
|
|
+// objChan <- obj
|
|
|
+// }
|
|
|
+// }
|
|
|
+
|
|
|
+// func readFromObjChanDeduplicated(objChan <-chan string) string {
|
|
|
+// obj := <-objChan
|
|
|
+// unmarkObjEmitted(obj)
|
|
|
+// return obj
|
|
|
+// }
|
|
|
|
|
|
func main() {
|
|
|
- fmt.Println("Starting application...")
|
|
|
+ var err error
|
|
|
+
|
|
|
+ fmt.Println("Starting minio-into-stck, build time:", buildtime)
|
|
|
|
|
|
logger = initLog()
|
|
|
|
|
@@ -247,7 +485,9 @@ func main() {
|
|
|
// Load configuration from file
|
|
|
initLoadConfig()
|
|
|
|
|
|
- var err error
|
|
|
+ // Initialize NSQ
|
|
|
+ initNsq()
|
|
|
+
|
|
|
var db *gorm.DB
|
|
|
var minioClient *minio.Client
|
|
|
var ckConn driver.Conn
|
|
@@ -317,7 +557,33 @@ func main() {
|
|
|
|
|
|
// Start the main work
|
|
|
logger.Infoln("Starting main worker...")
|
|
|
- main_worker(AppCtx{db, minioClient, ckConn})
|
|
|
+ gAppExitWaitGroup.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer gAppExitWaitGroup.Done()
|
|
|
+ main_worker(AppCtx{db, minioClient, ckConn})
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Wait on signal.
|
|
|
+ signalChan := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-signalChan:
|
|
|
+ logger.Infof("received signal, stopping watch-daemon")
|
|
|
+ case <-programmaticQuitChan:
|
|
|
+ logger.Infof("received programmatic quit signal, stopping watch-daemon")
|
|
|
+ }
|
|
|
+
|
|
|
+ gAppQuitting = true
|
|
|
+
|
|
|
+ // Close the NSQ producer and consumer
|
|
|
+ gNsqProducer.Stop()
|
|
|
+ gNsqConsumer.Stop()
|
|
|
+
|
|
|
+ // Wait for the goroutines to exit
|
|
|
+ gAppExitWaitGroup.Wait()
|
|
|
+
|
|
|
+ logger.Infof("watch-daemon stopped gracefully")
|
|
|
}
|
|
|
|
|
|
type AppCtx struct {
|
|
@@ -337,7 +603,8 @@ type PartUploadArgs struct {
|
|
|
func main_worker(app AppCtx) {
|
|
|
ctx := context.Background()
|
|
|
|
|
|
- objUploadChan := make(chan string, 1024*256)
|
|
|
+ // objUploadChan := make(chan string, 1024*256)
|
|
|
+ objUploadChan := util.NewDChan[string](1024 * 16)
|
|
|
|
|
|
// Load config from DB
|
|
|
appCfg, err := load_app_cfg_from_db(app.db)
|
|
@@ -355,12 +622,9 @@ func main_worker(app AppCtx) {
|
|
|
ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
|
|
|
|
|
|
// Listen OK, start the full upload trigger to upload maybe missed files
|
|
|
- go trigger_full_upload(app, objUploadChan)
|
|
|
-
|
|
|
- lastSentKey := ""
|
|
|
+ go trigger_full_upload(app, objUploadChan.In())
|
|
|
|
|
|
for notifyInfo := range notifys {
|
|
|
- lastSentKey = ""
|
|
|
for _, record := range notifyInfo.Records {
|
|
|
key := record.S3.Object.Key
|
|
|
logger.Traceln("New object notification:", key)
|
|
@@ -371,11 +635,8 @@ func main_worker(app AppCtx) {
|
|
|
continue
|
|
|
}
|
|
|
key = strings.Join(keyParts[:len(keyParts)-1], "/") + "/"
|
|
|
- if key != lastSentKey {
|
|
|
- lastSentKey = key
|
|
|
- // objUploadChan <- key
|
|
|
- writeToObjChanDeduplicated(key, objUploadChan)
|
|
|
- }
|
|
|
+ // Queue the object for upload
|
|
|
+ objUploadChan.Write(key)
|
|
|
}
|
|
|
if notifyInfo.Err != nil {
|
|
|
logger.Errorf("Bucket notification listener error: %v", notifyInfo.Err)
|
|
@@ -387,8 +648,13 @@ func main_worker(app AppCtx) {
|
|
|
}()
|
|
|
|
|
|
// Start the main loop (streams upload worker)
|
|
|
- for objToUpload := range objUploadChan {
|
|
|
- unmarkObjEmitted(objToUpload)
|
|
|
+ for objToUpload := range objUploadChan.Out() {
|
|
|
+ objUploadChan.MarkElemReadDone(objToUpload)
|
|
|
+
|
|
|
+ if gAppQuitting {
|
|
|
+ logger.Infof("Quitting, stopping main worker")
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
logger.Infoln("Checking stream object:", objToUpload)
|
|
|
if object_is_blacklisted(objToUpload) {
|
|
@@ -408,16 +674,12 @@ func main_worker(app AppCtx) {
|
|
|
fullyUploaded, err := upload_one_stream(app, objToUpload)
|
|
|
if err != nil {
|
|
|
// Queue the object for retry
|
|
|
- logger.Warnf("Failed to upload stream `%s`: `%v`, retrying", objToUpload, err)
|
|
|
+ logger.Warnf("Failed to upload stream `%s`: `%v`, retrying after %d seconds",
|
|
|
+ objToUpload, err, appInitCfg.Main.FailedRetryDelaySeconds)
|
|
|
mutexObjFailCounter.Lock()
|
|
|
objFailCounter[objToUpload]++
|
|
|
mutexObjFailCounter.Unlock()
|
|
|
- go func() {
|
|
|
- markObjEmitted(objToUpload)
|
|
|
- time.Sleep(time.Duration(appInitCfg.Main.FailedRetryDelaySeconds) * time.Second)
|
|
|
- objUploadChan <- objToUpload
|
|
|
- // writeToObjChanDeduplicated(objToUpload, objUploadChan)
|
|
|
- }()
|
|
|
+ objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
|
|
|
continue
|
|
|
}
|
|
|
|
|
@@ -447,12 +709,7 @@ func main_worker(app AppCtx) {
|
|
|
mutexObjFailCounter.Unlock()
|
|
|
logger.Warnf("Stream %s is not fully uploaded, retrying after %d seconds",
|
|
|
objToUpload, appInitCfg.Main.FailedRetryDelaySeconds)
|
|
|
- go func() {
|
|
|
- markObjEmitted(objToUpload)
|
|
|
- time.Sleep(time.Duration(appInitCfg.Main.FailedRetryDelaySeconds) * time.Second)
|
|
|
- objUploadChan <- objToUpload
|
|
|
- // writeToObjChanDeduplicated(objToUpload, objUploadChan)
|
|
|
- }()
|
|
|
+ objUploadChan.DelayedWrite(objToUpload, time.Duration(appInitCfg.Main.FailedRetryDelaySeconds)*time.Second)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -486,20 +743,10 @@ func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
|
|
|
// Is a directory, should be a stream then
|
|
|
uploaded := object_already_uploaded(app, key)
|
|
|
if !uploaded {
|
|
|
- markObjEmitted(key)
|
|
|
objToUploadChan <- key
|
|
|
- // writeToObjChanDeduplicated(key, objToUploadChan)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // folder := strings.Split(key, "/")[0]
|
|
|
- // uploaded := object_already_uploaded(app, folder)
|
|
|
- // if !uploaded {
|
|
|
- // markObjEmitted(folder)
|
|
|
- // objToUploadChan <- folder
|
|
|
- // // writeToObjChanDeduplicated(folder, objToUploadChan)
|
|
|
- // }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -513,12 +760,13 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
Msg string // "error message"
|
|
|
}
|
|
|
type StreamUploadStatistics struct {
|
|
|
- StartTime time.Time
|
|
|
- EndTime time.Time
|
|
|
- StreamName string
|
|
|
- MetaPartsCount int
|
|
|
- Objects map[string]StreamObjectUploadStatistics
|
|
|
- Msg string
|
|
|
+ StartTime time.Time
|
|
|
+ EndTime time.Time
|
|
|
+ StreamName string
|
|
|
+ MetaPointsCount int64
|
|
|
+ MetaPartsCount int64
|
|
|
+ Objects map[string]StreamObjectUploadStatistics
|
|
|
+ Msg string
|
|
|
}
|
|
|
streamStats := StreamUploadStatistics{
|
|
|
StartTime: time.Now(),
|
|
@@ -535,10 +783,10 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
|
|
|
defer func() {
|
|
|
streamStats.EndTime = time.Now()
|
|
|
- repeatedCount := 0
|
|
|
- okCount := 0
|
|
|
- failCount := 0
|
|
|
- totalCount := 0
|
|
|
+ repeatedCount := int64(0)
|
|
|
+ okCount := int64(0)
|
|
|
+ failCount := int64(0)
|
|
|
+ totalCount := int64(0)
|
|
|
objDetailsMsg := ""
|
|
|
for _, obj := range streamStats.Objects {
|
|
|
totalCount++
|
|
@@ -562,6 +810,15 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
|
|
|
fullyUploaded, streamStats.MetaPartsCount,
|
|
|
repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
|
|
|
+
|
|
|
+ // Send upload status changed message
|
|
|
+ msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
|
|
|
+ streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
|
|
|
+ okCount, failCount, repeatedCount)
|
|
|
+ err := PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send stream insert to stck status changed message error: %s", err)
|
|
|
+ }
|
|
|
}()
|
|
|
|
|
|
fullyUploaded = true
|
|
@@ -572,6 +829,7 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
// Cannot continue without metadata
|
|
|
return false, err
|
|
|
}
|
|
|
+ streamStats.MetaPointsCount = streamInfo.TotalPoints
|
|
|
streamStats.MetaPartsCount = streamInfo.PartsCount
|
|
|
if streamInfo.PartsCount == 0 {
|
|
|
// Edge device didn't finish uploading the stream yet
|
|
@@ -595,12 +853,17 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
return false, objInfo.Err
|
|
|
}
|
|
|
|
|
|
+ if gAppQuitting {
|
|
|
+ logger.Infof("Quitting, stopping uploading one stream")
|
|
|
+ return false, nil
|
|
|
+ }
|
|
|
+
|
|
|
logger.Tracef("Checking minio file `%s`", objInfo.Key)
|
|
|
|
|
|
if strings.HasSuffix(objInfo.Key, "/") {
|
|
|
continue
|
|
|
}
|
|
|
- partName := util.LastElem(strings.Split(objInfo.Key, "/"))
|
|
|
+ partName := filepath.Base(objInfo.Key)
|
|
|
if partName == "metadata.json" {
|
|
|
hasMetadata = true
|
|
|
continue
|
|
@@ -608,15 +871,15 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
|
|
|
hasSomething = true
|
|
|
|
|
|
- partObjUploadStats := StreamObjectUploadStatistics{
|
|
|
+ objStat := StreamObjectUploadStatistics{
|
|
|
StartTime: time.Now(),
|
|
|
PartName: partName,
|
|
|
}
|
|
|
|
|
|
if part_already_uploaded(app, streamName, objInfo.Key) {
|
|
|
- partObjUploadStats.EndTime = time.Now()
|
|
|
- partObjUploadStats.UpState = "repeated"
|
|
|
- streamStats.Objects[objInfo.Key] = partObjUploadStats
|
|
|
+ objStat.EndTime = time.Now()
|
|
|
+ objStat.UpState = "repeated"
|
|
|
+ streamStats.Objects[objInfo.Key] = objStat
|
|
|
|
|
|
logger.Infof("Part `%s` of stream `%s` is already uploaded", objInfo.Key, streamName)
|
|
|
continue
|
|
@@ -635,31 +898,45 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
|
|
|
err := upload_one_part(app, partInfo.StreamInfo, partInfo.StreamName, partInfo.PartName)
|
|
|
if err != nil {
|
|
|
- partObjUploadStats.EndTime = time.Now()
|
|
|
- partObjUploadStats.UpState = "fail"
|
|
|
- partObjUploadStats.Msg = err.Error()
|
|
|
- streamStats.Objects[objInfo.Key] = partObjUploadStats
|
|
|
+ objStat.EndTime = time.Now()
|
|
|
+ objStat.UpState = "fail"
|
|
|
+ objStat.Msg = err.Error()
|
|
|
|
|
|
logger.Warnf("Failed to upload part `%s` of stream `%s` (took %v): %v", partInfo.PartName, partInfo.StreamName,
|
|
|
- partObjUploadStats.EndTime.Sub(partObjUploadStats.StartTime), err)
|
|
|
+ objStat.EndTime.Sub(objStat.StartTime), err)
|
|
|
fullyUploaded = false
|
|
|
- continue
|
|
|
+ } else {
|
|
|
+ // Mark the part as uploaded
|
|
|
+ //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
|
|
|
+ part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
|
|
|
+ err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
|
|
|
+ if err != nil {
|
|
|
+ logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ objStat.EndTime = time.Now()
|
|
|
+ objStat.UpState = "ok"
|
|
|
+
|
|
|
+ logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
|
|
|
+ objStat.EndTime.Sub(objStat.StartTime))
|
|
|
}
|
|
|
|
|
|
- // Mark the part as uploaded
|
|
|
- //err = app.db.Create(&PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}).Error
|
|
|
- part := PartUploadRecord{StreamName: partInfo.StreamName, PartName: partInfo.PartName}
|
|
|
- err = app.db.Where(part).FirstOrCreate(&PartUploadRecord{}).Error
|
|
|
+ streamStats.Objects[objInfo.Key] = objStat
|
|
|
+ partNum, err := util.ExtractNumberFromString(partName)
|
|
|
if err != nil {
|
|
|
- logger.Warnf("Failed to mark part `%s` of stream `%s` as uploaded: %v", partInfo.PartName, partInfo.StreamName, err)
|
|
|
+ // Not a part file? Skip
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ status := "success"
|
|
|
+ if objStat.UpState != "ok" {
|
|
|
+ status = "failed"
|
|
|
+ }
|
|
|
+ msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
|
|
|
+ streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
|
|
|
+ err = PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("send part insert to stck status changed message error: %s", err)
|
|
|
}
|
|
|
-
|
|
|
- partObjUploadStats.EndTime = time.Now()
|
|
|
- partObjUploadStats.UpState = "ok"
|
|
|
- streamStats.Objects[objInfo.Key] = partObjUploadStats
|
|
|
-
|
|
|
- logger.Infof("Uploaded part `%s` of stream `%s`, took %v", objInfo.Key, streamName,
|
|
|
- partObjUploadStats.EndTime.Sub(partObjUploadStats.StartTime))
|
|
|
}
|
|
|
if !hasMetadata {
|
|
|
logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
|
|
@@ -707,14 +984,14 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
|
|
|
}
|
|
|
|
|
|
// Use regex to extract the part index from the part name
|
|
|
- partIndex := 0
|
|
|
+ partIndex := int64(0)
|
|
|
{
|
|
|
re := regexp.MustCompile(`part_(\d+)\.zst`)
|
|
|
matches := re.FindStringSubmatch(partName)
|
|
|
if len(matches) != 2 {
|
|
|
return fmt.Errorf("failed to extract part index from part name `%s`", partName)
|
|
|
}
|
|
|
- partIndex, err = strconv.Atoi(matches[1])
|
|
|
+ partIndex, err = strconv.ParseInt(matches[1], 10, 64)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to convert part index `%s` to integer: %w", matches[1], err)
|
|
|
}
|
|
@@ -726,7 +1003,7 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
|
|
|
|
|
|
// Check if the part data size is correct
|
|
|
if streamInfo.PartsCount != 0 {
|
|
|
- left := len(partData)
|
|
|
+ left := int64(len(partData))
|
|
|
if partIndex < streamInfo.PartsCount-1 {
|
|
|
right := streamInfo.PointsPerPart * 8
|
|
|
if left != right {
|