http_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package nsqlookupd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "os"
  8. "strconv"
  9. "testing"
  10. "time"
  11. "github.com/nsqio/nsq/internal/test"
  12. "github.com/nsqio/nsq/internal/version"
  13. "github.com/nsqio/nsq/nsqd"
  14. )
  15. type InfoDoc struct {
  16. Version string `json:"version"`
  17. }
  18. type ChannelsDoc struct {
  19. Channels []interface{} `json:"channels"`
  20. }
  21. type ErrMessage struct {
  22. Message string `json:"message"`
  23. }
  24. func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, *NSQLookupd) {
  25. lgr := test.NewTestLogger(t)
  26. nsqlookupdOpts := NewOptions()
  27. nsqlookupdOpts.TCPAddress = "127.0.0.1:0"
  28. nsqlookupdOpts.HTTPAddress = "127.0.0.1:0"
  29. nsqlookupdOpts.BroadcastAddress = "127.0.0.1"
  30. nsqlookupdOpts.Logger = lgr
  31. nsqlookupd1, err := New(nsqlookupdOpts)
  32. if err != nil {
  33. panic(err)
  34. }
  35. go func() {
  36. err := nsqlookupd1.Main()
  37. if err != nil {
  38. panic(err)
  39. }
  40. }()
  41. time.Sleep(100 * time.Millisecond)
  42. nsqdOpts := nsqd.NewOptions()
  43. nsqdOpts.TCPAddress = "127.0.0.1:0"
  44. nsqdOpts.HTTPAddress = "127.0.0.1:0"
  45. nsqdOpts.BroadcastAddress = "127.0.0.1"
  46. nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()}
  47. nsqdOpts.Logger = lgr
  48. tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
  49. if err != nil {
  50. panic(err)
  51. }
  52. nsqdOpts.DataPath = tmpDir
  53. nsqd1, err := nsqd.New(nsqdOpts)
  54. if err != nil {
  55. panic(err)
  56. }
  57. go func() {
  58. err := nsqd1.Main()
  59. if err != nil {
  60. panic(err)
  61. }
  62. }()
  63. time.Sleep(100 * time.Millisecond)
  64. return tmpDir, []*nsqd.NSQD{nsqd1}, nsqlookupd1
  65. }
  66. func makeTopic(nsqlookupd *NSQLookupd, topicName string) {
  67. key := Registration{"topic", topicName, ""}
  68. nsqlookupd.DB.AddRegistration(key)
  69. }
  70. func makeChannel(nsqlookupd *NSQLookupd, topicName string, channelName string) {
  71. key := Registration{"channel", topicName, channelName}
  72. nsqlookupd.DB.AddRegistration(key)
  73. makeTopic(nsqlookupd, topicName)
  74. }
  75. func TestPing(t *testing.T) {
  76. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  77. defer os.RemoveAll(dataPath)
  78. defer nsqds[0].Exit()
  79. defer nsqlookupd1.Exit()
  80. client := http.Client{}
  81. url := fmt.Sprintf("http://%s/ping", nsqlookupd1.RealHTTPAddr())
  82. req, _ := http.NewRequest("GET", url, nil)
  83. resp, err := client.Do(req)
  84. test.Nil(t, err)
  85. test.Equal(t, 200, resp.StatusCode)
  86. body, _ := ioutil.ReadAll(resp.Body)
  87. resp.Body.Close()
  88. test.Equal(t, []byte("OK"), body)
  89. }
  90. func TestInfo(t *testing.T) {
  91. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  92. defer os.RemoveAll(dataPath)
  93. defer nsqds[0].Exit()
  94. defer nsqlookupd1.Exit()
  95. client := http.Client{}
  96. url := fmt.Sprintf("http://%s/info", nsqlookupd1.RealHTTPAddr())
  97. req, _ := http.NewRequest("GET", url, nil)
  98. resp, err := client.Do(req)
  99. test.Nil(t, err)
  100. test.Equal(t, 200, resp.StatusCode)
  101. body, _ := ioutil.ReadAll(resp.Body)
  102. resp.Body.Close()
  103. t.Logf("%s", body)
  104. info := InfoDoc{}
  105. err = json.Unmarshal(body, &info)
  106. test.Nil(t, err)
  107. test.Equal(t, version.Binary, info.Version)
  108. }
  109. func TestCreateTopic(t *testing.T) {
  110. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  111. defer os.RemoveAll(dataPath)
  112. defer nsqds[0].Exit()
  113. defer nsqlookupd1.Exit()
  114. em := ErrMessage{}
  115. client := http.Client{}
  116. url := fmt.Sprintf("http://%s/topic/create", nsqlookupd1.RealHTTPAddr())
  117. req, _ := http.NewRequest("POST", url, nil)
  118. resp, err := client.Do(req)
  119. test.Nil(t, err)
  120. test.Equal(t, 400, resp.StatusCode)
  121. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  122. body, _ := ioutil.ReadAll(resp.Body)
  123. resp.Body.Close()
  124. t.Logf("%s", body)
  125. err = json.Unmarshal(body, &em)
  126. test.Nil(t, err)
  127. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  128. topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix())) + "$"
  129. url = fmt.Sprintf("http://%s/topic/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  130. req, _ = http.NewRequest("POST", url, nil)
  131. resp, err = client.Do(req)
  132. test.Nil(t, err)
  133. test.Equal(t, 400, resp.StatusCode)
  134. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  135. body, _ = ioutil.ReadAll(resp.Body)
  136. resp.Body.Close()
  137. t.Logf("%s", body)
  138. err = json.Unmarshal(body, &em)
  139. test.Nil(t, err)
  140. test.Equal(t, "INVALID_ARG_TOPIC", em.Message)
  141. topicName = "sampletopicA" + strconv.Itoa(int(time.Now().Unix()))
  142. url = fmt.Sprintf("http://%s/topic/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  143. req, _ = http.NewRequest("POST", url, nil)
  144. resp, err = client.Do(req)
  145. test.Nil(t, err)
  146. test.Equal(t, 200, resp.StatusCode)
  147. body, _ = ioutil.ReadAll(resp.Body)
  148. resp.Body.Close()
  149. t.Logf("%s", body)
  150. test.Equal(t, []byte(""), body)
  151. }
  152. func TestDeleteTopic(t *testing.T) {
  153. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  154. defer os.RemoveAll(dataPath)
  155. defer nsqds[0].Exit()
  156. defer nsqlookupd1.Exit()
  157. em := ErrMessage{}
  158. client := http.Client{}
  159. url := fmt.Sprintf("http://%s/topic/delete", nsqlookupd1.RealHTTPAddr())
  160. req, _ := http.NewRequest("POST", url, nil)
  161. resp, err := client.Do(req)
  162. test.Nil(t, err)
  163. test.Equal(t, 400, resp.StatusCode)
  164. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  165. body, _ := ioutil.ReadAll(resp.Body)
  166. resp.Body.Close()
  167. t.Logf("%s", body)
  168. err = json.Unmarshal(body, &em)
  169. test.Nil(t, err)
  170. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  171. topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix()))
  172. makeTopic(nsqlookupd1, topicName)
  173. url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  174. req, _ = http.NewRequest("POST", url, nil)
  175. resp, err = client.Do(req)
  176. test.Nil(t, err)
  177. test.Equal(t, 200, resp.StatusCode)
  178. body, _ = ioutil.ReadAll(resp.Body)
  179. resp.Body.Close()
  180. t.Logf("%s", body)
  181. test.Equal(t, []byte(""), body)
  182. topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix()))
  183. channelName := "foobar" + strconv.Itoa(int(time.Now().Unix()))
  184. makeChannel(nsqlookupd1, topicName, channelName)
  185. url = fmt.Sprintf("http://%s/topic/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  186. req, _ = http.NewRequest("POST", url, nil)
  187. resp, err = client.Do(req)
  188. test.Nil(t, err)
  189. test.Equal(t, 200, resp.StatusCode)
  190. body, _ = ioutil.ReadAll(resp.Body)
  191. resp.Body.Close()
  192. t.Logf("%s", body)
  193. test.Equal(t, []byte(""), body)
  194. }
  195. func TestGetChannels(t *testing.T) {
  196. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  197. defer os.RemoveAll(dataPath)
  198. defer nsqds[0].Exit()
  199. defer nsqlookupd1.Exit()
  200. client := http.Client{}
  201. url := fmt.Sprintf("http://%s/channels", nsqlookupd1.RealHTTPAddr())
  202. em := ErrMessage{}
  203. req, _ := http.NewRequest("GET", url, nil)
  204. req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
  205. resp, err := client.Do(req)
  206. test.Nil(t, err)
  207. test.Equal(t, 400, resp.StatusCode)
  208. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  209. body, _ := ioutil.ReadAll(resp.Body)
  210. resp.Body.Close()
  211. t.Logf("%s", body)
  212. err = json.Unmarshal(body, &em)
  213. test.Nil(t, err)
  214. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  215. ch := ChannelsDoc{}
  216. topicName := "sampletopicA" + strconv.Itoa(int(time.Now().Unix()))
  217. makeTopic(nsqlookupd1, topicName)
  218. url = fmt.Sprintf("http://%s/channels?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  219. req, _ = http.NewRequest("GET", url, nil)
  220. req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
  221. resp, err = client.Do(req)
  222. test.Nil(t, err)
  223. test.Equal(t, 200, resp.StatusCode)
  224. body, _ = ioutil.ReadAll(resp.Body)
  225. resp.Body.Close()
  226. t.Logf("%s", body)
  227. err = json.Unmarshal(body, &ch)
  228. test.Nil(t, err)
  229. test.Equal(t, 0, len(ch.Channels))
  230. topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix()))
  231. channelName := "foobar" + strconv.Itoa(int(time.Now().Unix()))
  232. makeChannel(nsqlookupd1, topicName, channelName)
  233. url = fmt.Sprintf("http://%s/channels?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  234. req, _ = http.NewRequest("GET", url, nil)
  235. req.Header.Add("Accept", "application/vnd.nsq; version=1.0")
  236. resp, err = client.Do(req)
  237. test.Nil(t, err)
  238. test.Equal(t, 200, resp.StatusCode)
  239. body, _ = ioutil.ReadAll(resp.Body)
  240. resp.Body.Close()
  241. t.Logf("%s", body)
  242. err = json.Unmarshal(body, &ch)
  243. test.Nil(t, err)
  244. test.Equal(t, 1, len(ch.Channels))
  245. test.Equal(t, channelName, ch.Channels[0])
  246. }
  247. func TestCreateChannel(t *testing.T) {
  248. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  249. defer os.RemoveAll(dataPath)
  250. defer nsqds[0].Exit()
  251. defer nsqlookupd1.Exit()
  252. em := ErrMessage{}
  253. client := http.Client{}
  254. url := fmt.Sprintf("http://%s/channel/create", nsqlookupd1.RealHTTPAddr())
  255. req, _ := http.NewRequest("POST", url, nil)
  256. resp, err := client.Do(req)
  257. test.Nil(t, err)
  258. test.Equal(t, 400, resp.StatusCode)
  259. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  260. body, _ := ioutil.ReadAll(resp.Body)
  261. resp.Body.Close()
  262. t.Logf("%s", body)
  263. err = json.Unmarshal(body, &em)
  264. test.Nil(t, err)
  265. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  266. topicName := "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + "$"
  267. url = fmt.Sprintf("http://%s/channel/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  268. req, _ = http.NewRequest("POST", url, nil)
  269. resp, err = client.Do(req)
  270. test.Nil(t, err)
  271. test.Equal(t, 400, resp.StatusCode)
  272. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  273. body, _ = ioutil.ReadAll(resp.Body)
  274. resp.Body.Close()
  275. t.Logf("%s", body)
  276. err = json.Unmarshal(body, &em)
  277. test.Nil(t, err)
  278. test.Equal(t, "INVALID_ARG_TOPIC", em.Message)
  279. topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix()))
  280. url = fmt.Sprintf("http://%s/channel/create?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  281. req, _ = http.NewRequest("POST", url, nil)
  282. resp, err = client.Do(req)
  283. test.Nil(t, err)
  284. test.Equal(t, 400, resp.StatusCode)
  285. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  286. body, _ = ioutil.ReadAll(resp.Body)
  287. resp.Body.Close()
  288. t.Logf("%s", body)
  289. err = json.Unmarshal(body, &em)
  290. test.Nil(t, err)
  291. test.Equal(t, "MISSING_ARG_CHANNEL", em.Message)
  292. channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + "$"
  293. url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName)
  294. req, _ = http.NewRequest("POST", url, nil)
  295. resp, err = client.Do(req)
  296. test.Nil(t, err)
  297. test.Equal(t, 400, resp.StatusCode)
  298. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  299. body, _ = ioutil.ReadAll(resp.Body)
  300. resp.Body.Close()
  301. t.Logf("%s", body)
  302. err = json.Unmarshal(body, &em)
  303. test.Nil(t, err)
  304. test.Equal(t, "INVALID_ARG_CHANNEL", em.Message)
  305. channelName = "foobar" + strconv.Itoa(int(time.Now().Unix()))
  306. url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName)
  307. req, _ = http.NewRequest("POST", url, nil)
  308. resp, err = client.Do(req)
  309. test.Nil(t, err)
  310. test.Equal(t, 200, resp.StatusCode)
  311. body, _ = ioutil.ReadAll(resp.Body)
  312. resp.Body.Close()
  313. t.Logf("%s", body)
  314. test.Equal(t, []byte(""), body)
  315. }
  316. func TestDeleteChannel(t *testing.T) {
  317. dataPath, nsqds, nsqlookupd1 := bootstrapNSQCluster(t)
  318. defer os.RemoveAll(dataPath)
  319. defer nsqds[0].Exit()
  320. defer nsqlookupd1.Exit()
  321. em := ErrMessage{}
  322. client := http.Client{}
  323. url := fmt.Sprintf("http://%s/channel/delete", nsqlookupd1.RealHTTPAddr())
  324. req, _ := http.NewRequest("POST", url, nil)
  325. resp, err := client.Do(req)
  326. test.Nil(t, err)
  327. test.Equal(t, 400, resp.StatusCode)
  328. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  329. body, _ := ioutil.ReadAll(resp.Body)
  330. resp.Body.Close()
  331. t.Logf("%s", body)
  332. err = json.Unmarshal(body, &em)
  333. test.Nil(t, err)
  334. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  335. topicName := "sampletopicB" + strconv.Itoa(int(time.Now().Unix())) + "$"
  336. url = fmt.Sprintf("http://%s/channel/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  337. req, _ = http.NewRequest("POST", url, nil)
  338. resp, err = client.Do(req)
  339. test.Nil(t, err)
  340. test.Equal(t, 400, resp.StatusCode)
  341. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  342. body, _ = ioutil.ReadAll(resp.Body)
  343. resp.Body.Close()
  344. t.Logf("%s", body)
  345. err = json.Unmarshal(body, &em)
  346. test.Nil(t, err)
  347. test.Equal(t, "INVALID_ARG_TOPIC", em.Message)
  348. topicName = "sampletopicB" + strconv.Itoa(int(time.Now().Unix()))
  349. url = fmt.Sprintf("http://%s/channel/delete?topic=%s", nsqlookupd1.RealHTTPAddr(), topicName)
  350. req, _ = http.NewRequest("POST", url, nil)
  351. resp, err = client.Do(req)
  352. test.Nil(t, err)
  353. test.Equal(t, 400, resp.StatusCode)
  354. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  355. body, _ = ioutil.ReadAll(resp.Body)
  356. resp.Body.Close()
  357. t.Logf("%s", body)
  358. err = json.Unmarshal(body, &em)
  359. test.Nil(t, err)
  360. test.Equal(t, "MISSING_ARG_CHANNEL", em.Message)
  361. channelName := "foobar" + strconv.Itoa(int(time.Now().Unix())) + "$"
  362. url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName)
  363. req, _ = http.NewRequest("POST", url, nil)
  364. resp, err = client.Do(req)
  365. test.Nil(t, err)
  366. test.Equal(t, 400, resp.StatusCode)
  367. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  368. body, _ = ioutil.ReadAll(resp.Body)
  369. resp.Body.Close()
  370. t.Logf("%s", body)
  371. err = json.Unmarshal(body, &em)
  372. test.Nil(t, err)
  373. test.Equal(t, "INVALID_ARG_CHANNEL", em.Message)
  374. channelName = "foobar" + strconv.Itoa(int(time.Now().Unix()))
  375. url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", nsqlookupd1.RealHTTPAddr(), topicName, channelName)
  376. req, _ = http.NewRequest("POST", url, nil)
  377. resp, err = client.Do(req)
  378. test.Nil(t, err)
  379. test.Equal(t, 404, resp.StatusCode)
  380. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  381. body, _ = ioutil.ReadAll(resp.Body)
  382. resp.Body.Close()
  383. t.Logf("%s", body)
  384. err = json.Unmarshal(body, &em)
  385. test.Nil(t, err)
  386. test.Equal(t, "CHANNEL_NOT_FOUND", em.Message)
  387. makeChannel(nsqlookupd1, topicName, channelName)
  388. req, _ = http.NewRequest("POST", url, nil)
  389. resp, err = client.Do(req)
  390. test.Nil(t, err)
  391. test.Equal(t, 200, resp.StatusCode)
  392. body, _ = ioutil.ReadAll(resp.Body)
  393. resp.Body.Close()
  394. t.Logf("%s", body)
  395. test.Equal(t, []byte(""), body)
  396. }