topic_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package nsqd
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "os"
  8. "runtime"
  9. "strconv"
  10. "testing"
  11. "time"
  12. "github.com/nsqio/nsq/internal/test"
  13. )
  14. func TestGetTopic(t *testing.T) {
  15. opts := NewOptions()
  16. opts.Logger = test.NewTestLogger(t)
  17. _, _, nsqd := mustStartNSQD(opts)
  18. defer os.RemoveAll(opts.DataPath)
  19. defer nsqd.Exit()
  20. topic1 := nsqd.GetTopic("test")
  21. test.NotNil(t, topic1)
  22. test.Equal(t, "test", topic1.name)
  23. topic2 := nsqd.GetTopic("test")
  24. test.Equal(t, topic1, topic2)
  25. topic3 := nsqd.GetTopic("test2")
  26. test.Equal(t, "test2", topic3.name)
  27. test.NotEqual(t, topic2, topic3)
  28. }
  29. func TestGetChannel(t *testing.T) {
  30. opts := NewOptions()
  31. opts.Logger = test.NewTestLogger(t)
  32. _, _, nsqd := mustStartNSQD(opts)
  33. defer os.RemoveAll(opts.DataPath)
  34. defer nsqd.Exit()
  35. topic := nsqd.GetTopic("test")
  36. channel1 := topic.GetChannel("ch1")
  37. test.NotNil(t, channel1)
  38. test.Equal(t, "ch1", channel1.name)
  39. channel2 := topic.GetChannel("ch2")
  40. test.Equal(t, channel1, topic.channelMap["ch1"])
  41. test.Equal(t, channel2, topic.channelMap["ch2"])
  42. }
  43. type errorBackendQueue struct{}
  44. func (d *errorBackendQueue) Put([]byte) error { return errors.New("never gonna happen") }
  45. func (d *errorBackendQueue) ReadChan() <-chan []byte { return nil }
  46. func (d *errorBackendQueue) Close() error { return nil }
  47. func (d *errorBackendQueue) Delete() error { return nil }
  48. func (d *errorBackendQueue) Depth() int64 { return 0 }
  49. func (d *errorBackendQueue) Empty() error { return nil }
  50. type errorRecoveredBackendQueue struct{ errorBackendQueue }
  51. func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil }
  52. func TestHealth(t *testing.T) {
  53. opts := NewOptions()
  54. opts.Logger = test.NewTestLogger(t)
  55. opts.MemQueueSize = 2
  56. _, httpAddr, nsqd := mustStartNSQD(opts)
  57. defer os.RemoveAll(opts.DataPath)
  58. defer nsqd.Exit()
  59. topic := nsqd.GetTopic("test")
  60. topic.backend = &errorBackendQueue{}
  61. msg := NewMessage(topic.GenerateID(), make([]byte, 100))
  62. err := topic.PutMessage(msg)
  63. test.Nil(t, err)
  64. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  65. err = topic.PutMessages([]*Message{msg})
  66. test.Nil(t, err)
  67. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  68. err = topic.PutMessage(msg)
  69. test.NotNil(t, err)
  70. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  71. err = topic.PutMessages([]*Message{msg})
  72. test.NotNil(t, err)
  73. url := fmt.Sprintf("http://%s/ping", httpAddr)
  74. resp, err := http.Get(url)
  75. test.Nil(t, err)
  76. test.Equal(t, 500, resp.StatusCode)
  77. body, _ := ioutil.ReadAll(resp.Body)
  78. resp.Body.Close()
  79. test.Equal(t, "NOK - never gonna happen", string(body))
  80. topic.backend = &errorRecoveredBackendQueue{}
  81. msg = NewMessage(topic.GenerateID(), make([]byte, 100))
  82. err = topic.PutMessages([]*Message{msg})
  83. test.Nil(t, err)
  84. resp, err = http.Get(url)
  85. test.Nil(t, err)
  86. test.Equal(t, 200, resp.StatusCode)
  87. body, _ = ioutil.ReadAll(resp.Body)
  88. resp.Body.Close()
  89. test.Equal(t, "OK", string(body))
  90. }
  91. func TestDeletes(t *testing.T) {
  92. opts := NewOptions()
  93. opts.Logger = test.NewTestLogger(t)
  94. _, _, nsqd := mustStartNSQD(opts)
  95. defer os.RemoveAll(opts.DataPath)
  96. defer nsqd.Exit()
  97. topic := nsqd.GetTopic("test")
  98. channel1 := topic.GetChannel("ch1")
  99. test.NotNil(t, channel1)
  100. err := topic.DeleteExistingChannel("ch1")
  101. test.Nil(t, err)
  102. test.Equal(t, 0, len(topic.channelMap))
  103. channel2 := topic.GetChannel("ch2")
  104. test.NotNil(t, channel2)
  105. err = nsqd.DeleteExistingTopic("test")
  106. test.Nil(t, err)
  107. test.Equal(t, 0, len(topic.channelMap))
  108. test.Equal(t, 0, len(nsqd.topicMap))
  109. }
  110. func TestDeleteLast(t *testing.T) {
  111. opts := NewOptions()
  112. opts.Logger = test.NewTestLogger(t)
  113. _, _, nsqd := mustStartNSQD(opts)
  114. defer os.RemoveAll(opts.DataPath)
  115. defer nsqd.Exit()
  116. topic := nsqd.GetTopic("test")
  117. channel1 := topic.GetChannel("ch1")
  118. test.NotNil(t, channel1)
  119. err := topic.DeleteExistingChannel("ch1")
  120. test.Nil(t, err)
  121. test.Equal(t, 0, len(topic.channelMap))
  122. msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
  123. err = topic.PutMessage(msg)
  124. time.Sleep(100 * time.Millisecond)
  125. test.Nil(t, err)
  126. test.Equal(t, int64(1), topic.Depth())
  127. }
  128. func TestPause(t *testing.T) {
  129. opts := NewOptions()
  130. opts.Logger = test.NewTestLogger(t)
  131. _, _, nsqd := mustStartNSQD(opts)
  132. defer os.RemoveAll(opts.DataPath)
  133. defer nsqd.Exit()
  134. topicName := "test_topic_pause" + strconv.Itoa(int(time.Now().Unix()))
  135. topic := nsqd.GetTopic(topicName)
  136. err := topic.Pause()
  137. test.Nil(t, err)
  138. channel := topic.GetChannel("ch1")
  139. test.NotNil(t, channel)
  140. msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
  141. err = topic.PutMessage(msg)
  142. test.Nil(t, err)
  143. time.Sleep(15 * time.Millisecond)
  144. test.Equal(t, int64(1), topic.Depth())
  145. test.Equal(t, int64(0), channel.Depth())
  146. err = topic.UnPause()
  147. test.Nil(t, err)
  148. time.Sleep(15 * time.Millisecond)
  149. test.Equal(t, int64(0), topic.Depth())
  150. test.Equal(t, int64(1), channel.Depth())
  151. }
  152. func BenchmarkTopicPut(b *testing.B) {
  153. b.StopTimer()
  154. topicName := "bench_topic_put" + strconv.Itoa(b.N)
  155. opts := NewOptions()
  156. opts.Logger = test.NewTestLogger(b)
  157. opts.MemQueueSize = int64(b.N)
  158. _, _, nsqd := mustStartNSQD(opts)
  159. defer os.RemoveAll(opts.DataPath)
  160. defer nsqd.Exit()
  161. b.StartTimer()
  162. for i := 0; i <= b.N; i++ {
  163. topic := nsqd.GetTopic(topicName)
  164. msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
  165. topic.PutMessage(msg)
  166. }
  167. }
  168. func BenchmarkTopicToChannelPut(b *testing.B) {
  169. b.StopTimer()
  170. topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N)
  171. channelName := "bench"
  172. opts := NewOptions()
  173. opts.Logger = test.NewTestLogger(b)
  174. opts.MemQueueSize = int64(b.N)
  175. _, _, nsqd := mustStartNSQD(opts)
  176. defer os.RemoveAll(opts.DataPath)
  177. defer nsqd.Exit()
  178. channel := nsqd.GetTopic(topicName).GetChannel(channelName)
  179. b.StartTimer()
  180. for i := 0; i <= b.N; i++ {
  181. topic := nsqd.GetTopic(topicName)
  182. msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
  183. topic.PutMessage(msg)
  184. }
  185. for {
  186. if len(channel.memoryMsgChan) == b.N {
  187. break
  188. }
  189. runtime.Gosched()
  190. }
  191. }