http_test.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. package nsqadmin
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io/ioutil"
  7. "net"
  8. "net/http"
  9. "os"
  10. "strconv"
  11. "testing"
  12. "time"
  13. "github.com/nsqio/nsq/internal/clusterinfo"
  14. "github.com/nsqio/nsq/internal/test"
  15. "github.com/nsqio/nsq/internal/version"
  16. "github.com/nsqio/nsq/nsqd"
  17. "github.com/nsqio/nsq/nsqlookupd"
  18. )
  19. type TopicsDoc struct {
  20. Topics []interface{} `json:"topics"`
  21. }
  22. type TopicStatsDoc struct {
  23. *clusterinfo.TopicStats
  24. Message string `json:"message"`
  25. }
  26. type NodesDoc struct {
  27. Nodes clusterinfo.Producers `json:"nodes"`
  28. Message string `json:"message"`
  29. }
  30. type NodeStatsDoc struct {
  31. Node string `json:"node"`
  32. TopicStats []*clusterinfo.TopicStats `json:"topics"`
  33. TotalMessages int64 `json:"total_messages"`
  34. TotalClients int64 `json:"total_clients"`
  35. Message string `json:"message"`
  36. }
  37. type ChannelStatsDoc struct {
  38. *clusterinfo.ChannelStats
  39. Message string `json:"message"`
  40. }
  41. func mustStartNSQLookupd(opts *nsqlookupd.Options) (*net.TCPAddr, *net.TCPAddr, *nsqlookupd.NSQLookupd) {
  42. opts.TCPAddress = "127.0.0.1:0"
  43. opts.HTTPAddress = "127.0.0.1:0"
  44. lookupd, err := nsqlookupd.New(opts)
  45. if err != nil {
  46. panic(err)
  47. }
  48. go func() {
  49. err := lookupd.Main()
  50. if err != nil {
  51. panic(err)
  52. }
  53. }()
  54. return lookupd.RealTCPAddr(), lookupd.RealHTTPAddr(), lookupd
  55. }
  56. func bootstrapNSQCluster(t *testing.T) (string, []*nsqd.NSQD, []*nsqlookupd.NSQLookupd, *NSQAdmin) {
  57. return bootstrapNSQClusterWithAuth(t, false)
  58. }
  59. func bootstrapNSQClusterWithAuth(t *testing.T, withAuth bool) (string, []*nsqd.NSQD, []*nsqlookupd.NSQLookupd, *NSQAdmin) {
  60. lgr := test.NewTestLogger(t)
  61. nsqlookupdOpts := nsqlookupd.NewOptions()
  62. nsqlookupdOpts.TCPAddress = "127.0.0.1:0"
  63. nsqlookupdOpts.HTTPAddress = "127.0.0.1:0"
  64. nsqlookupdOpts.BroadcastAddress = "127.0.0.1"
  65. nsqlookupdOpts.Logger = lgr
  66. nsqlookupd1, err := nsqlookupd.New(nsqlookupdOpts)
  67. if err != nil {
  68. panic(err)
  69. }
  70. go func() {
  71. err := nsqlookupd1.Main()
  72. if err != nil {
  73. panic(err)
  74. }
  75. }()
  76. time.Sleep(100 * time.Millisecond)
  77. nsqdOpts := nsqd.NewOptions()
  78. nsqdOpts.TCPAddress = "127.0.0.1:0"
  79. nsqdOpts.HTTPAddress = "127.0.0.1:0"
  80. nsqdOpts.BroadcastAddress = "127.0.0.1"
  81. nsqdOpts.NSQLookupdTCPAddresses = []string{nsqlookupd1.RealTCPAddr().String()}
  82. nsqdOpts.Logger = lgr
  83. tmpDir, err := ioutil.TempDir("", "nsq-test-")
  84. if err != nil {
  85. panic(err)
  86. }
  87. nsqdOpts.DataPath = tmpDir
  88. nsqd1, err := nsqd.New(nsqdOpts)
  89. if err != nil {
  90. panic(err)
  91. }
  92. go func() {
  93. err := nsqd1.Main()
  94. if err != nil {
  95. panic(err)
  96. }
  97. }()
  98. nsqadminOpts := NewOptions()
  99. nsqadminOpts.HTTPAddress = "127.0.0.1:0"
  100. nsqadminOpts.NSQLookupdHTTPAddresses = []string{nsqlookupd1.RealHTTPAddr().String()}
  101. nsqadminOpts.Logger = lgr
  102. if withAuth {
  103. nsqadminOpts.AdminUsers = []string{"matt"}
  104. }
  105. nsqadmin1, err := New(nsqadminOpts)
  106. if err != nil {
  107. panic(err)
  108. }
  109. go func() {
  110. err := nsqadmin1.Main()
  111. if err != nil {
  112. panic(err)
  113. }
  114. }()
  115. time.Sleep(100 * time.Millisecond)
  116. return tmpDir, []*nsqd.NSQD{nsqd1}, []*nsqlookupd.NSQLookupd{nsqlookupd1}, nsqadmin1
  117. }
  118. func TestPing(t *testing.T) {
  119. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  120. defer os.RemoveAll(dataPath)
  121. defer nsqds[0].Exit()
  122. defer nsqlookupds[0].Exit()
  123. defer nsqadmin1.Exit()
  124. client := http.Client{}
  125. url := fmt.Sprintf("http://%s/ping", nsqadmin1.RealHTTPAddr())
  126. req, _ := http.NewRequest("GET", url, nil)
  127. resp, err := client.Do(req)
  128. test.Nil(t, err)
  129. test.Equal(t, 200, resp.StatusCode)
  130. body, _ := ioutil.ReadAll(resp.Body)
  131. resp.Body.Close()
  132. test.Equal(t, []byte("OK"), body)
  133. }
  134. func TestHTTPTopicsGET(t *testing.T) {
  135. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  136. defer os.RemoveAll(dataPath)
  137. defer nsqds[0].Exit()
  138. defer nsqlookupds[0].Exit()
  139. defer nsqadmin1.Exit()
  140. topicName := "test_topics_get" + strconv.Itoa(int(time.Now().Unix()))
  141. nsqds[0].GetTopic(topicName)
  142. time.Sleep(100 * time.Millisecond)
  143. client := http.Client{}
  144. url := fmt.Sprintf("http://%s/api/topics", nsqadmin1.RealHTTPAddr())
  145. req, _ := http.NewRequest("GET", url, nil)
  146. resp, err := client.Do(req)
  147. test.Nil(t, err)
  148. test.Equal(t, 200, resp.StatusCode)
  149. body, _ := ioutil.ReadAll(resp.Body)
  150. resp.Body.Close()
  151. t.Logf("%s", body)
  152. tr := TopicsDoc{}
  153. err = json.Unmarshal(body, &tr)
  154. test.Nil(t, err)
  155. test.Equal(t, 1, len(tr.Topics))
  156. test.Equal(t, topicName, tr.Topics[0])
  157. }
  158. func TestHTTPTopicGET(t *testing.T) {
  159. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  160. defer os.RemoveAll(dataPath)
  161. defer nsqds[0].Exit()
  162. defer nsqlookupds[0].Exit()
  163. defer nsqadmin1.Exit()
  164. topicName := "test_topic_get" + strconv.Itoa(int(time.Now().Unix()))
  165. nsqds[0].GetTopic(topicName)
  166. time.Sleep(100 * time.Millisecond)
  167. client := http.Client{}
  168. url := fmt.Sprintf("http://%s/api/topics/%s", nsqadmin1.RealHTTPAddr(), topicName)
  169. req, _ := http.NewRequest("GET", url, nil)
  170. resp, err := client.Do(req)
  171. test.Nil(t, err)
  172. test.Equal(t, 200, resp.StatusCode)
  173. body, _ := ioutil.ReadAll(resp.Body)
  174. resp.Body.Close()
  175. t.Logf("%s", body)
  176. ts := TopicStatsDoc{}
  177. err = json.Unmarshal(body, &ts)
  178. test.Nil(t, err)
  179. test.Equal(t, topicName, ts.TopicName)
  180. test.Equal(t, 0, int(ts.Depth))
  181. test.Equal(t, 0, int(ts.MemoryDepth))
  182. test.Equal(t, 0, int(ts.BackendDepth))
  183. test.Equal(t, 0, int(ts.MessageCount))
  184. test.Equal(t, false, ts.Paused)
  185. }
  186. func TestHTTPNodesGET(t *testing.T) {
  187. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  188. defer os.RemoveAll(dataPath)
  189. defer nsqds[0].Exit()
  190. defer nsqlookupds[0].Exit()
  191. defer nsqadmin1.Exit()
  192. time.Sleep(100 * time.Millisecond)
  193. client := http.Client{}
  194. url := fmt.Sprintf("http://%s/api/nodes", nsqadmin1.RealHTTPAddr())
  195. req, _ := http.NewRequest("GET", url, nil)
  196. resp, err := client.Do(req)
  197. test.Nil(t, err)
  198. test.Equal(t, 200, resp.StatusCode)
  199. body, _ := ioutil.ReadAll(resp.Body)
  200. resp.Body.Close()
  201. hostname, _ := os.Hostname()
  202. t.Logf("%s", body)
  203. ns := NodesDoc{}
  204. err = json.Unmarshal(body, &ns)
  205. test.Nil(t, err)
  206. test.Equal(t, 1, len(ns.Nodes))
  207. testNode := ns.Nodes[0]
  208. test.Equal(t, hostname, testNode.Hostname)
  209. test.Equal(t, "127.0.0.1", testNode.BroadcastAddress)
  210. test.Equal(t, nsqds[0].RealTCPAddr().Port, testNode.TCPPort)
  211. test.Equal(t, nsqds[0].RealHTTPAddr().Port, testNode.HTTPPort)
  212. test.Equal(t, version.Binary, testNode.Version)
  213. test.Equal(t, 0, len(testNode.Topics))
  214. }
  215. func TestHTTPChannelGET(t *testing.T) {
  216. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  217. defer os.RemoveAll(dataPath)
  218. defer nsqds[0].Exit()
  219. defer nsqlookupds[0].Exit()
  220. defer nsqadmin1.Exit()
  221. topicName := "test_channel_get" + strconv.Itoa(int(time.Now().Unix()))
  222. topic := nsqds[0].GetTopic(topicName)
  223. topic.GetChannel("ch")
  224. time.Sleep(100 * time.Millisecond)
  225. client := http.Client{}
  226. url := fmt.Sprintf("http://%s/api/topics/%s/ch", nsqadmin1.RealHTTPAddr(), topicName)
  227. req, _ := http.NewRequest("GET", url, nil)
  228. resp, err := client.Do(req)
  229. test.Nil(t, err)
  230. test.Equal(t, 200, resp.StatusCode)
  231. body, _ := ioutil.ReadAll(resp.Body)
  232. resp.Body.Close()
  233. t.Logf("%s", body)
  234. cs := ChannelStatsDoc{}
  235. err = json.Unmarshal(body, &cs)
  236. test.Nil(t, err)
  237. test.Equal(t, topicName, cs.TopicName)
  238. test.Equal(t, "ch", cs.ChannelName)
  239. test.Equal(t, 0, int(cs.Depth))
  240. test.Equal(t, 0, int(cs.MemoryDepth))
  241. test.Equal(t, 0, int(cs.BackendDepth))
  242. test.Equal(t, 0, int(cs.MessageCount))
  243. test.Equal(t, false, cs.Paused)
  244. test.Equal(t, 0, int(cs.InFlightCount))
  245. test.Equal(t, 0, int(cs.DeferredCount))
  246. test.Equal(t, 0, int(cs.RequeueCount))
  247. test.Equal(t, 0, int(cs.TimeoutCount))
  248. test.Equal(t, 0, len(cs.Clients))
  249. }
  250. func TestHTTPNodesSingleGET(t *testing.T) {
  251. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  252. defer os.RemoveAll(dataPath)
  253. defer nsqds[0].Exit()
  254. defer nsqlookupds[0].Exit()
  255. defer nsqadmin1.Exit()
  256. topicName := "test_nodes_single_get" + strconv.Itoa(int(time.Now().Unix()))
  257. topic := nsqds[0].GetTopic(topicName)
  258. topic.GetChannel("ch")
  259. time.Sleep(100 * time.Millisecond)
  260. client := http.Client{}
  261. url := fmt.Sprintf("http://%s/api/nodes/%s", nsqadmin1.RealHTTPAddr(),
  262. nsqds[0].RealHTTPAddr().String())
  263. req, _ := http.NewRequest("GET", url, nil)
  264. resp, err := client.Do(req)
  265. test.Nil(t, err)
  266. test.Equal(t, 200, resp.StatusCode)
  267. body, _ := ioutil.ReadAll(resp.Body)
  268. resp.Body.Close()
  269. t.Logf("%s", body)
  270. ns := NodeStatsDoc{}
  271. err = json.Unmarshal(body, &ns)
  272. test.Nil(t, err)
  273. test.Equal(t, nsqds[0].RealHTTPAddr().String(), ns.Node)
  274. test.Equal(t, 1, len(ns.TopicStats))
  275. testTopic := ns.TopicStats[0]
  276. test.Equal(t, topicName, testTopic.TopicName)
  277. test.Equal(t, 0, int(testTopic.Depth))
  278. test.Equal(t, 0, int(testTopic.MemoryDepth))
  279. test.Equal(t, 0, int(testTopic.BackendDepth))
  280. test.Equal(t, 0, int(testTopic.MessageCount))
  281. test.Equal(t, false, testTopic.Paused)
  282. }
  283. func TestHTTPCreateTopicPOST(t *testing.T) {
  284. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  285. defer os.RemoveAll(dataPath)
  286. defer nsqds[0].Exit()
  287. defer nsqlookupds[0].Exit()
  288. defer nsqadmin1.Exit()
  289. time.Sleep(100 * time.Millisecond)
  290. topicName := "test_create_topic_post" + strconv.Itoa(int(time.Now().Unix()))
  291. client := http.Client{}
  292. url := fmt.Sprintf("http://%s/api/topics", nsqadmin1.RealHTTPAddr())
  293. body, _ := json.Marshal(map[string]interface{}{
  294. "topic": topicName,
  295. })
  296. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  297. resp, err := client.Do(req)
  298. test.Nil(t, err)
  299. test.Equal(t, 200, resp.StatusCode)
  300. resp.Body.Close()
  301. }
  302. func TestHTTPCreateTopicChannelPOST(t *testing.T) {
  303. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  304. defer os.RemoveAll(dataPath)
  305. defer nsqds[0].Exit()
  306. defer nsqlookupds[0].Exit()
  307. defer nsqadmin1.Exit()
  308. time.Sleep(100 * time.Millisecond)
  309. topicName := "test_create_topic_channel_post" + strconv.Itoa(int(time.Now().Unix()))
  310. client := http.Client{}
  311. url := fmt.Sprintf("http://%s/api/topics", nsqadmin1.RealHTTPAddr())
  312. body, _ := json.Marshal(map[string]interface{}{
  313. "topic": topicName,
  314. "channel": "ch",
  315. })
  316. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  317. resp, err := client.Do(req)
  318. test.Nil(t, err)
  319. test.Equal(t, 200, resp.StatusCode)
  320. resp.Body.Close()
  321. }
  322. func TestHTTPTombstoneTopicNodePOST(t *testing.T) {
  323. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  324. defer os.RemoveAll(dataPath)
  325. defer nsqds[0].Exit()
  326. defer nsqlookupds[0].Exit()
  327. defer nsqadmin1.Exit()
  328. topicName := "test_tombstone_topic_node_post" + strconv.Itoa(int(time.Now().Unix()))
  329. nsqds[0].GetTopic(topicName)
  330. time.Sleep(100 * time.Millisecond)
  331. client := http.Client{}
  332. url := fmt.Sprintf("http://%s/api/nodes/%s", nsqadmin1.RealHTTPAddr(), nsqds[0].RealHTTPAddr())
  333. body, _ := json.Marshal(map[string]interface{}{
  334. "topic": topicName,
  335. })
  336. req, _ := http.NewRequest("DELETE", url, bytes.NewBuffer(body))
  337. resp, err := client.Do(req)
  338. test.Nil(t, err)
  339. test.Equal(t, 200, resp.StatusCode)
  340. resp.Body.Close()
  341. }
  342. func TestHTTPDeleteTopicPOST(t *testing.T) {
  343. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  344. defer os.RemoveAll(dataPath)
  345. defer nsqds[0].Exit()
  346. defer nsqlookupds[0].Exit()
  347. defer nsqadmin1.Exit()
  348. topicName := "test_delete_topic_post" + strconv.Itoa(int(time.Now().Unix()))
  349. nsqds[0].GetTopic(topicName)
  350. time.Sleep(100 * time.Millisecond)
  351. client := http.Client{}
  352. url := fmt.Sprintf("http://%s/api/topics/%s", nsqadmin1.RealHTTPAddr(), topicName)
  353. req, _ := http.NewRequest("DELETE", url, nil)
  354. resp, err := client.Do(req)
  355. test.Nil(t, err)
  356. test.Equal(t, 200, resp.StatusCode)
  357. resp.Body.Close()
  358. }
  359. func TestHTTPDeleteChannelPOST(t *testing.T) {
  360. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  361. defer os.RemoveAll(dataPath)
  362. defer nsqds[0].Exit()
  363. defer nsqlookupds[0].Exit()
  364. defer nsqadmin1.Exit()
  365. topicName := "test_delete_channel_post" + strconv.Itoa(int(time.Now().Unix()))
  366. topic := nsqds[0].GetTopic(topicName)
  367. topic.GetChannel("ch")
  368. time.Sleep(100 * time.Millisecond)
  369. client := http.Client{}
  370. url := fmt.Sprintf("http://%s/api/topics/%s/ch", nsqadmin1.RealHTTPAddr(), topicName)
  371. req, _ := http.NewRequest("DELETE", url, nil)
  372. resp, err := client.Do(req)
  373. test.Nil(t, err)
  374. test.Equal(t, 200, resp.StatusCode)
  375. resp.Body.Close()
  376. }
  377. func TestHTTPPauseTopicPOST(t *testing.T) {
  378. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  379. defer os.RemoveAll(dataPath)
  380. defer nsqds[0].Exit()
  381. defer nsqlookupds[0].Exit()
  382. defer nsqadmin1.Exit()
  383. topicName := "test_pause_topic_post" + strconv.Itoa(int(time.Now().Unix()))
  384. nsqds[0].GetTopic(topicName)
  385. time.Sleep(100 * time.Millisecond)
  386. client := http.Client{}
  387. url := fmt.Sprintf("http://%s/api/topics/%s", nsqadmin1.RealHTTPAddr(), topicName)
  388. body, _ := json.Marshal(map[string]interface{}{
  389. "action": "pause",
  390. })
  391. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  392. resp, err := client.Do(req)
  393. test.Nil(t, err)
  394. body, _ = ioutil.ReadAll(resp.Body)
  395. test.Equal(t, 200, resp.StatusCode)
  396. resp.Body.Close()
  397. url = fmt.Sprintf("http://%s/api/topics/%s", nsqadmin1.RealHTTPAddr(), topicName)
  398. body, _ = json.Marshal(map[string]interface{}{
  399. "action": "unpause",
  400. })
  401. req, _ = http.NewRequest("POST", url, bytes.NewBuffer(body))
  402. resp, err = client.Do(req)
  403. test.Nil(t, err)
  404. test.Equal(t, 200, resp.StatusCode)
  405. resp.Body.Close()
  406. }
  407. func TestHTTPPauseChannelPOST(t *testing.T) {
  408. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  409. defer os.RemoveAll(dataPath)
  410. defer nsqds[0].Exit()
  411. defer nsqlookupds[0].Exit()
  412. defer nsqadmin1.Exit()
  413. topicName := "test_pause_channel_post" + strconv.Itoa(int(time.Now().Unix()))
  414. topic := nsqds[0].GetTopic(topicName)
  415. topic.GetChannel("ch")
  416. time.Sleep(100 * time.Millisecond)
  417. client := http.Client{}
  418. url := fmt.Sprintf("http://%s/api/topics/%s/ch", nsqadmin1.RealHTTPAddr(), topicName)
  419. body, _ := json.Marshal(map[string]interface{}{
  420. "action": "pause",
  421. })
  422. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  423. resp, err := client.Do(req)
  424. test.Nil(t, err)
  425. body, _ = ioutil.ReadAll(resp.Body)
  426. test.Equal(t, 200, resp.StatusCode)
  427. resp.Body.Close()
  428. url = fmt.Sprintf("http://%s/api/topics/%s/ch", nsqadmin1.RealHTTPAddr(), topicName)
  429. body, _ = json.Marshal(map[string]interface{}{
  430. "action": "unpause",
  431. })
  432. req, _ = http.NewRequest("POST", url, bytes.NewBuffer(body))
  433. resp, err = client.Do(req)
  434. test.Nil(t, err)
  435. test.Equal(t, 200, resp.StatusCode)
  436. resp.Body.Close()
  437. }
  438. func TestHTTPEmptyTopicPOST(t *testing.T) {
  439. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  440. defer os.RemoveAll(dataPath)
  441. defer nsqds[0].Exit()
  442. defer nsqlookupds[0].Exit()
  443. defer nsqadmin1.Exit()
  444. topicName := "test_empty_topic_post" + strconv.Itoa(int(time.Now().Unix()))
  445. topic := nsqds[0].GetTopic(topicName)
  446. topic.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
  447. test.Equal(t, int64(1), topic.Depth())
  448. time.Sleep(100 * time.Millisecond)
  449. client := http.Client{}
  450. url := fmt.Sprintf("http://%s/api/topics/%s", nsqadmin1.RealHTTPAddr(), topicName)
  451. body, _ := json.Marshal(map[string]interface{}{
  452. "action": "empty",
  453. })
  454. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  455. resp, err := client.Do(req)
  456. test.Nil(t, err)
  457. body, _ = ioutil.ReadAll(resp.Body)
  458. test.Equal(t, 200, resp.StatusCode)
  459. resp.Body.Close()
  460. test.Equal(t, int64(0), topic.Depth())
  461. }
  462. func TestHTTPEmptyChannelPOST(t *testing.T) {
  463. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  464. defer os.RemoveAll(dataPath)
  465. defer nsqds[0].Exit()
  466. defer nsqlookupds[0].Exit()
  467. defer nsqadmin1.Exit()
  468. topicName := "test_empty_channel_post" + strconv.Itoa(int(time.Now().Unix()))
  469. topic := nsqds[0].GetTopic(topicName)
  470. channel := topic.GetChannel("ch")
  471. channel.PutMessage(nsqd.NewMessage(nsqd.MessageID{}, []byte("1234")))
  472. time.Sleep(100 * time.Millisecond)
  473. test.Equal(t, int64(1), channel.Depth())
  474. client := http.Client{}
  475. url := fmt.Sprintf("http://%s/api/topics/%s/ch", nsqadmin1.RealHTTPAddr(), topicName)
  476. body, _ := json.Marshal(map[string]interface{}{
  477. "action": "empty",
  478. })
  479. req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
  480. resp, err := client.Do(req)
  481. test.Nil(t, err)
  482. body, _ = ioutil.ReadAll(resp.Body)
  483. test.Equal(t, 200, resp.StatusCode)
  484. resp.Body.Close()
  485. test.Equal(t, int64(0), channel.Depth())
  486. }
  487. func TestHTTPconfig(t *testing.T) {
  488. dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
  489. defer os.RemoveAll(dataPath)
  490. defer nsqds[0].Exit()
  491. defer nsqlookupds[0].Exit()
  492. defer nsqadmin1.Exit()
  493. lopts := nsqlookupd.NewOptions()
  494. lopts.Logger = test.NewTestLogger(t)
  495. lopts1 := *lopts
  496. _, _, lookupd1 := mustStartNSQLookupd(&lopts1)
  497. defer lookupd1.Exit()
  498. lopts2 := *lopts
  499. _, _, lookupd2 := mustStartNSQLookupd(&lopts2)
  500. defer lookupd2.Exit()
  501. url := fmt.Sprintf("http://%s/config/nsqlookupd_http_addresses", nsqadmin1.RealHTTPAddr())
  502. resp, err := http.Get(url)
  503. test.Nil(t, err)
  504. defer resp.Body.Close()
  505. body, _ := ioutil.ReadAll(resp.Body)
  506. test.Equal(t, 200, resp.StatusCode)
  507. origaddrs := fmt.Sprintf(`["%s"]`, nsqlookupds[0].RealHTTPAddr().String())
  508. test.Equal(t, origaddrs, string(body))
  509. client := http.Client{}
  510. addrs := fmt.Sprintf(`["%s","%s"]`, lookupd1.RealHTTPAddr().String(), lookupd2.RealHTTPAddr().String())
  511. url = fmt.Sprintf("http://%s/config/nsqlookupd_http_addresses", nsqadmin1.RealHTTPAddr())
  512. req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(addrs)))
  513. test.Nil(t, err)
  514. resp, err = client.Do(req)
  515. test.Nil(t, err)
  516. defer resp.Body.Close()
  517. body, _ = ioutil.ReadAll(resp.Body)
  518. test.Equal(t, 200, resp.StatusCode)
  519. test.Equal(t, addrs, string(body))
  520. url = fmt.Sprintf("http://%s/config/log_level", nsqadmin1.RealHTTPAddr())
  521. req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`fatal`)))
  522. test.Nil(t, err)
  523. resp, err = client.Do(req)
  524. test.Nil(t, err)
  525. defer resp.Body.Close()
  526. body, _ = ioutil.ReadAll(resp.Body)
  527. test.Equal(t, 200, resp.StatusCode)
  528. test.Equal(t, LOG_FATAL, nsqadmin1.getOpts().LogLevel)
  529. url = fmt.Sprintf("http://%s/config/log_level", nsqadmin1.RealHTTPAddr())
  530. req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`)))
  531. test.Nil(t, err)
  532. resp, err = client.Do(req)
  533. test.Nil(t, err)
  534. defer resp.Body.Close()
  535. body, _ = ioutil.ReadAll(resp.Body)
  536. test.Equal(t, 400, resp.StatusCode)
  537. }
  538. func TestHTTPconfigCIDR(t *testing.T) {
  539. opts := NewOptions()
  540. opts.HTTPAddress = "127.0.0.1:0"
  541. opts.NSQLookupdHTTPAddresses = []string{"127.0.0.1:4161"}
  542. opts.Logger = test.NewTestLogger(t)
  543. opts.AllowConfigFromCIDR = "10.0.0.0/8"
  544. nsqadmin, err := New(opts)
  545. test.Nil(t, err)
  546. go func() {
  547. err := nsqadmin.Main()
  548. if err != nil {
  549. panic(err)
  550. }
  551. }()
  552. defer nsqadmin.Exit()
  553. time.Sleep(100 * time.Millisecond)
  554. url := fmt.Sprintf("http://%s/config/nsqlookupd_http_addresses", nsqadmin.RealHTTPAddr())
  555. resp, err := http.Get(url)
  556. test.Nil(t, err)
  557. defer resp.Body.Close()
  558. _, _ = ioutil.ReadAll(resp.Body)
  559. test.Equal(t, 403, resp.StatusCode)
  560. }