123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package main
- import (
- "bufio"
- "fmt"
- "os"
- "stck/stck-nsq-msg"
- smsg "stck/stck-nsq-msg/msg"
- "strings"
- "time"
- // "github.com/natefinch/lumberjack"
- "github.com/nsqio/go-nsq"
- "github.com/sirupsen/logrus"
- // "github.com/spf13/viper"
- )
// AppConfig is the application's configuration tree. The mapstructure tags
// give the key name each field is decoded from.
type AppConfig struct {
	// Nsq holds the NSQ endpoints. main fills these in with localhost
	// defaults before creating the producer and consumer.
	Nsq struct {
		TcpAddr         string `mapstructure:"tcp_addr"`
		LookupdHttpAddr string `mapstructure:"lookupd_http_addr"`
	} `mapstructure:"nsq"`

	// Mysql holds database connection settings.
	// NOTE(review): nothing in this file reads these fields; confirm whether
	// another file in the package does.
	Mysql struct {
		User     string `mapstructure:"user"`
		Password string `mapstructure:"password"`
		Host     string `mapstructure:"host"`
		Database string `mapstructure:"database"`
	} `mapstructure:"mysql"`
}
- var logger *logrus.Logger
- var gAppConfig *AppConfig
- var gNsqProducer *nsq.Producer
- var gNsqConsumer *nsq.Consumer
- func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
- payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
- if err != nil {
- return fmt.Errorf("marshal message error: %w", err)
- }
- err = gNsqProducer.Publish(smsg.ClientDoActionTopic, []byte(payload))
- if err != nil {
- return fmt.Errorf("publish message error: %w", err)
- }
- return nil
- }
- func main() {
- var err error
- // Init logger
- initLog()
- gAppConfig = &AppConfig{}
- gAppConfig.Nsq.TcpAddr = "localhost:4150"
- gAppConfig.Nsq.LookupdHttpAddr = "localhost:4161"
- // Init NSQ
- nsqConfig := nsq.NewConfig()
- gNsqProducer, err = nsq.NewProducer(gAppConfig.Nsq.TcpAddr, nsqConfig)
- if err != nil {
- logger.Fatalf("NSQ Producer init error: %s", err)
- }
- gNsqConsumer, err = nsq.NewConsumer(smsg.ServerDoNotifyTopic, "main-bastion-1", nsqConfig)
- if err != nil {
- logger.Fatalf("NSQ Consumer init error: %s", err)
- }
- gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
- logger.Debugf("NSQ Consumer received message: %s", message.Body)
- // Parse the message
- recvTime := message.Timestamp
- msg, err := stcknsqmsg.FromString(string(message.Body))
- if err != nil {
- logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
- return nil
- }
- // Process the message
- switch data := msg.Data.(type) {
- case *smsg.DeviceHeartbeatMsg:
- if time.Since(time.Unix(0, recvTime)) > time.Second*60 {
- logger.Debugf("NSQ Consumer ignored DeviceHeartbeatMsg: %v", data)
- break
- }
- logger.Infof("NSQ Consumer received DeviceHeartbeatMsg: %v", data)
- case *smsg.DeviceActionDoneMsg:
- logger.Infof("NSQ Consumer received DeviceActionDoneMsg: %v", data)
- default:
- logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
- }
- return nil
- }), 1)
- err = gNsqConsumer.ConnectToNSQD(gAppConfig.Nsq.TcpAddr)
- if err != nil {
- logger.Fatalf("NSQ Consumer connect error: %s", err)
- }
- // Enter CLI loop
- fmt.Print("Welcome to stck bastion CLI.\n\n")
- scanner := bufio.NewScanner(os.Stdin)
- for scanner.Scan() {
- fmt.Print("stck> ")
- var cmd string
- cmd = scanner.Text()
- cmd = strings.TrimSpace(cmd)
- cmds := strings.Split(cmd, " ")
- switch cmds[0] {
- case "":
- continue
- case "exit":
- goto Exit
- case "help":
- fmt.Println("Commands: exit, help")
- case "reboot":
- // Send a message to NSQ
- msg := smsg.MakeForceDeviceRebootMsg("*")
- err := PublishMessage(&msg)
- if err != nil {
- logger.Errorf("Publish message error: %s", err)
- }
- case "runcmd":
- // Send a message to NSQ
- cmd := strings.TrimSpace(strings.TrimPrefix(cmd, "runcmd"))
- if cmd == "" {
- cmd = "echo Hello\necho World!"
- }
- msg := smsg.MakeRequestDeviceExecuteShellScriptMsg("*", cmd)
- err := PublishMessage(&msg)
- if err != nil {
- logger.Errorf("Publish message error: %s", err)
- }
- case "update":
- if len(cmds) < 3 {
- fmt.Println("Usage: update <service_name> <binary_url>")
- continue
- }
- serviceName := cmds[1]
- binaryUrl := cmds[2]
- // http://localhost:19010/api/v1/buckets/app-updater/objects/download?prefix=watch-daemon-universal-installer-v2025-01-07.tar.gz
- msg := smsg.MakeRequestDeviceUpdateMsg("*", serviceName, "0.1.0", binaryUrl, "")
- err := PublishMessage(&msg)
- if err != nil {
- logger.Errorf("Publish message error: %s", err)
- }
- case "upload-log":
- msg := smsg.MakeRequestDeviceUploadLogsToMinioMsg("*", "app-logs", "logs")
- err := PublishMessage(&msg)
- if err != nil {
- logger.Errorf("Publish message error: %s", err)
- }
- default:
- fmt.Println("Unknown command. Type 'help' to see available commands.")
- }
- }
- Exit:
- fmt.Println("Bye :)")
- // Close NSQ
- gNsqConsumer.Stop()
- gNsqProducer.Stop()
- }
- func initLog() {
- logger = logrus.New()
- envLevel := strings.ToLower(os.Getenv("LOG_LEVEL"))
- if envLevel == "trace" {
- logger.SetLevel(logrus.TraceLevel)
- } else if envLevel == "debug" {
- logger.SetLevel(logrus.DebugLevel)
- } else {
- logger.SetLevel(logrus.InfoLevel)
- }
- logger.Out = os.Stdout
- }
|