nsq_stat.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // This is a utility application that polls /stats for all the producers
  2. // of the specified topic/channel and displays aggregate stats
  3. package main
  4. import (
  5. "errors"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "os"
  10. "os/signal"
  11. "strconv"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "github.com/nsqio/nsq/internal/app"
  16. "github.com/nsqio/nsq/internal/clusterinfo"
  17. "github.com/nsqio/nsq/internal/http_api"
  18. "github.com/nsqio/nsq/internal/version"
  19. )
  20. var (
  21. showVersion = flag.Bool("version", false, "print version")
  22. topic = flag.String("topic", "", "NSQ topic")
  23. channel = flag.String("channel", "", "NSQ channel")
  24. interval = flag.Duration("interval", 2*time.Second, "duration of time between polling/printing output")
  25. httpConnectTimeout = flag.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
  26. httpRequestTimeout = flag.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
  27. countNum = numValue{}
  28. nsqdHTTPAddrs = app.StringArray{}
  29. lookupdHTTPAddrs = app.StringArray{}
  30. )
  31. type numValue struct {
  32. isSet bool
  33. value int
  34. }
  35. func (nv *numValue) String() string { return "N" }
  36. func (nv *numValue) Set(s string) error {
  37. value, err := strconv.ParseInt(s, 10, 32)
  38. if err != nil {
  39. return err
  40. }
  41. nv.value = int(value)
  42. nv.isSet = true
  43. return nil
  44. }
  45. func init() {
  46. flag.Var(&nsqdHTTPAddrs, "nsqd-http-address", "nsqd HTTP address (may be given multiple times)")
  47. flag.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
  48. flag.Var(&countNum, "count", "number of reports")
  49. }
  50. func statLoop(interval time.Duration, connectTimeout time.Duration, requestTimeout time.Duration,
  51. topic string, channel string, nsqdTCPAddrs []string, lookupdHTTPAddrs []string) {
  52. ci := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout))
  53. var o *clusterinfo.ChannelStats
  54. for i := 0; !countNum.isSet || countNum.value >= i; i++ {
  55. var producers clusterinfo.Producers
  56. var err error
  57. if len(lookupdHTTPAddrs) != 0 {
  58. producers, err = ci.GetLookupdTopicProducers(topic, lookupdHTTPAddrs)
  59. } else {
  60. producers, err = ci.GetNSQDTopicProducers(topic, nsqdHTTPAddrs)
  61. }
  62. if err != nil {
  63. log.Fatalf("ERROR: failed to get topic producers - %s", err)
  64. }
  65. _, channelStats, err := ci.GetNSQDStats(producers, topic, channel, false)
  66. if err != nil {
  67. log.Fatalf("ERROR: failed to get nsqd stats - %s", err)
  68. }
  69. c, ok := channelStats[channel]
  70. if !ok {
  71. log.Fatalf("ERROR: failed to find channel(%s) in stats metadata for topic(%s)", channel, topic)
  72. }
  73. if i%25 == 0 {
  74. fmt.Printf("%s+%s+%s\n",
  75. "------rate------",
  76. "----------------depth----------------",
  77. "--------------metadata---------------")
  78. fmt.Printf("%7s %7s | %7s %7s %7s %5s %5s | %7s %7s %12s %7s\n",
  79. "ingress", "egress",
  80. "total", "mem", "disk", "inflt",
  81. "def", "req", "t-o", "msgs", "clients")
  82. }
  83. if o == nil {
  84. o = c
  85. time.Sleep(interval)
  86. continue
  87. }
  88. // TODO: paused
  89. fmt.Printf("%7d %7d | %7d %7d %7d %5d %5d | %7d %7d %12d %7d\n",
  90. int64(float64(c.MessageCount-o.MessageCount)/interval.Seconds()),
  91. int64(float64(c.MessageCount-o.MessageCount-(c.Depth-o.Depth))/interval.Seconds()),
  92. c.Depth,
  93. c.MemoryDepth,
  94. c.BackendDepth,
  95. c.InFlightCount,
  96. c.DeferredCount,
  97. c.RequeueCount,
  98. c.TimeoutCount,
  99. c.MessageCount,
  100. c.ClientCount)
  101. o = c
  102. time.Sleep(interval)
  103. }
  104. os.Exit(0)
  105. }
  106. func checkAddrs(addrs []string) error {
  107. for _, a := range addrs {
  108. if strings.HasPrefix(a, "http") {
  109. return errors.New("address should not contain scheme")
  110. }
  111. }
  112. return nil
  113. }
  114. func main() {
  115. flag.Parse()
  116. if *showVersion {
  117. fmt.Printf("nsq_stat v%s\n", version.Binary)
  118. return
  119. }
  120. if *topic == "" || *channel == "" {
  121. log.Fatal("--topic and --channel are required")
  122. }
  123. intvl := *interval
  124. if int64(intvl) <= 0 {
  125. log.Fatal("--interval should be positive")
  126. }
  127. connectTimeout := *httpConnectTimeout
  128. if int64(connectTimeout) <= 0 {
  129. log.Fatal("--http-client-connect-timeout should be positive")
  130. }
  131. requestTimeout := *httpRequestTimeout
  132. if int64(requestTimeout) <= 0 {
  133. log.Fatal("--http-client-request-timeout should be positive")
  134. }
  135. if countNum.isSet && countNum.value <= 0 {
  136. log.Fatal("--count should be positive")
  137. }
  138. if len(nsqdHTTPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
  139. log.Fatal("--nsqd-http-address or --lookupd-http-address required")
  140. }
  141. if len(nsqdHTTPAddrs) > 0 && len(lookupdHTTPAddrs) > 0 {
  142. log.Fatal("use --nsqd-http-address or --lookupd-http-address not both")
  143. }
  144. if err := checkAddrs(nsqdHTTPAddrs); err != nil {
  145. log.Fatalf("--nsqd-http-address error - %s", err)
  146. }
  147. if err := checkAddrs(lookupdHTTPAddrs); err != nil {
  148. log.Fatalf("--lookupd-http-address error - %s", err)
  149. }
  150. termChan := make(chan os.Signal, 1)
  151. signal.Notify(termChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
  152. go statLoop(intvl, connectTimeout, requestTimeout, *topic, *channel, nsqdHTTPAddrs, lookupdHTTPAddrs)
  153. <-termChan
  154. }