// Command main is the interactive "stck bastion" CLI: it publishes device
// commands to NSQ and logs the device notifications it receives back.
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"time"

	"stck/stck-nsq-msg"
	smsg "stck/stck-nsq-msg/msg"

	// "github.com/natefinch/lumberjack"
	"github.com/nsqio/go-nsq"
	"github.com/sirupsen/logrus"
	// "github.com/spf13/viper"
)

const (
	// consumerChannel is the NSQ channel this CLI subscribes with.
	consumerChannel = "main-bastion-1"

	// heartbeatMaxAge is the age beyond which a heartbeat is only logged at
	// debug level instead of info level.
	heartbeatMaxAge = 60 * time.Second

	// cliPrompt is printed before every command is read.
	cliPrompt = "stck> "

	// defaultShellScript is sent by "runcmd" when no script is given.
	defaultShellScript = "echo Hello\necho World!"

	// allDevices is the device selector passed to every message constructor.
	// NOTE(review): presumably a wildcard matching every device - confirm
	// against the smsg package.
	allDevices = "*"
)

// AppConfig holds the application settings. The mapstructure tags describe the
// layout of an external config file, but in this file the values are assigned
// directly in main and only the Nsq section is read.
type AppConfig struct {
	Nsq struct {
		TcpAddr         string `mapstructure:"tcp_addr"`
		LookupdHttpAddr string `mapstructure:"lookupd_http_addr"`
	} `mapstructure:"nsq"`
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
}

// Package-level state shared between main, the NSQ handler and the CLI.
var (
	logger       *logrus.Logger
	gAppConfig   *AppConfig
	gNsqProducer *nsq.Producer
	gNsqConsumer *nsq.Consumer
)

// PublishMessage serializes msg and publishes it on smsg.ClientDoActionTopic
// through the global producer. It returns a wrapped error if either the
// serialization or the publish fails.
func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
	payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
	if err != nil {
		return fmt.Errorf("marshal message error: %w", err)
	}
	if err := gNsqProducer.Publish(smsg.ClientDoActionTopic, []byte(payload)); err != nil {
		return fmt.Errorf("publish message error: %w", err)
	}
	return nil
}

// publishOrLog publishes msg and logs (rather than returns) any failure, so a
// failed publish never terminates the interactive session.
func publishOrLog(msg stcknsqmsg.StckNsqMsgVariant) {
	if err := PublishMessage(msg); err != nil {
		logger.Errorf("Publish message error: %s", err)
	}
}

// handleNotifyMessage is the NSQ consumer handler. It decodes the message body
// and logs it according to its payload type. It always returns nil, so a
// message that fails to decode is logged and not reported to go-nsq as failed.
func handleNotifyMessage(message *nsq.Message) error {
	logger.Debugf("NSQ Consumer received message: %s", message.Body)

	msg, err := stcknsqmsg.FromString(string(message.Body))
	if err != nil {
		logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
		return nil
	}

	switch data := msg.Data.(type) {
	case *smsg.DeviceHeartbeatMsg:
		// message.Timestamp is interpreted as nanoseconds since the Unix epoch.
		if time.Since(time.Unix(0, message.Timestamp)) > heartbeatMaxAge {
			logger.Debugf("NSQ Consumer ignored DeviceHeartbeatMsg: %v", data)
			break
		}
		logger.Infof("NSQ Consumer received DeviceHeartbeatMsg: %v", data)
	case *smsg.DeviceActionDoneMsg:
		logger.Infof("NSQ Consumer received DeviceActionDoneMsg: %v", data)
	default:
		logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
	}
	return nil
}

