|
@@ -8,11 +8,8 @@ import (
|
|
|
"example/minio-into-stck/util"
|
|
|
"fmt"
|
|
|
"os"
|
|
|
- "os/exec"
|
|
|
"os/signal"
|
|
|
"path/filepath"
|
|
|
- "stck/stck-nsq-msg"
|
|
|
- smsg "stck/stck-nsq-msg/msg"
|
|
|
"sync"
|
|
|
"syscall"
|
|
|
|
|
@@ -32,7 +29,6 @@ import (
|
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
|
"github.com/minio/minio-go/v7/pkg/notification"
|
|
|
"github.com/natefinch/lumberjack"
|
|
|
- "github.com/nsqio/go-nsq"
|
|
|
"github.com/sirupsen/logrus"
|
|
|
"github.com/spf13/viper"
|
|
|
"gorm.io/driver/mysql"
|
|
@@ -58,10 +54,6 @@ type AppInitConfig struct {
|
|
|
Host string `mapstructure:"host"`
|
|
|
Table string `mapstructure:"table"`
|
|
|
} `mapstructure:"stck"`
|
|
|
- Nsq struct {
|
|
|
- TcpAddr string `mapstructure:"tcpAddr"`
|
|
|
- LookupdHttpAddr string `mapstructure:"lookupdHttpAddr"`
|
|
|
- } `mapstructure:"nsq"`
|
|
|
Main struct {
|
|
|
UploadRetryMaxTimes int `mapstructure:"uploadRetryMaxTimes"`
|
|
|
FailedRetryDelaySeconds int `mapstructure:"failedRetryDelaySeconds"`
|
|
@@ -103,231 +95,12 @@ type StreamMetadata struct {
|
|
|
TotalPoints int64 `json:"total_points"`
|
|
|
}
|
|
|
|
|
|
-var gLocalIpAddress string
|
|
|
-var gMachineID string
|
|
|
-var gNsqProducer *nsq.Producer
|
|
|
-var gNsqConsumer *nsq.Consumer
|
|
|
-var gAppStartTime time.Time = time.Now()
|
|
|
var programmaticQuitChan chan struct{} = make(chan struct{}, 1)
|
|
|
var gAppQuitting = false
|
|
|
var gAppExitWaitGroup sync.WaitGroup
|
|
|
|
|
|
var buildtime string
|
|
|
|
|
|
-func MakeHeartbeatMsg() *smsg.DeviceHeartbeatMsg {
|
|
|
- m := smsg.MakeDeviceHeartbeatMsg(gMachineID, gAppStartTime.UnixNano(), time.Now().UnixNano(),
|
|
|
- gLocalIpAddress, "minio-into-stck", "0.1.0+dev."+buildtime)
|
|
|
- return &m
|
|
|
-}
|
|
|
-
|
|
|
-func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
|
|
|
- payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("marshal message error: %w", err)
|
|
|
- }
|
|
|
- err = gNsqProducer.Publish(smsg.ServerDoNotifyTopic, []byte(payload))
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("publish message error: %w", err)
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-func initNsq() {
|
|
|
- ip, err := stcknsqmsg.GetLocalIP()
|
|
|
- if err != nil {
|
|
|
- logger.Warnf("GetLocalIP error: %s", err)
|
|
|
- } else {
|
|
|
- gLocalIpAddress = ip.String()
|
|
|
- logger.Infof("Local IP: %s", gLocalIpAddress)
|
|
|
- }
|
|
|
- gMachineID = stcknsqmsg.MakeUniqueMachineID()
|
|
|
- logger.Infof("Machine ID: %s", gMachineID)
|
|
|
-
|
|
|
- fmt.Printf("IP: %s, Machine ID: %s\n", gLocalIpAddress, gMachineID)
|
|
|
-
|
|
|
- // Connect to NSQ
|
|
|
- nsqConfig := nsq.NewConfig()
|
|
|
- gNsqProducer, err = nsq.NewProducer(appInitCfg.Nsq.TcpAddr, nsqConfig)
|
|
|
- if err != nil {
|
|
|
- logger.Fatalf("NSQ Producer init error: %s", err)
|
|
|
- }
|
|
|
- gNsqConsumer, err = nsq.NewConsumer(smsg.ClientDoActionTopic, gMachineID, nsqConfig)
|
|
|
- if err != nil {
|
|
|
- logger.Fatalf("NSQ Consumer init error: %s", err)
|
|
|
- }
|
|
|
- gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
|
|
|
- logger.Debugf("NSQ Consumer received message: %s", message.Body)
|
|
|
-
|
|
|
- // Parse the message
|
|
|
- recvTime := message.Timestamp
|
|
|
- msg, err := stcknsqmsg.FromString(string(message.Body))
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
|
|
|
- return nil
|
|
|
- }
|
|
|
-
|
|
|
- // Process the message
|
|
|
- switch data := msg.Data.(type) {
|
|
|
- case *smsg.DevicePingMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- // Write pong
|
|
|
- pongMsg := smsg.MakeDevicePongMsg(gMachineID, recvTime)
|
|
|
- err := PublishMessage(&pongMsg)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send pong error: %s", err)
|
|
|
- }
|
|
|
- case *smsg.RequestDeviceExecuteShellScriptMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- // Execute the shell script
|
|
|
- resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, -1, "")
|
|
|
- result, err := func(data *smsg.RequestDeviceExecuteShellScriptMsg) (string, error) {
|
|
|
- cmd := exec.Command("bash", "-c", data.Script)
|
|
|
- var out bytes.Buffer
|
|
|
- cmd.Stdout = &out
|
|
|
- cmd.Stderr = &out
|
|
|
- err := cmd.Run()
|
|
|
- return out.String(), err
|
|
|
- }(data)
|
|
|
- if err != nil {
|
|
|
- errMsg := fmt.Sprintf("execute shell script error:\n%s\n\noutput:\n%s", err, result)
|
|
|
- // Write error message
|
|
|
- resp.Status = -1
|
|
|
- resp.Msg = errMsg
|
|
|
- } else {
|
|
|
- // Write output
|
|
|
- resp.Status = 0
|
|
|
- resp.Msg = result
|
|
|
- }
|
|
|
- err = PublishMessage(&resp)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send action done error: %s", err)
|
|
|
- }
|
|
|
- case *smsg.RequestDeviceUpdateMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- if data.ServiceName != "minio-into-stck" {
|
|
|
- break
|
|
|
- }
|
|
|
- resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
|
|
|
- result, err := func(data *smsg.RequestDeviceUpdateMsg) (string, error) {
|
|
|
- // Download the update file to /tmp
|
|
|
- downloadPath := "/tmp/minio-into-stck-updater"
|
|
|
- updateScriptPath := filepath.Join(downloadPath, "update.sh")
|
|
|
- var out bytes.Buffer
|
|
|
- err := os.RemoveAll(downloadPath)
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- err = os.MkdirAll(downloadPath, 0777)
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- updateScriptContent := fmt.Sprintf(`#!/bin/bash
|
|
|
-set -e
|
|
|
-cd %s
|
|
|
-wget --tries=3 -nv -O installer.tar.gz %s
|
|
|
-tar -xzf installer.tar.gz
|
|
|
-cd minio-into-stck-installer
|
|
|
-./replacing-update.sh
|
|
|
-`, downloadPath, data.ServiceBinaryURL)
|
|
|
- err = os.WriteFile(updateScriptPath, []byte(updateScriptContent), 0777)
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- // Execute the update script
|
|
|
- cmd := exec.Command("bash", "-c", updateScriptPath)
|
|
|
- cmd.Stdout = &out
|
|
|
- cmd.Stderr = &out
|
|
|
- err = cmd.Run()
|
|
|
- return out.String(), err
|
|
|
- }(data)
|
|
|
- if err != nil {
|
|
|
- errMsg := fmt.Sprintf("execute update process error:\n%s\n\noutput:\n%s", err, result)
|
|
|
- // Write error message
|
|
|
- resp.Status = -1
|
|
|
- resp.Msg = errMsg
|
|
|
- } else {
|
|
|
- // Write output
|
|
|
- resp.Status = 0
|
|
|
- resp.Msg = "executed update process successfully\n\noutput:\n" + result
|
|
|
- }
|
|
|
- err = PublishMessage(&resp)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send action done error: %s", err)
|
|
|
- }
|
|
|
- case *smsg.ForceDeviceRebootMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- programmaticQuitChan <- struct{}{}
|
|
|
- case *smsg.ForceDeviceSendHeartbeatMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- // Send heartbeat
|
|
|
- heartBeatMsg := MakeHeartbeatMsg()
|
|
|
- err := PublishMessage(heartBeatMsg)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send heartbeat error: %s", err)
|
|
|
- }
|
|
|
- case *smsg.RequestDeviceUploadLogsToMinioMsg:
|
|
|
- if !stcknsqmsg.DeviceIdMatches(data.DeviceID, gMachineID) {
|
|
|
- break
|
|
|
- }
|
|
|
- // Upload logs to MinIO
|
|
|
- resp := smsg.MakeDeviceActionDoneMsg(gMachineID, data.MsgType(), "", recvTime, 0, "")
|
|
|
- minioCreds := credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, "")
|
|
|
- minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
|
|
|
- Creds: minioCreds,
|
|
|
- Secure: false,
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("MinIO Client init error: %s", err)
|
|
|
- resp.Status = -1
|
|
|
- resp.Msg += fmt.Sprintf("MinIO Client init error: %s\n", err)
|
|
|
- } else {
|
|
|
- if util.FileExists("./logs") {
|
|
|
- err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
|
|
|
- filepath.Join(data.RemoteBasePath, gMachineID, "logs"), "logs")
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("upload logs to MinIO error: %s", err)
|
|
|
- resp.Status = -1
|
|
|
- resp.Msg += fmt.Sprintf("upload logs to MinIO error: %s\n", err)
|
|
|
- }
|
|
|
- }
|
|
|
- if util.FileExists("./log") {
|
|
|
- err = util.MinioUploadFolder(minioClient, data.RemoteBucket,
|
|
|
- filepath.Join(data.RemoteBasePath, gMachineID, "log"), "log")
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("upload log to MinIO error: %s", err)
|
|
|
- resp.Status = -1
|
|
|
- resp.Msg += fmt.Sprintf("upload log to MinIO error: %s\n", err)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- err = PublishMessage(&resp)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send action done error: %s", err)
|
|
|
- }
|
|
|
- default:
|
|
|
- logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
|
|
|
- }
|
|
|
-
|
|
|
- // Notify NSQ that the message is processed successfully
|
|
|
- return nil
|
|
|
- }), 1)
|
|
|
- // err = gNsqConsumer.ConnectToNSQLookupd(gAppConfig.Nsq.LookupdHttpAddr)
|
|
|
- err = gNsqConsumer.ConnectToNSQD(appInitCfg.Nsq.TcpAddr)
|
|
|
- if err != nil {
|
|
|
- logger.Fatalf("NSQ Consumer connect error: %s", err)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
var appInitCfg *AppInitConfig = &AppInitConfig{}
|
|
|
|
|
|
func initLoadConfig() {
|
|
@@ -450,9 +223,6 @@ func main() {
|
|
|
// Load configuration from file
|
|
|
initLoadConfig()
|
|
|
|
|
|
- // 初始化 NSQ
|
|
|
- initNsq()
|
|
|
-
|
|
|
var db *gorm.DB
|
|
|
var minioClient *minio.Client
|
|
|
var ckConn driver.Conn
|
|
@@ -547,10 +317,6 @@ func main() {
|
|
|
// HACK: Notify the main worker to quit
|
|
|
objUploadChan.In() <- ""
|
|
|
|
|
|
- // Close the NSQ producer and consumer
|
|
|
- gNsqProducer.Stop()
|
|
|
- gNsqConsumer.Stop()
|
|
|
-
|
|
|
// Wait for the goroutines to exit
|
|
|
gAppExitWaitGroup.Wait()
|
|
|
|
|
@@ -849,15 +615,6 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
util.FmtMyTime(streamStats.StartTime), util.FmtMyTime(streamStats.EndTime),
|
|
|
fullyUploaded, streamStats.MetaPartsCount,
|
|
|
repeatedCount, okCount, failCount, totalCount, objDetailsMsg)
|
|
|
-
|
|
|
- // Send upload status changed message
|
|
|
- msg := smsg.MakeStreamInsertToStckStatusChangedMsg(gMachineID, streamStats.StartTime.UnixNano(),
|
|
|
- streamStats.StreamName, streamStats.MetaPointsCount, streamStats.MetaPartsCount,
|
|
|
- okCount, failCount, repeatedCount)
|
|
|
- err := PublishMessage(&msg)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send stream insert to stck status changed message error: %s", err)
|
|
|
- }
|
|
|
}()
|
|
|
|
|
|
fullyUploaded = true
|
|
@@ -961,21 +718,6 @@ func upload_one_stream(app AppCtx, streamName string) (fullyUploaded bool, err e
|
|
|
}
|
|
|
|
|
|
streamStats.Objects[objInfo.Key] = objStat
|
|
|
- partNum, err := util.ExtractNumberFromString(partName)
|
|
|
- if err != nil {
|
|
|
- // Not a part file? Skip
|
|
|
- continue
|
|
|
- }
|
|
|
- status := "success"
|
|
|
- if objStat.UpState != "ok" {
|
|
|
- status = "failed"
|
|
|
- }
|
|
|
- msg := smsg.MakePartInsertToStckStatusChangedMsg(gMachineID, objStat.StartTime.UnixNano(),
|
|
|
- streamName, partNum, streamStats.MetaPointsCount, status, objStat.Msg)
|
|
|
- err = PublishMessage(&msg)
|
|
|
- if err != nil {
|
|
|
- logger.Errorf("send part insert to stck status changed message error: %s", err)
|
|
|
- }
|
|
|
}
|
|
|
if !hasMetadata {
|
|
|
logger.Warnf("Stream `%s` has no metadata file, will retry later", streamName)
|