stats_test.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package nsqd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "os"
  6. "strconv"
  7. "sync"
  8. "testing"
  9. "time"
  10. "github.com/golang/snappy"
  11. "github.com/nsqio/nsq/internal/http_api"
  12. "github.com/nsqio/nsq/internal/test"
  13. )
  14. func TestStats(t *testing.T) {
  15. opts := NewOptions()
  16. opts.Logger = test.NewTestLogger(t)
  17. tcpAddr, _, nsqd := mustStartNSQD(opts)
  18. defer os.RemoveAll(opts.DataPath)
  19. defer nsqd.Exit()
  20. topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix()))
  21. topic := nsqd.GetTopic(topicName)
  22. msg := NewMessage(topic.GenerateID(), []byte("test body"))
  23. topic.PutMessage(msg)
  24. accompanyTopicName := "accompany_test_stats" + strconv.Itoa(int(time.Now().Unix()))
  25. accompanyTopic := nsqd.GetTopic(accompanyTopicName)
  26. msg = NewMessage(accompanyTopic.GenerateID(), []byte("accompany test body"))
  27. accompanyTopic.PutMessage(msg)
  28. conn, err := mustConnectNSQD(tcpAddr)
  29. test.Nil(t, err)
  30. defer conn.Close()
  31. identify(t, conn, nil, frameTypeResponse)
  32. sub(t, conn, topicName, "ch")
  33. stats := nsqd.GetStats(topicName, "ch", true).Topics
  34. t.Logf("stats: %+v", stats)
  35. test.Equal(t, 1, len(stats))
  36. test.Equal(t, 1, len(stats[0].Channels))
  37. test.Equal(t, 1, len(stats[0].Channels[0].Clients))
  38. test.Equal(t, 1, stats[0].Channels[0].ClientCount)
  39. stats = nsqd.GetStats(topicName, "ch", false).Topics
  40. t.Logf("stats: %+v", stats)
  41. test.Equal(t, 1, len(stats))
  42. test.Equal(t, 1, len(stats[0].Channels))
  43. test.Equal(t, 0, len(stats[0].Channels[0].Clients))
  44. test.Equal(t, 1, stats[0].Channels[0].ClientCount)
  45. stats = nsqd.GetStats(topicName, "none_exist_channel", false).Topics
  46. t.Logf("stats: %+v", stats)
  47. test.Equal(t, 0, len(stats))
  48. stats = nsqd.GetStats("none_exist_topic", "none_exist_channel", false).Topics
  49. t.Logf("stats: %+v", stats)
  50. test.Equal(t, 0, len(stats))
  51. }
  52. func TestClientAttributes(t *testing.T) {
  53. userAgent := "Test User Agent"
  54. opts := NewOptions()
  55. opts.Logger = test.NewTestLogger(t)
  56. opts.LogLevel = LOG_DEBUG
  57. opts.SnappyEnabled = true
  58. tcpAddr, httpAddr, nsqd := mustStartNSQD(opts)
  59. defer os.RemoveAll(opts.DataPath)
  60. defer nsqd.Exit()
  61. conn, err := mustConnectNSQD(tcpAddr)
  62. test.Nil(t, err)
  63. defer conn.Close()
  64. data := identify(t, conn, map[string]interface{}{
  65. "snappy": true,
  66. "user_agent": userAgent,
  67. }, frameTypeResponse)
  68. resp := struct {
  69. Snappy bool `json:"snappy"`
  70. UserAgent string `json:"user_agent"`
  71. }{}
  72. err = json.Unmarshal(data, &resp)
  73. test.Nil(t, err)
  74. test.Equal(t, true, resp.Snappy)
  75. r := snappy.NewReader(conn)
  76. w := snappy.NewWriter(conn)
  77. readValidate(t, r, frameTypeResponse, "OK")
  78. topicName := "test_client_attributes" + strconv.Itoa(int(time.Now().Unix()))
  79. sub(t, readWriter{r, w}, topicName, "ch")
  80. var d struct {
  81. Topics []struct {
  82. Channels []struct {
  83. Clients []struct {
  84. UserAgent string `json:"user_agent"`
  85. Snappy bool `json:"snappy"`
  86. } `json:"clients"`
  87. } `json:"channels"`
  88. } `json:"topics"`
  89. }
  90. endpoint := fmt.Sprintf("http://127.0.0.1:%d/stats?format=json", httpAddr.Port)
  91. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  92. test.Nil(t, err)
  93. test.Equal(t, userAgent, d.Topics[0].Channels[0].Clients[0].UserAgent)
  94. test.Equal(t, true, d.Topics[0].Channels[0].Clients[0].Snappy)
  95. }
  96. func TestStatsChannelLocking(t *testing.T) {
  97. opts := NewOptions()
  98. opts.Logger = test.NewTestLogger(t)
  99. _, _, nsqd := mustStartNSQD(opts)
  100. defer os.RemoveAll(opts.DataPath)
  101. defer nsqd.Exit()
  102. topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
  103. topic := nsqd.GetTopic(topicName)
  104. channel := topic.GetChannel("channel")
  105. var wg sync.WaitGroup
  106. wg.Add(2)
  107. go func() {
  108. for i := 0; i < 25; i++ {
  109. msg := NewMessage(topic.GenerateID(), []byte("test"))
  110. topic.PutMessage(msg)
  111. channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
  112. }
  113. wg.Done()
  114. }()
  115. go func() {
  116. for i := 0; i < 25; i++ {
  117. nsqd.GetStats("", "", true)
  118. }
  119. wg.Done()
  120. }()
  121. wg.Wait()
  122. stats := nsqd.GetStats(topicName, "channel", false).Topics
  123. t.Logf("stats: %+v", stats)
  124. test.Equal(t, 1, len(stats))
  125. test.Equal(t, 1, len(stats[0].Channels))
  126. test.Equal(t, 25, stats[0].Channels[0].InFlightCount)
  127. }