channel_test.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package nsqd
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/http"
  6. "os"
  7. "strconv"
  8. "testing"
  9. "time"
  10. "github.com/nsqio/nsq/internal/test"
  11. )
  12. // ensure that we can push a message through a topic and get it out of a channel
  13. func TestPutMessage(t *testing.T) {
  14. opts := NewOptions()
  15. opts.Logger = test.NewTestLogger(t)
  16. _, _, nsqd := mustStartNSQD(opts)
  17. defer os.RemoveAll(opts.DataPath)
  18. defer nsqd.Exit()
  19. topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
  20. topic := nsqd.GetTopic(topicName)
  21. channel1 := topic.GetChannel("ch")
  22. var id MessageID
  23. msg := NewMessage(id, []byte("test"))
  24. topic.PutMessage(msg)
  25. outputMsg := <-channel1.memoryMsgChan
  26. test.Equal(t, msg.ID, outputMsg.ID)
  27. test.Equal(t, msg.Body, outputMsg.Body)
  28. }
  29. // ensure that both channels get the same message
  30. func TestPutMessage2Chan(t *testing.T) {
  31. opts := NewOptions()
  32. opts.Logger = test.NewTestLogger(t)
  33. _, _, nsqd := mustStartNSQD(opts)
  34. defer os.RemoveAll(opts.DataPath)
  35. defer nsqd.Exit()
  36. topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
  37. topic := nsqd.GetTopic(topicName)
  38. channel1 := topic.GetChannel("ch1")
  39. channel2 := topic.GetChannel("ch2")
  40. var id MessageID
  41. msg := NewMessage(id, []byte("test"))
  42. topic.PutMessage(msg)
  43. outputMsg1 := <-channel1.memoryMsgChan
  44. test.Equal(t, msg.ID, outputMsg1.ID)
  45. test.Equal(t, msg.Body, outputMsg1.Body)
  46. outputMsg2 := <-channel2.memoryMsgChan
  47. test.Equal(t, msg.ID, outputMsg2.ID)
  48. test.Equal(t, msg.Body, outputMsg2.Body)
  49. }
  50. func TestInFlightWorker(t *testing.T) {
  51. count := 250
  52. opts := NewOptions()
  53. opts.Logger = test.NewTestLogger(t)
  54. opts.MsgTimeout = 100 * time.Millisecond
  55. opts.QueueScanRefreshInterval = 100 * time.Millisecond
  56. _, _, nsqd := mustStartNSQD(opts)
  57. defer os.RemoveAll(opts.DataPath)
  58. defer nsqd.Exit()
  59. topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
  60. topic := nsqd.GetTopic(topicName)
  61. channel := topic.GetChannel("channel")
  62. for i := 0; i < count; i++ {
  63. msg := NewMessage(topic.GenerateID(), []byte("test"))
  64. channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
  65. }
  66. channel.Lock()
  67. inFlightMsgs := len(channel.inFlightMessages)
  68. channel.Unlock()
  69. test.Equal(t, count, inFlightMsgs)
  70. channel.inFlightMutex.Lock()
  71. inFlightPQMsgs := len(channel.inFlightPQ)
  72. channel.inFlightMutex.Unlock()
  73. test.Equal(t, count, inFlightPQMsgs)
  74. // the in flight worker has a resolution of 100ms so we need to wait
  75. // at least that much longer than our msgTimeout (in worst case)
  76. time.Sleep(4 * opts.MsgTimeout)
  77. channel.Lock()
  78. inFlightMsgs = len(channel.inFlightMessages)
  79. channel.Unlock()
  80. test.Equal(t, 0, inFlightMsgs)
  81. channel.inFlightMutex.Lock()
  82. inFlightPQMsgs = len(channel.inFlightPQ)
  83. channel.inFlightMutex.Unlock()
  84. test.Equal(t, 0, inFlightPQMsgs)
  85. }
  86. func TestChannelEmpty(t *testing.T) {
  87. opts := NewOptions()
  88. opts.Logger = test.NewTestLogger(t)
  89. _, _, nsqd := mustStartNSQD(opts)
  90. defer os.RemoveAll(opts.DataPath)
  91. defer nsqd.Exit()
  92. topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
  93. topic := nsqd.GetTopic(topicName)
  94. channel := topic.GetChannel("channel")
  95. msgs := make([]*Message, 0, 25)
  96. for i := 0; i < 25; i++ {
  97. msg := NewMessage(topic.GenerateID(), []byte("test"))
  98. channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
  99. msgs = append(msgs, msg)
  100. }
  101. channel.RequeueMessage(0, msgs[len(msgs)-1].ID, 100*time.Millisecond)
  102. test.Equal(t, 24, len(channel.inFlightMessages))
  103. test.Equal(t, 24, len(channel.inFlightPQ))
  104. test.Equal(t, 1, len(channel.deferredMessages))
  105. test.Equal(t, 1, len(channel.deferredPQ))
  106. channel.Empty()
  107. test.Equal(t, 0, len(channel.inFlightMessages))
  108. test.Equal(t, 0, len(channel.inFlightPQ))
  109. test.Equal(t, 0, len(channel.deferredMessages))
  110. test.Equal(t, 0, len(channel.deferredPQ))
  111. test.Equal(t, int64(0), channel.Depth())
  112. }
  113. func TestChannelEmptyConsumer(t *testing.T) {
  114. opts := NewOptions()
  115. opts.Logger = test.NewTestLogger(t)
  116. tcpAddr, _, nsqd := mustStartNSQD(opts)
  117. defer os.RemoveAll(opts.DataPath)
  118. defer nsqd.Exit()
  119. conn, _ := mustConnectNSQD(tcpAddr)
  120. defer conn.Close()
  121. topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
  122. topic := nsqd.GetTopic(topicName)
  123. channel := topic.GetChannel("channel")
  124. client := newClientV2(0, conn, nsqd)
  125. client.SetReadyCount(25)
  126. err := channel.AddClient(client.ID, client)
  127. test.Equal(t, err, nil)
  128. for i := 0; i < 25; i++ {
  129. msg := NewMessage(topic.GenerateID(), []byte("test"))
  130. channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
  131. client.SendingMessage()
  132. }
  133. for _, cl := range channel.clients {
  134. stats := cl.Stats("").(ClientV2Stats)
  135. test.Equal(t, int64(25), stats.InFlightCount)
  136. }
  137. channel.Empty()
  138. for _, cl := range channel.clients {
  139. stats := cl.Stats("").(ClientV2Stats)
  140. test.Equal(t, int64(0), stats.InFlightCount)
  141. }
  142. }
  143. func TestMaxChannelConsumers(t *testing.T) {
  144. opts := NewOptions()
  145. opts.Logger = test.NewTestLogger(t)
  146. opts.MaxChannelConsumers = 1
  147. tcpAddr, _, nsqd := mustStartNSQD(opts)
  148. defer os.RemoveAll(opts.DataPath)
  149. defer nsqd.Exit()
  150. conn, _ := mustConnectNSQD(tcpAddr)
  151. defer conn.Close()
  152. topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix()))
  153. topic := nsqd.GetTopic(topicName)
  154. channel := topic.GetChannel("channel")
  155. client1 := newClientV2(1, conn, nsqd)
  156. client1.SetReadyCount(25)
  157. err := channel.AddClient(client1.ID, client1)
  158. test.Equal(t, err, nil)
  159. client2 := newClientV2(2, conn, nsqd)
  160. client2.SetReadyCount(25)
  161. err = channel.AddClient(client2.ID, client2)
  162. test.NotEqual(t, err, nil)
  163. }
  164. func TestChannelHealth(t *testing.T) {
  165. opts := NewOptions()
  166. opts.Logger = test.NewTestLogger(t)
  167. opts.MemQueueSize = 2
  168. _, httpAddr, nsqd := mustStartNSQD(opts)
  169. defer os.RemoveAll(opts.DataPath)
  170. defer nsqd.Exit()
  171. topic := nsqd.GetTopic("test")
  172. channel := topic.GetChannel("channel")
  173. channel.backend = &errorBackendQueue{}
  174. msg := NewMessage(topic.GenerateID(), make([]byte, 100))
  175. err := channel.PutMessage(msg)
  176. test.Nil(t, err)
  177. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  178. err = channel.PutMessage(msg)
  179. test.Nil(t, err)
  180. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  181. err = channel.PutMessage(msg)
  182. test.NotNil(t, err)
  183. url := fmt.Sprintf("http://%s/ping", httpAddr)
  184. resp, err := http.Get(url)
  185. test.Nil(t, err)
  186. test.Equal(t, 500, resp.StatusCode)
  187. body, _ := ioutil.ReadAll(resp.Body)
  188. resp.Body.Close()
  189. test.Equal(t, "NOK - never gonna happen", string(body))
  190. channel.backend = &errorRecoveredBackendQueue{}
  191. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  192. err = channel.PutMessage(msg)
  193. test.Nil(t, err)
  194. resp, err = http.Get(url)
  195. test.Nil(t, err)
  196. test.Equal(t, 200, resp.StatusCode)
  197. body, _ = ioutil.ReadAll(resp.Body)
  198. resp.Body.Close()
  199. test.Equal(t, "OK", string(body))
  200. }