stats.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package nsqd
  2. import (
  3. "runtime"
  4. "sort"
  5. "sync/atomic"
  6. "github.com/nsqio/nsq/internal/quantile"
  7. )
  8. type Stats struct {
  9. Topics []TopicStats
  10. Producers []ClientStats
  11. }
  12. type ClientStats interface {
  13. String() string
  14. }
  15. type TopicStats struct {
  16. TopicName string `json:"topic_name"`
  17. Channels []ChannelStats `json:"channels"`
  18. Depth int64 `json:"depth"`
  19. BackendDepth int64 `json:"backend_depth"`
  20. MessageCount uint64 `json:"message_count"`
  21. MessageBytes uint64 `json:"message_bytes"`
  22. Paused bool `json:"paused"`
  23. E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
  24. }
  25. func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats {
  26. return TopicStats{
  27. TopicName: t.name,
  28. Channels: channels,
  29. Depth: t.Depth(),
  30. BackendDepth: t.backend.Depth(),
  31. MessageCount: atomic.LoadUint64(&t.messageCount),
  32. MessageBytes: atomic.LoadUint64(&t.messageBytes),
  33. Paused: t.IsPaused(),
  34. E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(),
  35. }
  36. }
  37. type ChannelStats struct {
  38. ChannelName string `json:"channel_name"`
  39. Depth int64 `json:"depth"`
  40. BackendDepth int64 `json:"backend_depth"`
  41. InFlightCount int `json:"in_flight_count"`
  42. DeferredCount int `json:"deferred_count"`
  43. MessageCount uint64 `json:"message_count"`
  44. RequeueCount uint64 `json:"requeue_count"`
  45. TimeoutCount uint64 `json:"timeout_count"`
  46. ClientCount int `json:"client_count"`
  47. Clients []ClientStats `json:"clients"`
  48. Paused bool `json:"paused"`
  49. E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
  50. }
  51. func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats {
  52. c.inFlightMutex.Lock()
  53. inflight := len(c.inFlightMessages)
  54. c.inFlightMutex.Unlock()
  55. c.deferredMutex.Lock()
  56. deferred := len(c.deferredMessages)
  57. c.deferredMutex.Unlock()
  58. return ChannelStats{
  59. ChannelName: c.name,
  60. Depth: c.Depth(),
  61. BackendDepth: c.backend.Depth(),
  62. InFlightCount: inflight,
  63. DeferredCount: deferred,
  64. MessageCount: atomic.LoadUint64(&c.messageCount),
  65. RequeueCount: atomic.LoadUint64(&c.requeueCount),
  66. TimeoutCount: atomic.LoadUint64(&c.timeoutCount),
  67. ClientCount: clientCount,
  68. Clients: clients,
  69. Paused: c.IsPaused(),
  70. E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(),
  71. }
  72. }
  73. type Topics []*Topic
  74. func (t Topics) Len() int { return len(t) }
  75. func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
  76. type TopicsByName struct {
  77. Topics
  78. }
  79. func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j].name }
  80. type Channels []*Channel
  81. func (c Channels) Len() int { return len(c) }
  82. func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
  83. type ChannelsByName struct {
  84. Channels
  85. }
  86. func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name }
  87. func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats {
  88. var stats Stats
  89. n.RLock()
  90. var realTopics []*Topic
  91. if topic == "" {
  92. realTopics = make([]*Topic, 0, len(n.topicMap))
  93. for _, t := range n.topicMap {
  94. realTopics = append(realTopics, t)
  95. }
  96. } else if val, exists := n.topicMap[topic]; exists {
  97. realTopics = []*Topic{val}
  98. } else {
  99. n.RUnlock()
  100. return stats
  101. }
  102. n.RUnlock()
  103. sort.Sort(TopicsByName{realTopics})
  104. topics := make([]TopicStats, 0, len(realTopics))
  105. for _, t := range realTopics {
  106. t.RLock()
  107. var realChannels []*Channel
  108. if channel == "" {
  109. realChannels = make([]*Channel, 0, len(t.channelMap))
  110. for _, c := range t.channelMap {
  111. realChannels = append(realChannels, c)
  112. }
  113. } else if val, exists := t.channelMap[channel]; exists {
  114. realChannels = []*Channel{val}
  115. } else {
  116. t.RUnlock()
  117. continue
  118. }
  119. t.RUnlock()
  120. sort.Sort(ChannelsByName{realChannels})
  121. channels := make([]ChannelStats, 0, len(realChannels))
  122. for _, c := range realChannels {
  123. var clients []ClientStats
  124. var clientCount int
  125. c.RLock()
  126. if includeClients {
  127. clients = make([]ClientStats, 0, len(c.clients))
  128. for _, client := range c.clients {
  129. clients = append(clients, client.Stats(topic))
  130. }
  131. }
  132. clientCount = len(c.clients)
  133. c.RUnlock()
  134. channels = append(channels, NewChannelStats(c, clients, clientCount))
  135. }
  136. topics = append(topics, NewTopicStats(t, channels))
  137. }
  138. stats.Topics = topics
  139. if includeClients {
  140. var producerStats []ClientStats
  141. n.tcpServer.conns.Range(func(k, v interface{}) bool {
  142. c := v.(Client)
  143. if c.Type() == typeProducer {
  144. producerStats = append(producerStats, c.Stats(topic))
  145. }
  146. return true
  147. })
  148. stats.Producers = producerStats
  149. }
  150. return stats
  151. }
  152. type memStats struct {
  153. HeapObjects uint64 `json:"heap_objects"`
  154. HeapIdleBytes uint64 `json:"heap_idle_bytes"`
  155. HeapInUseBytes uint64 `json:"heap_in_use_bytes"`
  156. HeapReleasedBytes uint64 `json:"heap_released_bytes"`
  157. GCPauseUsec100 uint64 `json:"gc_pause_usec_100"`
  158. GCPauseUsec99 uint64 `json:"gc_pause_usec_99"`
  159. GCPauseUsec95 uint64 `json:"gc_pause_usec_95"`
  160. NextGCBytes uint64 `json:"next_gc_bytes"`
  161. GCTotalRuns uint32 `json:"gc_total_runs"`
  162. }
  163. func getMemStats() memStats {
  164. var ms runtime.MemStats
  165. runtime.ReadMemStats(&ms)
  166. // sort the GC pause array
  167. length := len(ms.PauseNs)
  168. if int(ms.NumGC) < length {
  169. length = int(ms.NumGC)
  170. }
  171. gcPauses := make(Uint64Slice, length)
  172. copy(gcPauses, ms.PauseNs[:length])
  173. sort.Sort(gcPauses)
  174. return memStats{
  175. ms.HeapObjects,
  176. ms.HeapIdle,
  177. ms.HeapInuse,
  178. ms.HeapReleased,
  179. percentile(100.0, gcPauses, len(gcPauses)) / 1000,
  180. percentile(99.0, gcPauses, len(gcPauses)) / 1000,
  181. percentile(95.0, gcPauses, len(gcPauses)) / 1000,
  182. ms.NextGC,
  183. ms.NumGC,
  184. }
  185. }