main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package main
  2. import (
  3. "bufio"
  4. "fmt"
  5. "os"
  6. "stck/stck-nsq-msg"
  7. smsg "stck/stck-nsq-msg/msg"
  8. "strings"
  9. "time"
  10. // "github.com/natefinch/lumberjack"
  11. "github.com/nsqio/go-nsq"
  12. "github.com/sirupsen/logrus"
  13. // "github.com/spf13/viper"
  14. )
  15. type AppConfig struct {
  16. Nsq struct {
  17. TcpAddr string `mapstructure:"tcp_addr"`
  18. LookupdHttpAddr string `mapstructure:"lookupd_http_addr"`
  19. } `mapstructure:"nsq"`
  20. Mysql struct {
  21. User string `mapstructure:"user"`
  22. Password string `mapstructure:"password"`
  23. Host string `mapstructure:"host"`
  24. Database string `mapstructure:"database"`
  25. } `mapstructure:"mysql"`
  26. }
  27. var logger *logrus.Logger
  28. var gAppConfig *AppConfig
  29. var gNsqProducer *nsq.Producer
  30. var gNsqConsumer *nsq.Consumer
  31. func PublishMessage(msg stcknsqmsg.StckNsqMsgVariant) error {
  32. payload, err := stcknsqmsg.ToStckNsqMsgString(msg)
  33. if err != nil {
  34. return fmt.Errorf("marshal message error: %w", err)
  35. }
  36. err = gNsqProducer.Publish(smsg.ClientDoActionTopic, []byte(payload))
  37. if err != nil {
  38. return fmt.Errorf("publish message error: %w", err)
  39. }
  40. return nil
  41. }
  42. func main() {
  43. var err error
  44. // Init logger
  45. initLog()
  46. gAppConfig = &AppConfig{}
  47. gAppConfig.Nsq.TcpAddr = "localhost:4150"
  48. gAppConfig.Nsq.LookupdHttpAddr = "localhost:4161"
  49. // Init NSQ
  50. nsqConfig := nsq.NewConfig()
  51. gNsqProducer, err = nsq.NewProducer(gAppConfig.Nsq.TcpAddr, nsqConfig)
  52. if err != nil {
  53. logger.Fatalf("NSQ Producer init error: %s", err)
  54. }
  55. gNsqConsumer, err = nsq.NewConsumer(smsg.ServerDoNotifyTopic, "main-bastion-1", nsqConfig)
  56. if err != nil {
  57. logger.Fatalf("NSQ Consumer init error: %s", err)
  58. }
  59. gNsqConsumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
  60. logger.Debugf("NSQ Consumer received message: %s", message.Body)
  61. // Parse the message
  62. recvTime := message.Timestamp
  63. msg, err := stcknsqmsg.FromString(string(message.Body))
  64. if err != nil {
  65. logger.Errorf("NSQ Consumer unmarshal message error: %s", err)
  66. return nil
  67. }
  68. // Process the message
  69. switch data := msg.Data.(type) {
  70. case *smsg.DeviceHeartbeatMsg:
  71. if time.Since(time.Unix(0, recvTime)) > time.Second*60 {
  72. logger.Debugf("NSQ Consumer ignored DeviceHeartbeatMsg: %v", data)
  73. break
  74. }
  75. logger.Infof("NSQ Consumer received DeviceHeartbeatMsg: %v", data)
  76. case *smsg.DeviceActionDoneMsg:
  77. logger.Infof("NSQ Consumer received DeviceActionDoneMsg: %v", data)
  78. default:
  79. logger.Debugf("NSQ Consumer ignored unknown or uninteresting message: %v", msg)
  80. }
  81. return nil
  82. }), 1)
  83. err = gNsqConsumer.ConnectToNSQD(gAppConfig.Nsq.TcpAddr)
  84. if err != nil {
  85. logger.Fatalf("NSQ Consumer connect error: %s", err)
  86. }
  87. // Enter CLI loop
  88. fmt.Print("Welcome to stck bastion CLI.\n\n")
  89. scanner := bufio.NewScanner(os.Stdin)
  90. for scanner.Scan() {
  91. fmt.Print("stck> ")
  92. var cmd string
  93. cmd = scanner.Text()
  94. cmd = strings.TrimSpace(cmd)
  95. cmds := strings.Split(cmd, " ")
  96. switch cmds[0] {
  97. case "":
  98. continue
  99. case "exit":
  100. goto Exit
  101. case "help":
  102. fmt.Println("Commands: exit, help")
  103. case "reboot":
  104. // Send a message to NSQ
  105. msg := smsg.MakeForceDeviceRebootMsg("*")
  106. err := PublishMessage(&msg)
  107. if err != nil {
  108. logger.Errorf("Publish message error: %s", err)
  109. }
  110. case "runcmd":
  111. // Send a message to NSQ
  112. cmd := strings.TrimSpace(strings.TrimPrefix(cmd, "runcmd"))
  113. if cmd == "" {
  114. cmd = "echo Hello\necho World!"
  115. }
  116. msg := smsg.MakeRequestDeviceExecuteShellScriptMsg("*", cmd)
  117. err := PublishMessage(&msg)
  118. if err != nil {
  119. logger.Errorf("Publish message error: %s", err)
  120. }
  121. case "update":
  122. if len(cmds) < 3 {
  123. fmt.Println("Usage: update <service_name> <binary_url>")
  124. continue
  125. }
  126. serviceName := cmds[1]
  127. binaryUrl := cmds[2]
  128. // http://localhost:19010/api/v1/buckets/app-updater/objects/download?prefix=watch-daemon-universal-installer-v2025-01-07.tar.gz
  129. msg := smsg.MakeRequestDeviceUpdateMsg("*", serviceName, "0.1.0", binaryUrl, "")
  130. err := PublishMessage(&msg)
  131. if err != nil {
  132. logger.Errorf("Publish message error: %s", err)
  133. }
  134. case "upload-log":
  135. msg := smsg.MakeRequestDeviceUploadLogsToMinioMsg("*", "app-logs", "logs")
  136. err := PublishMessage(&msg)
  137. if err != nil {
  138. logger.Errorf("Publish message error: %s", err)
  139. }
  140. default:
  141. fmt.Println("Unknown command. Type 'help' to see available commands.")
  142. }
  143. }
  144. Exit:
  145. fmt.Println("Bye :)")
  146. // Close NSQ
  147. gNsqConsumer.Stop()
  148. gNsqProducer.Stop()
  149. }
  150. func initLog() {
  151. logger = logrus.New()
  152. envLevel := strings.ToLower(os.Getenv("LOG_LEVEL"))
  153. if envLevel == "trace" {
  154. logger.SetLevel(logrus.TraceLevel)
  155. } else if envLevel == "debug" {
  156. logger.SetLevel(logrus.DebugLevel)
  157. } else {
  158. logger.SetLevel(logrus.InfoLevel)
  159. }
  160. logger.Out = os.Stdout
  161. }