|
@@ -0,0 +1,176 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "bufio"
|
|
|
+ "fmt"
|
|
|
+ "os"
|
|
|
+ "stck/stck-nsq-msg"
|
|
|
+ smsg "stck/stck-nsq-msg/msg"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ // "github.com/natefinch/lumberjack"
|
|
|
+ "github.com/nsqio/go-nsq"
|
|
|
+ "github.com/sirupsen/logrus"
|
|
|
+ // "github.com/spf13/viper"
|
|
|
+)
|
|
|
+
|
|
|
+type AppConfig struct {
|
|
|
+ Nsq struct {
|
|
|
+ TcpAddr string `mapstructure:"tcp_addr"`
|
|
|
+ LookupdHttpAddr string `mapstructure:"lookupd_http_addr"`
|
|
|
+ } `mapstructure:"nsq"`
|
|
|
+ Mysql struct {
|
|
|
+ User string `mapstructure:"user"`
|
|
|
+ Password string `mapstructure:"password"`
|
|
|
+ Host string `mapstructure:"host"`
|
|
|
+ Database string `mapstructure:"database"`
|
|
|
+ } `mapstructure:"mysql"`
|
|
|
+}
|
|
|
+
|
|
|
+var logger *logrus.Logger
|
|
|
+var gAppConfig *AppConfig
|
|
|
+var gNsqProducer *nsq.Producer
|
|
|
+var gNsqConsumer *nsq.Consumer
|
|
|
+
|
|
|
+func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
|
|
|
+ payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("marshal message error: %w", err)
|
|
|
+ }
|
|
|
+ err = gNsqProducer.Publish(smsg.ClientDoActionTopic, []byte(payload))
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("publish message error: %w", err)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ var err error
|
|
|
+
|
|
|
+ // Init logger
|
|
|
+ initLog()
|
|
|
+
|
|
|
+ gAppConfig = &AppConfig{}
|
|
|
+ gAppConfig.Nsq.TcpAddr = "localhost:4150"
|
|
|
+ gAppConfig.Nsq.LookupdHttpAddr = "localhost:4161"
|
|
|
+
|
|
|
+ // Init NSQ
|
|
|
+ nsqConfig := nsq.NewConfig()
|
|
|
+ gNsqProducer, err = nsq.NewProducer(gAppConfig.Nsq.TcpAddr, nsqConfig)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Producer init error: %s", err)
|
|
|
+ }
|
|
|
+ gNsqConsumer, err = nsq.NewConsumer(smsg.ServerDoNotifyTopic, "main-bastion-1", nsqConfig)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Consumer init error: %s", err)
|
|
|
+ }
|
|
|
+ gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
|
|
|
+ logger.Debugf("NSQ Consumer received message: %s", message.Body)
|
|
|
+
|
|
|
+ // Parse the message
|
|
|
+ recvTime := message.Timestamp
|
|
|
+ msg, err := stcknsqmsg.FromString(string(message.Body))
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // Process the message
|
|
|
+ switch data := msg.Data.(type) {
|
|
|
+ case *smsg.DeviceHeartbeatMsg:
|
|
|
+ if time.Since(time.Unix(0, recvTime)) > time.Second*60 {
|
|
|
+ logger.Debugf("NSQ Consumer ignored DeviceHeartbeatMsg: %v", data)
|
|
|
+ break
|
|
|
+ }
|
|
|
+ logger.Infof("NSQ Consumer received DeviceHeartbeatMsg: %v", data)
|
|
|
+ case *smsg.DeviceActionDoneMsg:
|
|
|
+ logger.Infof("NSQ Consumer received DeviceActionDoneMsg: %v", data)
|
|
|
+ default:
|
|
|
+ logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+ }), 1)
|
|
|
+ err = gNsqConsumer.ConnectToNSQD(gAppConfig.Nsq.TcpAddr)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalf("NSQ Consumer connect error: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // Enter CLI loop
|
|
|
+ fmt.Print("Welcome to stck bastion CLI.\n\n")
|
|
|
+ scanner := bufio.NewScanner(os.Stdin)
|
|
|
+ for scanner.Scan() {
|
|
|
+ fmt.Print("stck> ")
|
|
|
+ var cmd string
|
|
|
+ cmd = scanner.Text()
|
|
|
+ cmd = strings.TrimSpace(cmd)
|
|
|
+ cmds := strings.Split(cmd, " ")
|
|
|
+ switch cmds[0] {
|
|
|
+ case "":
|
|
|
+ continue
|
|
|
+ case "exit":
|
|
|
+ goto Exit
|
|
|
+ case "help":
|
|
|
+ fmt.Println("Commands: exit, help")
|
|
|
+ case "reboot":
|
|
|
+ // Send a message to NSQ
|
|
|
+ msg := smsg.MakeForceDeviceRebootMsg("*")
|
|
|
+ err := PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("Publish message error: %s", err)
|
|
|
+ }
|
|
|
+ case "runcmd":
|
|
|
+ // Send a message to NSQ
|
|
|
+ cmd := strings.TrimSpace(strings.TrimPrefix(cmd, "runcmd"))
|
|
|
+ if cmd == "" {
|
|
|
+ cmd = "echo Hello\necho World!"
|
|
|
+ }
|
|
|
+ msg := smsg.MakeRequestDeviceExecuteShellScriptMsg("*", cmd)
|
|
|
+ err := PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("Publish message error: %s", err)
|
|
|
+ }
|
|
|
+ case "update":
|
|
|
+ if len(cmds) < 3 {
|
|
|
+ fmt.Println("Usage: update <service_name> <binary_url>")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ serviceName := cmds[1]
|
|
|
+ binaryUrl := cmds[2]
|
|
|
+ // http://localhost:19010/api/v1/buckets/app-updater/objects/download?prefix=watch-daemon-universal-installer-v2025-01-07.tar.gz
|
|
|
+ msg := smsg.MakeRequestDeviceUpdateMsg("*", serviceName, "0.1.0", binaryUrl, "")
|
|
|
+ err := PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("Publish message error: %s", err)
|
|
|
+ }
|
|
|
+ case "upload-log":
|
|
|
+ msg := smsg.MakeRequestDeviceUploadLogsToMinioMsg("*", "app-logs", "logs")
|
|
|
+ err := PublishMessage(&msg)
|
|
|
+ if err != nil {
|
|
|
+ logger.Errorf("Publish message error: %s", err)
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ fmt.Println("Unknown command. Type 'help' to see available commands.")
|
|
|
+ }
|
|
|
+ }
|
|
|
+Exit:
|
|
|
+ fmt.Println("Bye :)")
|
|
|
+
|
|
|
+ // Close NSQ
|
|
|
+ gNsqConsumer.Stop()
|
|
|
+ gNsqProducer.Stop()
|
|
|
+}
|
|
|
+
|
|
|
+func initLog() {
|
|
|
+ logger = logrus.New()
|
|
|
+ envLevel := strings.ToLower(os.Getenv("LOG_LEVEL"))
|
|
|
+ if envLevel == "trace" {
|
|
|
+ logger.SetLevel(logrus.TraceLevel)
|
|
|
+ } else if envLevel == "debug" {
|
|
|
+ logger.SetLevel(logrus.DebugLevel)
|
|
|
+ } else {
|
|
|
+ logger.SetLevel(logrus.InfoLevel)
|
|
|
+ }
|
|
|
+ logger.Out = os.Stdout
|
|
|
+}
|