types.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package clusterinfo
  2. import (
  3. "encoding/json"
  4. "net"
  5. "sort"
  6. "strconv"
  7. "time"
  8. "github.com/blang/semver"
  9. "github.com/nsqio/nsq/internal/quantile"
  10. )
  11. type ProducerTopic struct {
  12. Topic string `json:"topic"`
  13. Tombstoned bool `json:"tombstoned"`
  14. }
  15. type ProducerTopics []ProducerTopic
  16. func (pt ProducerTopics) Len() int { return len(pt) }
  17. func (pt ProducerTopics) Swap(i, j int) { pt[i], pt[j] = pt[j], pt[i] }
  18. func (pt ProducerTopics) Less(i, j int) bool { return pt[i].Topic < pt[j].Topic }
  19. type Producer struct {
  20. RemoteAddresses []string `json:"remote_addresses"`
  21. RemoteAddress string `json:"remote_address"`
  22. Hostname string `json:"hostname"`
  23. BroadcastAddress string `json:"broadcast_address"`
  24. TCPPort int `json:"tcp_port"`
  25. HTTPPort int `json:"http_port"`
  26. Version string `json:"version"`
  27. VersionObj semver.Version `json:"-"`
  28. Topics ProducerTopics `json:"topics"`
  29. OutOfDate bool `json:"out_of_date"`
  30. }
  31. // UnmarshalJSON implements json.Unmarshaler and postprocesses of ProducerTopics and VersionObj
  32. func (p *Producer) UnmarshalJSON(b []byte) error {
  33. var r struct {
  34. RemoteAddress string `json:"remote_address"`
  35. Hostname string `json:"hostname"`
  36. BroadcastAddress string `json:"broadcast_address"`
  37. TCPPort int `json:"tcp_port"`
  38. HTTPPort int `json:"http_port"`
  39. Version string `json:"version"`
  40. Topics []string `json:"topics"`
  41. Tombstoned []bool `json:"tombstones"`
  42. }
  43. if err := json.Unmarshal(b, &r); err != nil {
  44. return err
  45. }
  46. *p = Producer{
  47. RemoteAddress: r.RemoteAddress,
  48. Hostname: r.Hostname,
  49. BroadcastAddress: r.BroadcastAddress,
  50. TCPPort: r.TCPPort,
  51. HTTPPort: r.HTTPPort,
  52. Version: r.Version,
  53. }
  54. for i, t := range r.Topics {
  55. p.Topics = append(p.Topics, ProducerTopic{Topic: t, Tombstoned: r.Tombstoned[i]})
  56. }
  57. version, err := semver.Parse(p.Version)
  58. if err != nil {
  59. version, _ = semver.Parse("0.0.0")
  60. }
  61. p.VersionObj = version
  62. return nil
  63. }
  64. func (p *Producer) Address() string {
  65. if p.RemoteAddress == "" {
  66. return "N/A"
  67. }
  68. return p.RemoteAddress
  69. }
  70. func (p *Producer) HTTPAddress() string {
  71. return net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.HTTPPort))
  72. }
  73. func (p *Producer) TCPAddress() string {
  74. return net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort))
  75. }
  76. // IsInconsistent checks for cases where an unexpected number of nsqd connections are
  77. // reporting the same information to nsqlookupd (ie: multiple instances are using the
  78. // same broadcast address), or cases where some nsqd are not reporting to all nsqlookupd.
  79. func (p *Producer) IsInconsistent(numLookupd int) bool {
  80. return len(p.RemoteAddresses) != numLookupd
  81. }
  82. type TopicStats struct {
  83. Node string `json:"node"`
  84. Hostname string `json:"hostname"`
  85. TopicName string `json:"topic_name"`
  86. Depth int64 `json:"depth"`
  87. MemoryDepth int64 `json:"memory_depth"`
  88. BackendDepth int64 `json:"backend_depth"`
  89. MessageCount int64 `json:"message_count"`
  90. NodeStats []*TopicStats `json:"nodes"`
  91. Channels []*ChannelStats `json:"channels"`
  92. Paused bool `json:"paused"`
  93. E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
  94. }
  95. func (t *TopicStats) Add(a *TopicStats) {
  96. t.Node = "*"
  97. t.Depth += a.Depth
  98. t.MemoryDepth += a.MemoryDepth
  99. t.BackendDepth += a.BackendDepth
  100. t.MessageCount += a.MessageCount
  101. if a.Paused {
  102. t.Paused = a.Paused
  103. }
  104. for _, aChannelStats := range a.Channels {
  105. found := false
  106. for _, channelStats := range t.Channels {
  107. if aChannelStats.ChannelName == channelStats.ChannelName {
  108. found = true
  109. channelStats.Add(aChannelStats)
  110. }
  111. }
  112. if !found {
  113. t.Channels = append(t.Channels, aChannelStats)
  114. }
  115. }
  116. t.NodeStats = append(t.NodeStats, a)
  117. sort.Sort(TopicStatsByHost{t.NodeStats})
  118. if t.E2eProcessingLatency == nil {
  119. t.E2eProcessingLatency = &quantile.E2eProcessingLatencyAggregate{
  120. Addr: t.Node,
  121. Topic: t.TopicName,
  122. }
  123. }
  124. t.E2eProcessingLatency.Add(a.E2eProcessingLatency)
  125. }
  126. type ChannelStats struct {
  127. Node string `json:"node"`
  128. Hostname string `json:"hostname"`
  129. TopicName string `json:"topic_name"`
  130. ChannelName string `json:"channel_name"`
  131. Depth int64 `json:"depth"`
  132. MemoryDepth int64 `json:"memory_depth"`
  133. BackendDepth int64 `json:"backend_depth"`
  134. InFlightCount int64 `json:"in_flight_count"`
  135. DeferredCount int64 `json:"deferred_count"`
  136. RequeueCount int64 `json:"requeue_count"`
  137. TimeoutCount int64 `json:"timeout_count"`
  138. MessageCount int64 `json:"message_count"`
  139. ClientCount int `json:"client_count"`
  140. Selected bool `json:"-"`
  141. NodeStats []*ChannelStats `json:"nodes"`
  142. Clients []*ClientStats `json:"clients"`
  143. Paused bool `json:"paused"`
  144. E2eProcessingLatency *quantile.E2eProcessingLatencyAggregate `json:"e2e_processing_latency"`
  145. }
  146. func (c *ChannelStats) Add(a *ChannelStats) {
  147. c.Node = "*"
  148. c.Depth += a.Depth
  149. c.MemoryDepth += a.MemoryDepth
  150. c.BackendDepth += a.BackendDepth
  151. c.InFlightCount += a.InFlightCount
  152. c.DeferredCount += a.DeferredCount
  153. c.RequeueCount += a.RequeueCount
  154. c.TimeoutCount += a.TimeoutCount
  155. c.MessageCount += a.MessageCount
  156. c.ClientCount += a.ClientCount
  157. if a.Paused {
  158. c.Paused = a.Paused
  159. }
  160. c.NodeStats = append(c.NodeStats, a)
  161. sort.Sort(ChannelStatsByHost{c.NodeStats})
  162. if c.E2eProcessingLatency == nil {
  163. c.E2eProcessingLatency = &quantile.E2eProcessingLatencyAggregate{
  164. Addr: c.Node,
  165. Topic: c.TopicName,
  166. Channel: c.ChannelName,
  167. }
  168. }
  169. c.E2eProcessingLatency.Add(a.E2eProcessingLatency)
  170. c.Clients = append(c.Clients, a.Clients...)
  171. sort.Sort(ClientsByHost{c.Clients})
  172. }
  173. type ClientStats struct {
  174. Node string `json:"node"`
  175. RemoteAddress string `json:"remote_address"`
  176. Version string `json:"version"`
  177. ClientID string `json:"client_id"`
  178. Hostname string `json:"hostname"`
  179. UserAgent string `json:"user_agent"`
  180. ConnectTs int64 `json:"connect_ts"`
  181. ConnectedDuration time.Duration `json:"connected"`
  182. InFlightCount int `json:"in_flight_count"`
  183. ReadyCount int `json:"ready_count"`
  184. FinishCount int64 `json:"finish_count"`
  185. RequeueCount int64 `json:"requeue_count"`
  186. MessageCount int64 `json:"message_count"`
  187. SampleRate int32 `json:"sample_rate"`
  188. Deflate bool `json:"deflate"`
  189. Snappy bool `json:"snappy"`
  190. Authed bool `json:"authed"`
  191. AuthIdentity string `json:"auth_identity"`
  192. AuthIdentityURL string `json:"auth_identity_url"`
  193. TLS bool `json:"tls"`
  194. CipherSuite string `json:"tls_cipher_suite"`
  195. TLSVersion string `json:"tls_version"`
  196. TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
  197. TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
  198. }
  199. // UnmarshalJSON implements json.Unmarshaler and postprocesses ConnectedDuration
  200. func (s *ClientStats) UnmarshalJSON(b []byte) error {
  201. type locaClientStats ClientStats // re-typed to prevent recursion from json.Unmarshal
  202. var ss locaClientStats
  203. if err := json.Unmarshal(b, &ss); err != nil {
  204. return err
  205. }
  206. *s = ClientStats(ss)
  207. s.ConnectedDuration = time.Now().Truncate(time.Second).Sub(time.Unix(s.ConnectTs, 0))
  208. return nil
  209. }
  210. func (c *ClientStats) HasUserAgent() bool {
  211. return c.UserAgent != ""
  212. }
  213. func (c *ClientStats) HasSampleRate() bool {
  214. return c.SampleRate > 0
  215. }
  216. type ChannelStatsList []*ChannelStats
  217. func (c ChannelStatsList) Len() int { return len(c) }
  218. func (c ChannelStatsList) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
  219. type ChannelStatsByHost struct {
  220. ChannelStatsList
  221. }
  222. func (c ChannelStatsByHost) Less(i, j int) bool {
  223. return c.ChannelStatsList[i].Hostname < c.ChannelStatsList[j].Hostname
  224. }
  225. type ClientStatsList []*ClientStats
  226. func (c ClientStatsList) Len() int { return len(c) }
  227. func (c ClientStatsList) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
  228. type ClientsByHost struct {
  229. ClientStatsList
  230. }
  231. func (c ClientsByHost) Less(i, j int) bool {
  232. return c.ClientStatsList[i].Hostname < c.ClientStatsList[j].Hostname
  233. }
  234. type TopicStatsList []*TopicStats
  235. func (t TopicStatsList) Len() int { return len(t) }
  236. func (t TopicStatsList) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
  237. type TopicStatsByHost struct {
  238. TopicStatsList
  239. }
  240. func (c TopicStatsByHost) Less(i, j int) bool {
  241. return c.TopicStatsList[i].Hostname < c.TopicStatsList[j].Hostname
  242. }
  243. type Producers []*Producer
  244. func (t Producers) Len() int { return len(t) }
  245. func (t Producers) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
  246. func (t Producers) HTTPAddrs() []string {
  247. var addrs []string
  248. for _, p := range t {
  249. addrs = append(addrs, p.HTTPAddress())
  250. }
  251. return addrs
  252. }
  253. func (t Producers) Search(needle string) *Producer {
  254. for _, producer := range t {
  255. if needle == producer.HTTPAddress() {
  256. return producer
  257. }
  258. }
  259. return nil
  260. }
  261. type ProducersByHost struct {
  262. Producers
  263. }
  264. func (c ProducersByHost) Less(i, j int) bool {
  265. return c.Producers[i].Hostname < c.Producers[j].Hostname
  266. }