|
- package nsqd
- import (
- "runtime"
- "sort"
- "sync/atomic"
- "github.com/nsqio/nsq/internal/quantile"
- )
- type Stats struct {
- Topics []TopicStats
- Producers []ClientStats
- }
// ClientStats is the interface satisfied by per-client statistics values.
// The only behaviour it requires of an implementation is a String method.
type ClientStats interface {
	String() string
}
- type TopicStats struct {
- TopicName string `json:"topic_name"`
- Channels []ChannelStats `json:"channels"`
- Depth int64 `json:"depth"`
- BackendDepth int64 `json:"backend_depth"`
- MessageCount uint64 `json:"message_count"`
- MessageBytes uint64 `json:"message_bytes"`
- Paused bool `json:"paused"`
- E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
- }
- func NewTopicStats(t *Topic, channels []ChannelStats) TopicStats {
- return TopicStats{
- TopicName: t.name,
- Channels: channels,
- Depth: t.Depth(),
- BackendDepth: t.backend.Depth(),
- MessageCount: atomic.LoadUint64(&t.messageCount),
- MessageBytes: atomic.LoadUint64(&t.messageBytes),
- Paused: t.IsPaused(),
- E2eProcessingLatency: t.AggregateChannelE2eProcessingLatency().Result(),
- }
- }
- type ChannelStats struct {
- ChannelName string `json:"channel_name"`
- Depth int64 `json:"depth"`
- BackendDepth int64 `json:"backend_depth"`
- InFlightCount int `json:"in_flight_count"`
- DeferredCount int `json:"deferred_count"`
- MessageCount uint64 `json:"message_count"`
- RequeueCount uint64 `json:"requeue_count"`
- TimeoutCount uint64 `json:"timeout_count"`
- ClientCount int `json:"client_count"`
- Clients []ClientStats `json:"clients"`
- Paused bool `json:"paused"`
- E2eProcessingLatency *quantile.Result `json:"e2e_processing_latency"`
- }
- func NewChannelStats(c *Channel, clients []ClientStats, clientCount int) ChannelStats {
- c.inFlightMutex.Lock()
- inflight := len(c.inFlightMessages)
- c.inFlightMutex.Unlock()
- c.deferredMutex.Lock()
- deferred := len(c.deferredMessages)
- c.deferredMutex.Unlock()
- return ChannelStats{
- ChannelName: c.name,
- Depth: c.Depth(),
- BackendDepth: c.backend.Depth(),
- InFlightCount: inflight,
- DeferredCount: deferred,
- MessageCount: atomic.LoadUint64(&c.messageCount),
- RequeueCount: atomic.LoadUint64(&c.requeueCount),
- TimeoutCount: atomic.LoadUint64(&c.timeoutCount),
- ClientCount: clientCount,
- Clients: clients,
- Paused: c.IsPaused(),
- E2eProcessingLatency: c.e2eProcessingLatencyStream.Result(),
- }
- }
- type Topics []*Topic
- func (t Topics) Len() int { return len(t) }
- func (t Topics) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
- type TopicsByName struct {
- Topics
- }
- func (t TopicsByName) Less(i, j int) bool { return t.Topics[i].name < t.Topics[j].name }
- type Channels []*Channel
- func (c Channels) Len() int { return len(c) }
- func (c Channels) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
- type ChannelsByName struct {
- Channels
- }
- func (c ChannelsByName) Less(i, j int) bool { return c.Channels[i].name < c.Channels[j].name }
- func (n *NSQD) GetStats(topic string, channel string, includeClients bool) Stats {
- var stats Stats
- n.RLock()
- var realTopics []*Topic
- if topic == "" {
- realTopics = make([]*Topic, 0, len(n.topicMap))
- for _, t := range n.topicMap {
- realTopics = append(realTopics, t)
- }
- } else if val, exists := n.topicMap[topic]; exists {
- realTopics = []*Topic{val}
- } else {
- n.RUnlock()
- return stats
- }
- n.RUnlock()
- sort.Sort(TopicsByName{realTopics})
- topics := make([]TopicStats, 0, len(realTopics))
- for _, t := range realTopics {
- t.RLock()
- var realChannels []*Channel
- if channel == "" {
- realChannels = make([]*Channel, 0, len(t.channelMap))
- for _, c := range t.channelMap {
- realChannels = append(realChannels, c)
- }
- } else if val, exists := t.channelMap[channel]; exists {
- realChannels = []*Channel{val}
- } else {
- t.RUnlock()
- continue
- }
- t.RUnlock()
- sort.Sort(ChannelsByName{realChannels})
- channels := make([]ChannelStats, 0, len(realChannels))
- for _, c := range realChannels {
- var clients []ClientStats
- var clientCount int
- c.RLock()
- if includeClients {
- clients = make([]ClientStats, 0, len(c.clients))
- for _, client := range c.clients {
- clients = append(clients, client.Stats(topic))
- }
- }
- clientCount = len(c.clients)
- c.RUnlock()
- channels = append(channels, NewChannelStats(c, clients, clientCount))
- }
- topics = append(topics, NewTopicStats(t, channels))
- }
- stats.Topics = topics
- if includeClients {
- var producerStats []ClientStats
- n.tcpServer.conns.Range(func(k, v interface{}) bool {
- c := v.(Client)
- if c.Type() == typeProducer {
- producerStats = append(producerStats, c.Stats(topic))
- }
- return true
- })
- stats.Producers = producerStats
- }
- return stats
- }
// memStats is the JSON-serialisable summary of Go runtime memory and garbage
// collector statistics produced by getMemStats. The GCPauseUsecNN fields hold
// the NNth-percentile GC pause, in microseconds.
type memStats struct {
	HeapObjects       uint64 `json:"heap_objects"`
	HeapIdleBytes     uint64 `json:"heap_idle_bytes"`
	HeapInUseBytes    uint64 `json:"heap_in_use_bytes"`
	HeapReleasedBytes uint64 `json:"heap_released_bytes"`
	GCPauseUsec100    uint64 `json:"gc_pause_usec_100"`
	GCPauseUsec99     uint64 `json:"gc_pause_usec_99"`
	GCPauseUsec95     uint64 `json:"gc_pause_usec_95"`
	NextGCBytes       uint64 `json:"next_gc_bytes"`
	GCTotalRuns       uint32 `json:"gc_total_runs"`
}
- func getMemStats() memStats {
- var ms runtime.MemStats
- runtime.ReadMemStats(&ms)
- // sort the GC pause array
- length := len(ms.PauseNs)
- if int(ms.NumGC) < length {
- length = int(ms.NumGC)
- }
- gcPauses := make(Uint64Slice, length)
- copy(gcPauses, ms.PauseNs[:length])
- sort.Sort(gcPauses)
- return memStats{
- ms.HeapObjects,
- ms.HeapIdle,
- ms.HeapInuse,
- ms.HeapReleased,
- percentile(100.0, gcPauses, len(gcPauses)) / 1000,
- percentile(99.0, gcPauses, len(gcPauses)) / 1000,
- percentile(95.0, gcPauses, len(gcPauses)) / 1000,
- ms.NextGC,
- ms.NumGC,
- }
- }
|