nsqd_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package nsqd
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "os"
  9. "strconv"
  10. "sync/atomic"
  11. "testing"
  12. "time"
  13. "github.com/nsqio/nsq/internal/http_api"
  14. "github.com/nsqio/nsq/internal/test"
  15. "github.com/nsqio/nsq/nsqlookupd"
  16. )
  17. const (
  18. ConnectTimeout = 2 * time.Second
  19. RequestTimeout = 5 * time.Second
  20. )
  21. func getMetadata(n *NSQD) (*Metadata, error) {
  22. fn := newMetadataFile(n.getOpts())
  23. data, err := ioutil.ReadFile(fn)
  24. if err != nil {
  25. return nil, err
  26. }
  27. var m Metadata
  28. err = json.Unmarshal(data, &m)
  29. if err != nil {
  30. return nil, err
  31. }
  32. return &m, nil
  33. }
  34. func TestStartup(t *testing.T) {
  35. var msg *Message
  36. iterations := 300
  37. doneExitChan := make(chan int)
  38. opts := NewOptions()
  39. opts.Logger = test.NewTestLogger(t)
  40. opts.MemQueueSize = 100
  41. opts.MaxBytesPerFile = 10240
  42. _, _, nsqd := mustStartNSQD(opts)
  43. defer os.RemoveAll(opts.DataPath)
  44. origDataPath := opts.DataPath
  45. topicName := "nsqd_test" + strconv.Itoa(int(time.Now().Unix()))
  46. exitChan := make(chan int)
  47. go func() {
  48. <-exitChan
  49. nsqd.Exit()
  50. doneExitChan <- 1
  51. }()
  52. // verify nsqd metadata shows no topics
  53. err := nsqd.PersistMetadata()
  54. test.Nil(t, err)
  55. atomic.StoreInt32(&nsqd.isLoading, 1)
  56. nsqd.GetTopic(topicName) // will not persist if `flagLoading`
  57. m, err := getMetadata(nsqd)
  58. test.Nil(t, err)
  59. test.Equal(t, 0, len(m.Topics))
  60. nsqd.DeleteExistingTopic(topicName)
  61. atomic.StoreInt32(&nsqd.isLoading, 0)
  62. body := make([]byte, 256)
  63. topic := nsqd.GetTopic(topicName)
  64. for i := 0; i < iterations; i++ {
  65. msg := NewMessage(topic.GenerateID(), body)
  66. topic.PutMessage(msg)
  67. }
  68. t.Logf("pulling from channel")
  69. channel1 := topic.GetChannel("ch1")
  70. t.Logf("read %d msgs", iterations/2)
  71. for i := 0; i < iterations/2; i++ {
  72. select {
  73. case msg = <-channel1.memoryMsgChan:
  74. case b := <-channel1.backend.ReadChan():
  75. msg, _ = decodeMessage(b)
  76. }
  77. t.Logf("read message %d", i+1)
  78. test.Equal(t, body, msg.Body)
  79. }
  80. for {
  81. if channel1.Depth() == int64(iterations/2) {
  82. break
  83. }
  84. time.Sleep(50 * time.Millisecond)
  85. }
  86. // make sure metadata shows the topic
  87. m, err = getMetadata(nsqd)
  88. test.Nil(t, err)
  89. test.Equal(t, 1, len(m.Topics))
  90. test.Equal(t, topicName, m.Topics[0].Name)
  91. exitChan <- 1
  92. <-doneExitChan
  93. // start up a new nsqd w/ the same folder
  94. opts = NewOptions()
  95. opts.Logger = test.NewTestLogger(t)
  96. opts.MemQueueSize = 100
  97. opts.MaxBytesPerFile = 10240
  98. opts.DataPath = origDataPath
  99. _, _, nsqd = mustStartNSQD(opts)
  100. go func() {
  101. <-exitChan
  102. nsqd.Exit()
  103. doneExitChan <- 1
  104. }()
  105. topic = nsqd.GetTopic(topicName)
  106. // should be empty; channel should have drained everything
  107. count := topic.Depth()
  108. test.Equal(t, int64(0), count)
  109. channel1 = topic.GetChannel("ch1")
  110. for {
  111. if channel1.Depth() == int64(iterations/2) {
  112. break
  113. }
  114. time.Sleep(50 * time.Millisecond)
  115. }
  116. // read the other half of the messages
  117. for i := 0; i < iterations/2; i++ {
  118. select {
  119. case msg = <-channel1.memoryMsgChan:
  120. case b := <-channel1.backend.ReadChan():
  121. msg, _ = decodeMessage(b)
  122. }
  123. t.Logf("read message %d", i+1)
  124. test.Equal(t, body, msg.Body)
  125. }
  126. // verify we drained things
  127. test.Equal(t, 0, len(topic.memoryMsgChan))
  128. test.Equal(t, int64(0), topic.backend.Depth())
  129. exitChan <- 1
  130. <-doneExitChan
  131. }
  132. func TestEphemeralTopicsAndChannels(t *testing.T) {
  133. // ephemeral topics/channels are lazily removed after the last channel/client is removed
  134. opts := NewOptions()
  135. opts.Logger = test.NewTestLogger(t)
  136. opts.MemQueueSize = 100
  137. _, _, nsqd := mustStartNSQD(opts)
  138. defer os.RemoveAll(opts.DataPath)
  139. topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral"
  140. doneExitChan := make(chan int)
  141. exitChan := make(chan int)
  142. go func() {
  143. <-exitChan
  144. nsqd.Exit()
  145. doneExitChan <- 1
  146. }()
  147. body := []byte("an_ephemeral_message")
  148. topic := nsqd.GetTopic(topicName)
  149. ephemeralChannel := topic.GetChannel("ch1#ephemeral")
  150. client := newClientV2(0, nil, nsqd)
  151. err := ephemeralChannel.AddClient(client.ID, client)
  152. test.Equal(t, err, nil)
  153. msg := NewMessage(topic.GenerateID(), body)
  154. topic.PutMessage(msg)
  155. msg = <-ephemeralChannel.memoryMsgChan
  156. test.Equal(t, body, msg.Body)
  157. ephemeralChannel.RemoveClient(client.ID)
  158. time.Sleep(100 * time.Millisecond)
  159. topic.Lock()
  160. numChannels := len(topic.channelMap)
  161. topic.Unlock()
  162. test.Equal(t, 0, numChannels)
  163. nsqd.Lock()
  164. numTopics := len(nsqd.topicMap)
  165. nsqd.Unlock()
  166. test.Equal(t, 0, numTopics)
  167. exitChan <- 1
  168. <-doneExitChan
  169. }
  170. func TestPauseMetadata(t *testing.T) {
  171. opts := NewOptions()
  172. opts.Logger = test.NewTestLogger(t)
  173. _, _, nsqd := mustStartNSQD(opts)
  174. defer os.RemoveAll(opts.DataPath)
  175. defer nsqd.Exit()
  176. // avoid concurrency issue of async PersistMetadata() calls
  177. atomic.StoreInt32(&nsqd.isLoading, 1)
  178. topicName := "pause_metadata" + strconv.Itoa(int(time.Now().Unix()))
  179. topic := nsqd.GetTopic(topicName)
  180. channel := topic.GetChannel("ch")
  181. atomic.StoreInt32(&nsqd.isLoading, 0)
  182. nsqd.PersistMetadata()
  183. var isPaused = func(n *NSQD, topicIndex int, channelIndex int) bool {
  184. m, _ := getMetadata(n)
  185. return m.Topics[topicIndex].Channels[channelIndex].Paused
  186. }
  187. test.Equal(t, false, isPaused(nsqd, 0, 0))
  188. channel.Pause()
  189. test.Equal(t, false, isPaused(nsqd, 0, 0))
  190. nsqd.PersistMetadata()
  191. test.Equal(t, true, isPaused(nsqd, 0, 0))
  192. channel.UnPause()
  193. test.Equal(t, true, isPaused(nsqd, 0, 0))
  194. nsqd.PersistMetadata()
  195. test.Equal(t, false, isPaused(nsqd, 0, 0))
  196. }
  197. func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) {
  198. opts.TCPAddress = "127.0.0.1:0"
  199. opts.HTTPAddress = "127.0.0.1:0"
  200. lookupd, err := nsqlookupd.New(opts)
  201. if err != nil {
  202. panic(err)
  203. }
  204. go func() {
  205. err := lookupd.Main()
  206. if err != nil {
  207. panic(err)
  208. }
  209. }()
  210. return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd
  211. }
  212. func TestReconfigure(t *testing.T) {
  213. lopts := nsqlookupd.NewOptions()
  214. lopts.Logger = test.NewTestLogger(t)
  215. lopts1 := *lopts
  216. _, _, lookupd1 := mustStartNSQLookupd(&lopts1)
  217. defer lookupd1.Exit()
  218. lopts2 := *lopts
  219. _, _, lookupd2 := mustStartNSQLookupd(&lopts2)
  220. defer lookupd2.Exit()
  221. lopts3 := *lopts
  222. _, _, lookupd3 := mustStartNSQLookupd(&lopts3)
  223. defer lookupd3.Exit()
  224. opts := NewOptions()
  225. opts.Logger = test.NewTestLogger(t)
  226. _, _, nsqd := mustStartNSQD(opts)
  227. defer os.RemoveAll(opts.DataPath)
  228. defer nsqd.Exit()
  229. newOpts := NewOptions()
  230. newOpts.Logger = opts.Logger
  231. newOpts.NSQLookupdTCPAddresses = []string{lookupd1.RealTCPAddr().String()}
  232. nsqd.swapOpts(newOpts)
  233. nsqd.triggerOptsNotification()
  234. test.Equal(t, 1, len(nsqd.getOpts().NSQLookupdTCPAddresses))
  235. var numLookupPeers int
  236. for i := 0; i < 100; i++ {
  237. numLookupPeers = len(nsqd.lookupPeers.Load().([]*lookupPeer))
  238. if numLookupPeers == 1 {
  239. break
  240. }
  241. time.Sleep(10 * time.Millisecond)
  242. }
  243. test.Equal(t, 1, numLookupPeers)
  244. newOpts = NewOptions()
  245. newOpts.Logger = opts.Logger
  246. newOpts.NSQLookupdTCPAddresses = []string{lookupd2.RealTCPAddr().String(), lookupd3.RealTCPAddr().String()}
  247. nsqd.swapOpts(newOpts)
  248. nsqd.triggerOptsNotification()
  249. test.Equal(t, 2, len(nsqd.getOpts().NSQLookupdTCPAddresses))
  250. for i := 0; i < 100; i++ {
  251. numLookupPeers = len(nsqd.lookupPeers.Load().([]*lookupPeer))
  252. if numLookupPeers == 2 {
  253. break
  254. }
  255. time.Sleep(10 * time.Millisecond)
  256. }
  257. test.Equal(t, 2, numLookupPeers)
  258. var lookupPeers []string
  259. for _, lp := range nsqd.lookupPeers.Load().([]*lookupPeer) {
  260. lookupPeers = append(lookupPeers, lp.addr)
  261. }
  262. test.Equal(t, newOpts.NSQLookupdTCPAddresses, lookupPeers)
  263. }
  264. func TestCluster(t *testing.T) {
  265. lopts := nsqlookupd.NewOptions()
  266. lopts.Logger = test.NewTestLogger(t)
  267. lopts.BroadcastAddress = "127.0.0.1"
  268. _, _, lookupd := mustStartNSQLookupd(lopts)
  269. opts := NewOptions()
  270. opts.Logger = test.NewTestLogger(t)
  271. opts.NSQLookupdTCPAddresses = []string{lookupd.RealTCPAddr().String()}
  272. opts.BroadcastAddress = "127.0.0.1"
  273. _, _, nsqd := mustStartNSQD(opts)
  274. defer os.RemoveAll(opts.DataPath)
  275. defer nsqd.Exit()
  276. topicName := "cluster_test" + strconv.Itoa(int(time.Now().Unix()))
  277. hostname, err := os.Hostname()
  278. test.Nil(t, err)
  279. url := fmt.Sprintf("http://%s/topic/create?topic=%s", nsqd.RealHTTPAddr(), topicName)
  280. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
  281. test.Nil(t, err)
  282. url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=ch", nsqd.RealHTTPAddr(), topicName)
  283. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
  284. test.Nil(t, err)
  285. // allow some time for nsqd to push info to nsqlookupd
  286. time.Sleep(350 * time.Millisecond)
  287. var d map[string][]struct {
  288. Hostname string `json:"hostname"`
  289. BroadcastAddress string `json:"broadcast_address"`
  290. TCPPort int `json:"tcp_port"`
  291. Tombstoned bool `json:"tombstoned"`
  292. }
  293. endpoint := fmt.Sprintf("http://%s/debug", lookupd.RealHTTPAddr())
  294. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  295. test.Nil(t, err)
  296. topicData := d["topic:"+topicName+":"]
  297. test.Equal(t, 1, len(topicData))
  298. test.Equal(t, hostname, topicData[0].Hostname)
  299. test.Equal(t, "127.0.0.1", topicData[0].BroadcastAddress)
  300. test.Equal(t, nsqd.RealTCPAddr().Port, topicData[0].TCPPort)
  301. test.Equal(t, false, topicData[0].Tombstoned)
  302. channelData := d["channel:"+topicName+":ch"]
  303. test.Equal(t, 1, len(channelData))
  304. test.Equal(t, hostname, channelData[0].Hostname)
  305. test.Equal(t, "127.0.0.1", channelData[0].BroadcastAddress)
  306. test.Equal(t, nsqd.RealTCPAddr().Port, channelData[0].TCPPort)
  307. test.Equal(t, false, channelData[0].Tombstoned)
  308. var lr struct {
  309. Producers []struct {
  310. Hostname string `json:"hostname"`
  311. BroadcastAddress string `json:"broadcast_address"`
  312. TCPPort int `json:"tcp_port"`
  313. } `json:"producers"`
  314. Channels []string `json:"channels"`
  315. }
  316. endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", lookupd.RealHTTPAddr(), topicName)
  317. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &lr)
  318. test.Nil(t, err)
  319. test.Equal(t, 1, len(lr.Producers))
  320. test.Equal(t, hostname, lr.Producers[0].Hostname)
  321. test.Equal(t, "127.0.0.1", lr.Producers[0].BroadcastAddress)
  322. test.Equal(t, nsqd.RealTCPAddr().Port, lr.Producers[0].TCPPort)
  323. test.Equal(t, 1, len(lr.Channels))
  324. test.Equal(t, "ch", lr.Channels[0])
  325. url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqd.RealHTTPAddr(), topicName)
  326. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).POSTV1(url)
  327. test.Nil(t, err)
  328. // allow some time for nsqd to push info to nsqlookupd
  329. time.Sleep(350 * time.Millisecond)
  330. endpoint = fmt.Sprintf("http://%s/lookup?topic=%s", lookupd.RealHTTPAddr(), topicName)
  331. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &lr)
  332. test.Nil(t, err)
  333. test.Equal(t, 0, len(lr.Producers))
  334. var dd map[string][]interface{}
  335. endpoint = fmt.Sprintf("http://%s/debug", lookupd.RealHTTPAddr())
  336. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &dd)
  337. test.Nil(t, err)
  338. test.Equal(t, 0, len(dd["topic:"+topicName+":"]))
  339. test.Equal(t, 0, len(dd["channel:"+topicName+":ch"]))
  340. }
  341. func TestSetHealth(t *testing.T) {
  342. opts := NewOptions()
  343. opts.Logger = test.NewTestLogger(t)
  344. nsqd, err := New(opts)
  345. test.Nil(t, err)
  346. defer nsqd.Exit()
  347. test.Nil(t, nsqd.GetError())
  348. test.Equal(t, true, nsqd.IsHealthy())
  349. nsqd.SetHealth(nil)
  350. test.Nil(t, nsqd.GetError())
  351. test.Equal(t, true, nsqd.IsHealthy())
  352. nsqd.SetHealth(errors.New("health error"))
  353. test.NotNil(t, nsqd.GetError())
  354. test.Equal(t, "NOK - health error", nsqd.GetHealth())
  355. test.Equal(t, false, nsqd.IsHealthy())
  356. nsqd.SetHealth(nil)
  357. test.Nil(t, nsqd.GetError())
  358. test.Equal(t, "OK", nsqd.GetHealth())
  359. test.Equal(t, true, nsqd.IsHealthy())
  360. }