// execCommand runs a single CLI line (already trimmed of surrounding
// whitespace). It returns true when the user asked to leave the CLI.
func execCommand(line string) bool {
	// Fields (unlike Split on " ") tolerates repeated spaces and tabs between
	// arguments and yields no empty tokens.
	args := strings.Fields(line)
	if len(args) == 0 {
		return false
	}

	switch args[0] {
	case "exit":
		return true
	case "help":
		fmt.Println("Commands: exit, help, reboot, runcmd [script], update <service-name> <binary-url>, upload-log")
	case "reboot":
		msg := smsg.MakeForceDeviceRebootMsg(allDevices)
		publishOrLog(&msg)
	case "runcmd":
		// Everything after the command word is the script, spaces included.
		script := strings.TrimSpace(strings.TrimPrefix(line, "runcmd"))
		if script == "" {
			script = defaultShellScript
		}
		msg := smsg.MakeRequestDeviceExecuteShellScriptMsg(allDevices, script)
		publishOrLog(&msg)
	case "update":
		if len(args) < 3 {
			fmt.Println("Usage: update <service-name> <binary-url>")
			return false
		}
		serviceName, binaryURL := args[1], args[2]
		// Example URL:
		// http://localhost:19010/api/v1/buckets/app-updater/objects/download?prefix=watch-daemon-universal-installer-v2025-01-07.tar.gz
		msg := smsg.MakeRequestDeviceUpdateMsg(allDevices, serviceName, "0.1.0", binaryURL, "")
		publishOrLog(&msg)
	case "upload-log":
		msg := smsg.MakeRequestDeviceUploadLogsToMinioMsg(allDevices, "app-logs", "logs")
		publishOrLog(&msg)
	default:
		fmt.Println("Unknown command. Type 'help' to see available commands.")
	}
	return false
}

// runCLI prompts for and executes commands read from scanner until the user
// types "exit" or the input ends. It returns the scanner's error, if any; a
// clean end of input returns nil.
func runCLI(scanner *bufio.Scanner) error {
	for {
		// Prompt before reading so the user sees it ahead of typing.
		fmt.Print(cliPrompt)
		if !scanner.Scan() {
			// Terminate the dangling prompt line before returning.
			fmt.Println()
			return scanner.Err()
		}
		if execCommand(strings.TrimSpace(scanner.Text())) {
			return nil
		}
	}
}

// main wires up logging, the NSQ producer and consumer, runs the interactive
// command loop and shuts NSQ down when the loop ends.
func main() {
	initLog()

	// Configuration is hard-coded here. LookupdHttpAddr is set but not used:
	// the consumer below connects directly to nsqd through TcpAddr.
	gAppConfig = &AppConfig{}
	gAppConfig.Nsq.TcpAddr = "localhost:4150"
	gAppConfig.Nsq.LookupdHttpAddr = "localhost:4161"

	var err error
	nsqConfig := nsq.NewConfig()

	gNsqProducer, err = nsq.NewProducer(gAppConfig.Nsq.TcpAddr, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Producer init error: %s", err)
	}

	gNsqConsumer, err = nsq.NewConsumer(smsg.ServerDoNotifyTopic, consumerChannel, nsqConfig)
	if err != nil {
		logger.Fatalf("NSQ Consumer init error: %s", err)
	}
	gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(handleNotifyMessage), 1)

	if err = gNsqConsumer.ConnectToNSQD(gAppConfig.Nsq.TcpAddr); err != nil {
		logger.Fatalf("NSQ Consumer connect error: %s", err)
	}

	fmt.Print("Welcome to stck bastion CLI.\n\n")
	if err := runCLI(bufio.NewScanner(os.Stdin)); err != nil {
		logger.Errorf("Read command error: %s", err)
	}
	fmt.Println("Bye :)")

	// Stop only initiates the consumer's graceful shutdown; receiving from
	// StopChan blocks until it has completed.
	gNsqConsumer.Stop()
	<-gNsqConsumer.StopChan
	gNsqProducer.Stop()
}

// initLog creates the global logger writing to stdout. The level comes from
// the LOG_LEVEL environment variable (case-insensitive): "trace" or "debug"
// select those levels, anything else selects info.
func initLog() {
	logger = logrus.New()
	logger.Out = os.Stdout

	switch strings.ToLower(os.Getenv("LOG_LEVEL")) {
	case "trace":
		logger.SetLevel(logrus.TraceLevel)
	case "debug":
		logger.SetLevel(logrus.DebugLevel)
	default:
		logger.SetLevel(logrus.InfoLevel)
	}
}