statsd.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package nsqd
  2. import (
  3. "fmt"
  4. "math"
  5. "net"
  6. "strings"
  7. "time"
  8. "github.com/nsqio/nsq/internal/statsd"
  9. "github.com/nsqio/nsq/internal/writers"
  10. )
  11. type Uint64Slice []uint64
  12. func (s Uint64Slice) Len() int {
  13. return len(s)
  14. }
  15. func (s Uint64Slice) Swap(i, j int) {
  16. s[i], s[j] = s[j], s[i]
  17. }
  18. func (s Uint64Slice) Less(i, j int) bool {
  19. return s[i] < s[j]
  20. }
  21. func (n *NSQD) statsdLoop() {
  22. var lastMemStats memStats
  23. var lastStats Stats
  24. interval := n.getOpts().StatsdInterval
  25. ticker := time.NewTicker(interval)
  26. for {
  27. select {
  28. case <-n.exitChan:
  29. goto exit
  30. case <-ticker.C:
  31. addr := n.getOpts().StatsdAddress
  32. prefix := n.getOpts().StatsdPrefix
  33. excludeEphemeral := n.getOpts().StatsdExcludeEphemeral
  34. conn, err := net.DialTimeout("udp", addr, time.Second)
  35. if err != nil {
  36. n.logf(LOG_ERROR, "failed to create UDP socket to statsd(%s)", addr)
  37. continue
  38. }
  39. sw := writers.NewSpreadWriter(conn, interval-time.Second, n.exitChan)
  40. bw := writers.NewBoundaryBufferedWriter(sw, n.getOpts().StatsdUDPPacketSize)
  41. client := statsd.NewClient(bw, prefix)
  42. n.logf(LOG_INFO, "STATSD: pushing stats to %s", addr)
  43. stats := n.GetStats("", "", false)
  44. for _, topic := range stats.Topics {
  45. if excludeEphemeral && strings.HasSuffix(topic.TopicName, "#ephemeral") {
  46. continue
  47. }
  48. // try to find the topic in the last collection
  49. lastTopic := TopicStats{}
  50. for _, checkTopic := range lastStats.Topics {
  51. if topic.TopicName == checkTopic.TopicName {
  52. lastTopic = checkTopic
  53. break
  54. }
  55. }
  56. diff := topic.MessageCount - lastTopic.MessageCount
  57. stat := fmt.Sprintf("topic.%s.message_count", topic.TopicName)
  58. client.Incr(stat, int64(diff))
  59. diff = topic.MessageBytes - lastTopic.MessageBytes
  60. stat = fmt.Sprintf("topic.%s.message_bytes", topic.TopicName)
  61. client.Incr(stat, int64(diff))
  62. stat = fmt.Sprintf("topic.%s.depth", topic.TopicName)
  63. client.Gauge(stat, topic.Depth)
  64. stat = fmt.Sprintf("topic.%s.backend_depth", topic.TopicName)
  65. client.Gauge(stat, topic.BackendDepth)
  66. for _, item := range topic.E2eProcessingLatency.Percentiles {
  67. stat = fmt.Sprintf("topic.%s.e2e_processing_latency_%.0f", topic.TopicName, item["quantile"]*100.0)
  68. // We can cast the value to int64 since a value of 1 is the
  69. // minimum resolution we will have, so there is no loss of
  70. // accuracy
  71. client.Gauge(stat, int64(item["value"]))
  72. }
  73. for _, channel := range topic.Channels {
  74. if excludeEphemeral && strings.HasSuffix(channel.ChannelName, "#ephemeral") {
  75. continue
  76. }
  77. // try to find the channel in the last collection
  78. lastChannel := ChannelStats{}
  79. for _, checkChannel := range lastTopic.Channels {
  80. if channel.ChannelName == checkChannel.ChannelName {
  81. lastChannel = checkChannel
  82. break
  83. }
  84. }
  85. diff := channel.MessageCount - lastChannel.MessageCount
  86. stat := fmt.Sprintf("topic.%s.channel.%s.message_count", topic.TopicName, channel.ChannelName)
  87. client.Incr(stat, int64(diff))
  88. stat = fmt.Sprintf("topic.%s.channel.%s.depth", topic.TopicName, channel.ChannelName)
  89. client.Gauge(stat, channel.Depth)
  90. stat = fmt.Sprintf("topic.%s.channel.%s.backend_depth", topic.TopicName, channel.ChannelName)
  91. client.Gauge(stat, channel.BackendDepth)
  92. stat = fmt.Sprintf("topic.%s.channel.%s.in_flight_count", topic.TopicName, channel.ChannelName)
  93. client.Gauge(stat, int64(channel.InFlightCount))
  94. stat = fmt.Sprintf("topic.%s.channel.%s.deferred_count", topic.TopicName, channel.ChannelName)
  95. client.Gauge(stat, int64(channel.DeferredCount))
  96. diff = channel.RequeueCount - lastChannel.RequeueCount
  97. stat = fmt.Sprintf("topic.%s.channel.%s.requeue_count", topic.TopicName, channel.ChannelName)
  98. client.Incr(stat, int64(diff))
  99. diff = channel.TimeoutCount - lastChannel.TimeoutCount
  100. stat = fmt.Sprintf("topic.%s.channel.%s.timeout_count", topic.TopicName, channel.ChannelName)
  101. client.Incr(stat, int64(diff))
  102. stat = fmt.Sprintf("topic.%s.channel.%s.clients", topic.TopicName, channel.ChannelName)
  103. client.Gauge(stat, int64(channel.ClientCount))
  104. for _, item := range channel.E2eProcessingLatency.Percentiles {
  105. stat = fmt.Sprintf("topic.%s.channel.%s.e2e_processing_latency_%.0f", topic.TopicName, channel.ChannelName, item["quantile"]*100.0)
  106. client.Gauge(stat, int64(item["value"]))
  107. }
  108. }
  109. }
  110. lastStats = stats
  111. if n.getOpts().StatsdMemStats {
  112. ms := getMemStats()
  113. client.Gauge("mem.heap_objects", int64(ms.HeapObjects))
  114. client.Gauge("mem.heap_idle_bytes", int64(ms.HeapIdleBytes))
  115. client.Gauge("mem.heap_in_use_bytes", int64(ms.HeapInUseBytes))
  116. client.Gauge("mem.heap_released_bytes", int64(ms.HeapReleasedBytes))
  117. client.Gauge("mem.gc_pause_usec_100", int64(ms.GCPauseUsec100))
  118. client.Gauge("mem.gc_pause_usec_99", int64(ms.GCPauseUsec99))
  119. client.Gauge("mem.gc_pause_usec_95", int64(ms.GCPauseUsec95))
  120. client.Gauge("mem.next_gc_bytes", int64(ms.NextGCBytes))
  121. client.Incr("mem.gc_runs", int64(ms.GCTotalRuns-lastMemStats.GCTotalRuns))
  122. lastMemStats = ms
  123. }
  124. bw.Flush()
  125. sw.Flush()
  126. conn.Close()
  127. }
  128. }
  129. exit:
  130. ticker.Stop()
  131. n.logf(LOG_INFO, "STATSD: closing")
  132. }
  133. func percentile(perc float64, arr []uint64, length int) uint64 {
  134. if length == 0 {
  135. return 0
  136. }
  137. indexOfPerc := int(math.Floor(((perc / 100.0) * float64(length)) + 0.5))
  138. if indexOfPerc >= length {
  139. indexOfPerc = length - 1
  140. }
  141. return arr[indexOfPerc]
  142. }