http_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939
  1. package nsqd
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "os"
  11. "runtime"
  12. "strconv"
  13. "sync"
  14. "testing"
  15. "time"
  16. "strings"
  17. "github.com/nsqio/go-nsq"
  18. "github.com/nsqio/nsq/internal/http_api"
  19. "github.com/nsqio/nsq/internal/test"
  20. "github.com/nsqio/nsq/internal/version"
  21. "github.com/nsqio/nsq/nsqlookupd"
  22. )
  23. type ErrMessage struct {
  24. Message string `json:"message"`
  25. }
  26. type InfoDoc struct {
  27. Version string `json:"version"`
  28. BroadcastAddress string `json:"broadcast_address"`
  29. Hostname string `json:"hostname"`
  30. HTTPPort int `json:"http_port"`
  31. TCPPort int `json:"tcp_port"`
  32. StartTime int64 `json:"start_time"`
  33. }
  34. func TestHTTPpub(t *testing.T) {
  35. opts := NewOptions()
  36. opts.Logger = test.NewTestLogger(t)
  37. _, httpAddr, nsqd := mustStartNSQD(opts)
  38. defer os.RemoveAll(opts.DataPath)
  39. defer nsqd.Exit()
  40. topicName := "test_http_pub" + strconv.Itoa(int(time.Now().Unix()))
  41. topic := nsqd.GetTopic(topicName)
  42. buf := bytes.NewBuffer([]byte("test message"))
  43. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  44. resp, err := http.Post(url, "application/octet-stream", buf)
  45. test.Nil(t, err)
  46. defer resp.Body.Close()
  47. body, _ := ioutil.ReadAll(resp.Body)
  48. test.Equal(t, "OK", string(body))
  49. time.Sleep(5 * time.Millisecond)
  50. test.Equal(t, int64(1), topic.Depth())
  51. }
  52. func TestHTTPpubEmpty(t *testing.T) {
  53. opts := NewOptions()
  54. opts.Logger = test.NewTestLogger(t)
  55. _, httpAddr, nsqd := mustStartNSQD(opts)
  56. defer os.RemoveAll(opts.DataPath)
  57. defer nsqd.Exit()
  58. topicName := "test_http_pub_empty" + strconv.Itoa(int(time.Now().Unix()))
  59. topic := nsqd.GetTopic(topicName)
  60. buf := bytes.NewBuffer([]byte(""))
  61. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  62. resp, err := http.Post(url, "application/octet-stream", buf)
  63. test.Nil(t, err)
  64. defer resp.Body.Close()
  65. body, _ := ioutil.ReadAll(resp.Body)
  66. test.Equal(t, 400, resp.StatusCode)
  67. test.Equal(t, `{"message":"MSG_EMPTY"}`, string(body))
  68. time.Sleep(5 * time.Millisecond)
  69. test.Equal(t, int64(0), topic.Depth())
  70. }
  71. func TestHTTPmpub(t *testing.T) {
  72. opts := NewOptions()
  73. opts.Logger = test.NewTestLogger(t)
  74. _, httpAddr, nsqd := mustStartNSQD(opts)
  75. defer os.RemoveAll(opts.DataPath)
  76. defer nsqd.Exit()
  77. topicName := "test_http_mpub" + strconv.Itoa(int(time.Now().Unix()))
  78. topic := nsqd.GetTopic(topicName)
  79. msg := []byte("test message")
  80. msgs := make([][]byte, 4)
  81. for i := range msgs {
  82. msgs[i] = msg
  83. }
  84. buf := bytes.NewBuffer(bytes.Join(msgs, []byte("\n")))
  85. url := fmt.Sprintf("http://%s/mpub?topic=%s", httpAddr, topicName)
  86. resp, err := http.Post(url, "application/octet-stream", buf)
  87. test.Nil(t, err)
  88. defer resp.Body.Close()
  89. body, _ := ioutil.ReadAll(resp.Body)
  90. test.Equal(t, "OK", string(body))
  91. time.Sleep(5 * time.Millisecond)
  92. test.Equal(t, int64(4), topic.Depth())
  93. }
  94. func TestHTTPmpubEmpty(t *testing.T) {
  95. opts := NewOptions()
  96. opts.Logger = test.NewTestLogger(t)
  97. _, httpAddr, nsqd := mustStartNSQD(opts)
  98. defer os.RemoveAll(opts.DataPath)
  99. defer nsqd.Exit()
  100. topicName := "test_http_mpub_empty" + strconv.Itoa(int(time.Now().Unix()))
  101. topic := nsqd.GetTopic(topicName)
  102. msg := []byte("test message")
  103. msgs := make([][]byte, 4)
  104. for i := range msgs {
  105. msgs[i] = msg
  106. }
  107. buf := bytes.NewBuffer(bytes.Join(msgs, []byte("\n")))
  108. _, err := buf.Write([]byte("\n"))
  109. test.Nil(t, err)
  110. url := fmt.Sprintf("http://%s/mpub?topic=%s", httpAddr, topicName)
  111. resp, err := http.Post(url, "application/octet-stream", buf)
  112. test.Nil(t, err)
  113. defer resp.Body.Close()
  114. body, _ := ioutil.ReadAll(resp.Body)
  115. test.Equal(t, "OK", string(body))
  116. time.Sleep(5 * time.Millisecond)
  117. test.Equal(t, int64(4), topic.Depth())
  118. }
  119. func TestHTTPmpubBinary(t *testing.T) {
  120. opts := NewOptions()
  121. opts.Logger = test.NewTestLogger(t)
  122. _, httpAddr, nsqd := mustStartNSQD(opts)
  123. defer os.RemoveAll(opts.DataPath)
  124. defer nsqd.Exit()
  125. topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
  126. topic := nsqd.GetTopic(topicName)
  127. mpub := make([][]byte, 5)
  128. for i := range mpub {
  129. mpub[i] = make([]byte, 100)
  130. }
  131. cmd, _ := nsq.MultiPublish(topicName, mpub)
  132. buf := bytes.NewBuffer(cmd.Body)
  133. url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=true", httpAddr, topicName)
  134. resp, err := http.Post(url, "application/octet-stream", buf)
  135. test.Nil(t, err)
  136. defer resp.Body.Close()
  137. body, _ := ioutil.ReadAll(resp.Body)
  138. test.Equal(t, "OK", string(body))
  139. time.Sleep(5 * time.Millisecond)
  140. test.Equal(t, int64(5), topic.Depth())
  141. }
  142. func TestHTTPmpubForNonNormalizedBinaryParam(t *testing.T) {
  143. opts := NewOptions()
  144. opts.Logger = test.NewTestLogger(t)
  145. _, httpAddr, nsqd := mustStartNSQD(opts)
  146. defer os.RemoveAll(opts.DataPath)
  147. defer nsqd.Exit()
  148. topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
  149. topic := nsqd.GetTopic(topicName)
  150. mpub := make([][]byte, 5)
  151. for i := range mpub {
  152. mpub[i] = make([]byte, 100)
  153. }
  154. cmd, _ := nsq.MultiPublish(topicName, mpub)
  155. buf := bytes.NewBuffer(cmd.Body)
  156. url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=non_normalized_binary_param", httpAddr, topicName)
  157. resp, err := http.Post(url, "application/octet-stream", buf)
  158. test.Nil(t, err)
  159. defer resp.Body.Close()
  160. body, _ := ioutil.ReadAll(resp.Body)
  161. test.Equal(t, "OK", string(body))
  162. time.Sleep(5 * time.Millisecond)
  163. test.Equal(t, int64(5), topic.Depth())
  164. }
  165. func TestHTTPpubDefer(t *testing.T) {
  166. opts := NewOptions()
  167. opts.Logger = test.NewTestLogger(t)
  168. _, httpAddr, nsqd := mustStartNSQD(opts)
  169. defer os.RemoveAll(opts.DataPath)
  170. defer nsqd.Exit()
  171. topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix()))
  172. topic := nsqd.GetTopic(topicName)
  173. ch := topic.GetChannel("ch")
  174. buf := bytes.NewBuffer([]byte("test message"))
  175. url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000)
  176. resp, err := http.Post(url, "application/octet-stream", buf)
  177. test.Nil(t, err)
  178. defer resp.Body.Close()
  179. body, _ := ioutil.ReadAll(resp.Body)
  180. test.Equal(t, "OK", string(body))
  181. time.Sleep(5 * time.Millisecond)
  182. ch.deferredMutex.Lock()
  183. numDef := len(ch.deferredMessages)
  184. ch.deferredMutex.Unlock()
  185. test.Equal(t, 1, numDef)
  186. }
  187. func TestHTTPSRequire(t *testing.T) {
  188. opts := NewOptions()
  189. opts.Logger = test.NewTestLogger(t)
  190. opts.LogLevel = LOG_DEBUG
  191. opts.TLSCert = "./test/certs/server.pem"
  192. opts.TLSKey = "./test/certs/server.key"
  193. opts.TLSClientAuthPolicy = "require"
  194. _, httpAddr, nsqd := mustStartNSQD(opts)
  195. defer os.RemoveAll(opts.DataPath)
  196. defer nsqd.Exit()
  197. topicName := "test_http_pub_req" + strconv.Itoa(int(time.Now().Unix()))
  198. topic := nsqd.GetTopic(topicName)
  199. buf := bytes.NewBuffer([]byte("test message"))
  200. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  201. resp, err := http.Post(url, "application/octet-stream", buf)
  202. test.Equal(t, 403, resp.StatusCode)
  203. httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
  204. cert, err := tls.LoadX509KeyPair("./test/certs/cert.pem", "./test/certs/key.pem")
  205. test.Nil(t, err)
  206. tlsConfig := &tls.Config{
  207. Certificates: []tls.Certificate{cert},
  208. InsecureSkipVerify: true,
  209. MinVersion: 0,
  210. }
  211. transport := &http.Transport{
  212. TLSClientConfig: tlsConfig,
  213. }
  214. client := &http.Client{Transport: transport}
  215. buf = bytes.NewBuffer([]byte("test message"))
  216. url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
  217. resp, err = client.Post(url, "application/octet-stream", buf)
  218. test.Nil(t, err)
  219. defer resp.Body.Close()
  220. body, _ := ioutil.ReadAll(resp.Body)
  221. test.Equal(t, "OK", string(body))
  222. time.Sleep(5 * time.Millisecond)
  223. test.Equal(t, int64(1), topic.Depth())
  224. }
  225. func TestHTTPSRequireVerify(t *testing.T) {
  226. opts := NewOptions()
  227. opts.Logger = test.NewTestLogger(t)
  228. opts.LogLevel = LOG_DEBUG
  229. opts.TLSCert = "./test/certs/server.pem"
  230. opts.TLSKey = "./test/certs/server.key"
  231. opts.TLSRootCAFile = "./test/certs/ca.pem"
  232. opts.TLSClientAuthPolicy = "require-verify"
  233. _, httpAddr, nsqd := mustStartNSQD(opts)
  234. defer os.RemoveAll(opts.DataPath)
  235. defer nsqd.Exit()
  236. httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
  237. topicName := "test_http_pub_req_verf" + strconv.Itoa(int(time.Now().Unix()))
  238. topic := nsqd.GetTopic(topicName)
  239. // no cert
  240. buf := bytes.NewBuffer([]byte("test message"))
  241. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  242. resp, err := http.Post(url, "application/octet-stream", buf)
  243. test.Equal(t, 403, resp.StatusCode)
  244. // unsigned cert
  245. cert, err := tls.LoadX509KeyPair("./test/certs/cert.pem", "./test/certs/key.pem")
  246. test.Nil(t, err)
  247. tlsConfig := &tls.Config{
  248. Certificates: []tls.Certificate{cert},
  249. InsecureSkipVerify: true,
  250. }
  251. transport := &http.Transport{
  252. TLSClientConfig: tlsConfig,
  253. }
  254. client := &http.Client{Transport: transport}
  255. buf = bytes.NewBuffer([]byte("test message"))
  256. url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
  257. resp, err = client.Post(url, "application/octet-stream", buf)
  258. test.NotNil(t, err)
  259. // signed cert
  260. cert, err = tls.LoadX509KeyPair("./test/certs/client.pem", "./test/certs/client.key")
  261. test.Nil(t, err)
  262. tlsConfig = &tls.Config{
  263. Certificates: []tls.Certificate{cert},
  264. InsecureSkipVerify: true,
  265. }
  266. transport = &http.Transport{
  267. TLSClientConfig: tlsConfig,
  268. }
  269. client = &http.Client{Transport: transport}
  270. buf = bytes.NewBuffer([]byte("test message"))
  271. url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
  272. resp, err = client.Post(url, "application/octet-stream", buf)
  273. test.Nil(t, err)
  274. defer resp.Body.Close()
  275. body, _ := ioutil.ReadAll(resp.Body)
  276. test.Equal(t, "OK", string(body))
  277. time.Sleep(5 * time.Millisecond)
  278. test.Equal(t, int64(1), topic.Depth())
  279. }
  280. func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
  281. opts := NewOptions()
  282. opts.Logger = test.NewTestLogger(t)
  283. opts.LogLevel = LOG_DEBUG
  284. opts.TLSCert = "./test/certs/server.pem"
  285. opts.TLSKey = "./test/certs/server.key"
  286. opts.TLSRootCAFile = "./test/certs/ca.pem"
  287. opts.TLSClientAuthPolicy = "require-verify"
  288. opts.TLSRequired = TLSRequiredExceptHTTP
  289. _, httpAddr, nsqd := mustStartNSQD(opts)
  290. defer os.RemoveAll(opts.DataPath)
  291. defer nsqd.Exit()
  292. topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
  293. topic := nsqd.GetTopic(topicName)
  294. // no cert
  295. buf := bytes.NewBuffer([]byte("test message"))
  296. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  297. resp, err := http.Post(url, "application/octet-stream", buf)
  298. test.Nil(t, err)
  299. defer resp.Body.Close()
  300. body, _ := ioutil.ReadAll(resp.Body)
  301. test.Equal(t, "OK", string(body))
  302. time.Sleep(5 * time.Millisecond)
  303. test.Equal(t, int64(1), topic.Depth())
  304. }
  305. func TestHTTPV1TopicChannel(t *testing.T) {
  306. opts := NewOptions()
  307. opts.Logger = test.NewTestLogger(t)
  308. _, httpAddr, nsqd := mustStartNSQD(opts)
  309. defer os.RemoveAll(opts.DataPath)
  310. defer nsqd.Exit()
  311. topicName := "test_http_topic_channel2" + strconv.Itoa(int(time.Now().Unix()))
  312. channelName := "ch2"
  313. url := fmt.Sprintf("http://%s/topic/create?topic=%s", httpAddr, topicName)
  314. resp, err := http.Post(url, "application/json", nil)
  315. test.Nil(t, err)
  316. test.Equal(t, 200, resp.StatusCode)
  317. body, _ := ioutil.ReadAll(resp.Body)
  318. resp.Body.Close()
  319. test.Equal(t, "", string(body))
  320. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  321. url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", httpAddr, topicName, channelName)
  322. resp, err = http.Post(url, "application/json", nil)
  323. test.Nil(t, err)
  324. test.Equal(t, 200, resp.StatusCode)
  325. body, _ = ioutil.ReadAll(resp.Body)
  326. resp.Body.Close()
  327. test.Equal(t, "", string(body))
  328. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  329. topic, err := nsqd.GetExistingTopic(topicName)
  330. test.Nil(t, err)
  331. test.NotNil(t, topic)
  332. channel, err := topic.GetExistingChannel(channelName)
  333. test.Nil(t, err)
  334. test.NotNil(t, channel)
  335. em := ErrMessage{}
  336. url = fmt.Sprintf("http://%s/topic/pause", httpAddr)
  337. resp, err = http.Post(url, "application/json", nil)
  338. test.Nil(t, err)
  339. test.Equal(t, 400, resp.StatusCode)
  340. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  341. body, _ = ioutil.ReadAll(resp.Body)
  342. resp.Body.Close()
  343. t.Logf("%s", body)
  344. err = json.Unmarshal(body, &em)
  345. test.Nil(t, err)
  346. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  347. url = fmt.Sprintf("http://%s/topic/pause?topic=%s", httpAddr, topicName+"abc")
  348. resp, err = http.Post(url, "application/json", nil)
  349. test.Nil(t, err)
  350. test.Equal(t, 404, resp.StatusCode)
  351. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  352. body, _ = ioutil.ReadAll(resp.Body)
  353. resp.Body.Close()
  354. t.Logf("%s", body)
  355. err = json.Unmarshal(body, &em)
  356. test.Nil(t, err)
  357. test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
  358. url = fmt.Sprintf("http://%s/topic/pause?topic=%s", httpAddr, topicName)
  359. resp, err = http.Post(url, "application/json", nil)
  360. test.Nil(t, err)
  361. test.Equal(t, 200, resp.StatusCode)
  362. body, _ = ioutil.ReadAll(resp.Body)
  363. resp.Body.Close()
  364. test.Equal(t, "", string(body))
  365. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  366. test.Equal(t, true, topic.IsPaused())
  367. url = fmt.Sprintf("http://%s/topic/unpause?topic=%s", httpAddr, topicName)
  368. resp, err = http.Post(url, "application/json", nil)
  369. test.Nil(t, err)
  370. test.Equal(t, 200, resp.StatusCode)
  371. body, _ = ioutil.ReadAll(resp.Body)
  372. resp.Body.Close()
  373. test.Equal(t, "", string(body))
  374. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  375. test.Equal(t, false, topic.IsPaused())
  376. url = fmt.Sprintf("http://%s/channel/pause?topic=%s&channel=%s", httpAddr, topicName, channelName)
  377. resp, err = http.Post(url, "application/json", nil)
  378. test.Nil(t, err)
  379. test.Equal(t, 200, resp.StatusCode)
  380. body, _ = ioutil.ReadAll(resp.Body)
  381. resp.Body.Close()
  382. test.Equal(t, "", string(body))
  383. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  384. test.Equal(t, true, channel.IsPaused())
  385. url = fmt.Sprintf("http://%s/channel/unpause?topic=%s&channel=%s", httpAddr, topicName, channelName)
  386. resp, err = http.Post(url, "application/json", nil)
  387. test.Nil(t, err)
  388. test.Equal(t, 200, resp.StatusCode)
  389. body, _ = ioutil.ReadAll(resp.Body)
  390. resp.Body.Close()
  391. test.Equal(t, "", string(body))
  392. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  393. test.Equal(t, false, channel.IsPaused())
  394. url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", httpAddr, topicName, channelName)
  395. resp, err = http.Post(url, "application/json", nil)
  396. test.Nil(t, err)
  397. test.Equal(t, 200, resp.StatusCode)
  398. body, _ = ioutil.ReadAll(resp.Body)
  399. resp.Body.Close()
  400. test.Equal(t, "", string(body))
  401. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  402. _, err = topic.GetExistingChannel(channelName)
  403. test.NotNil(t, err)
  404. url = fmt.Sprintf("http://%s/topic/delete?topic=%s", httpAddr, topicName)
  405. resp, err = http.Post(url, "application/json", nil)
  406. test.Nil(t, err)
  407. test.Equal(t, 200, resp.StatusCode)
  408. body, _ = ioutil.ReadAll(resp.Body)
  409. resp.Body.Close()
  410. test.Equal(t, "", string(body))
  411. test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
  412. _, err = nsqd.GetExistingTopic(topicName)
  413. test.NotNil(t, err)
  414. }
  415. func TestHTTPClientStats(t *testing.T) {
  416. topicName := "test_http_client_stats" + strconv.Itoa(int(time.Now().Unix()))
  417. opts := NewOptions()
  418. opts.Logger = test.NewTestLogger(t)
  419. tcpAddr, httpAddr, nsqd := mustStartNSQD(opts)
  420. defer os.RemoveAll(opts.DataPath)
  421. defer nsqd.Exit()
  422. conn, err := mustConnectNSQD(tcpAddr)
  423. test.Nil(t, err)
  424. defer conn.Close()
  425. identify(t, conn, nil, frameTypeResponse)
  426. sub(t, conn, topicName, "ch")
  427. var d struct {
  428. Topics []struct {
  429. Channels []struct {
  430. ClientCount int `json:"client_count"`
  431. Clients []struct {
  432. } `json:"clients"`
  433. } `json:"channels"`
  434. } `json:"topics"`
  435. Memory *struct{} `json:"memory,omitempty"`
  436. }
  437. endpoint := fmt.Sprintf("http://127.0.0.1:%d/stats?format=json", httpAddr.Port)
  438. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  439. test.Nil(t, err)
  440. test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
  441. test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
  442. test.NotNil(t, d.Memory)
  443. endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=true", httpAddr.Port)
  444. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  445. test.Nil(t, err)
  446. test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
  447. test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
  448. endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=false", httpAddr.Port)
  449. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  450. test.Nil(t, err)
  451. test.Equal(t, 0, len(d.Topics[0].Channels[0].Clients))
  452. test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
  453. endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=true", httpAddr.Port)
  454. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  455. test.Nil(t, err)
  456. test.NotNil(t, d.Memory)
  457. d.Memory = nil
  458. endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=false", httpAddr.Port)
  459. err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
  460. test.Nil(t, err)
  461. test.Nil(t, d.Memory)
  462. }
  463. func TestHTTPgetStatusJSON(t *testing.T) {
  464. testTime := time.Now()
  465. opts := NewOptions()
  466. opts.Logger = test.NewTestLogger(t)
  467. _, httpAddr, nsqd := mustStartNSQD(opts)
  468. defer os.RemoveAll(opts.DataPath)
  469. defer nsqd.Exit()
  470. nsqd.startTime = testTime
  471. expectedJSON := fmt.Sprintf(`{"version":"%v","health":"OK","start_time":%v,"topics":[],"memory":{`, version.Binary, testTime.Unix())
  472. url := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
  473. resp, err := http.Get(url)
  474. test.Nil(t, err)
  475. defer resp.Body.Close()
  476. body, _ := ioutil.ReadAll(resp.Body)
  477. test.Equal(t, 200, resp.StatusCode)
  478. test.Equal(t, true, strings.HasPrefix(string(body), expectedJSON))
  479. }
  480. func TestHTTPgetStatusText(t *testing.T) {
  481. testTime := time.Now()
  482. opts := NewOptions()
  483. opts.Logger = test.NewTestLogger(t)
  484. _, httpAddr, nsqd := mustStartNSQD(opts)
  485. defer os.RemoveAll(opts.DataPath)
  486. defer nsqd.Exit()
  487. nsqd.startTime = testTime
  488. url := fmt.Sprintf("http://%s/stats?format=text", httpAddr)
  489. resp, err := http.Get(url)
  490. test.Nil(t, err)
  491. defer resp.Body.Close()
  492. body, _ := ioutil.ReadAll(resp.Body)
  493. test.Equal(t, 200, resp.StatusCode)
  494. test.NotNil(t, body)
  495. }
  496. func TestHTTPconfig(t *testing.T) {
  497. lopts := nsqlookupd.NewOptions()
  498. lopts.Logger = test.NewTestLogger(t)
  499. lopts1 := *lopts
  500. _, _, lookupd1 := mustStartNSQLookupd(&lopts1)
  501. defer lookupd1.Exit()
  502. lopts2 := *lopts
  503. _, _, lookupd2 := mustStartNSQLookupd(&lopts2)
  504. defer lookupd2.Exit()
  505. opts := NewOptions()
  506. opts.Logger = test.NewTestLogger(t)
  507. _, httpAddr, nsqd := mustStartNSQD(opts)
  508. defer os.RemoveAll(opts.DataPath)
  509. defer nsqd.Exit()
  510. url := fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
  511. resp, err := http.Get(url)
  512. test.Nil(t, err)
  513. defer resp.Body.Close()
  514. body, _ := ioutil.ReadAll(resp.Body)
  515. test.Equal(t, 200, resp.StatusCode)
  516. test.Equal(t, "[]", string(body))
  517. client := http.Client{}
  518. addrs := fmt.Sprintf(`["%s","%s"]`, lookupd1.RealTCPAddr().String(), lookupd2.RealTCPAddr().String())
  519. url = fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
  520. req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(addrs)))
  521. test.Nil(t, err)
  522. resp, err = client.Do(req)
  523. test.Nil(t, err)
  524. defer resp.Body.Close()
  525. body, _ = ioutil.ReadAll(resp.Body)
  526. test.Equal(t, 200, resp.StatusCode)
  527. test.Equal(t, addrs, string(body))
  528. url = fmt.Sprintf("http://%s/config/log_level", httpAddr)
  529. req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`fatal`)))
  530. test.Nil(t, err)
  531. resp, err = client.Do(req)
  532. test.Nil(t, err)
  533. defer resp.Body.Close()
  534. body, _ = ioutil.ReadAll(resp.Body)
  535. test.Equal(t, 200, resp.StatusCode)
  536. test.Equal(t, LOG_FATAL, nsqd.getOpts().LogLevel)
  537. url = fmt.Sprintf("http://%s/config/log_level", httpAddr)
  538. req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`)))
  539. test.Nil(t, err)
  540. resp, err = client.Do(req)
  541. test.Nil(t, err)
  542. defer resp.Body.Close()
  543. body, _ = ioutil.ReadAll(resp.Body)
  544. test.Equal(t, 400, resp.StatusCode)
  545. }
  546. func TestHTTPerrors(t *testing.T) {
  547. opts := NewOptions()
  548. opts.Logger = test.NewTestLogger(t)
  549. _, httpAddr, nsqd := mustStartNSQD(opts)
  550. defer os.RemoveAll(opts.DataPath)
  551. defer nsqd.Exit()
  552. url := fmt.Sprintf("http://%s/stats", httpAddr)
  553. resp, err := http.Post(url, "text/plain", nil)
  554. test.Nil(t, err)
  555. defer resp.Body.Close()
  556. body, _ := ioutil.ReadAll(resp.Body)
  557. test.Equal(t, 405, resp.StatusCode)
  558. test.Equal(t, `{"message":"METHOD_NOT_ALLOWED"}`, string(body))
  559. url = fmt.Sprintf("http://%s/not_found", httpAddr)
  560. resp, err = http.Get(url)
  561. test.Nil(t, err)
  562. defer resp.Body.Close()
  563. body, _ = ioutil.ReadAll(resp.Body)
  564. test.Equal(t, 404, resp.StatusCode)
  565. test.Equal(t, `{"message":"NOT_FOUND"}`, string(body))
  566. }
  567. func TestDeleteTopic(t *testing.T) {
  568. opts := NewOptions()
  569. opts.Logger = test.NewTestLogger(t)
  570. _, httpAddr, nsqd := mustStartNSQD(opts)
  571. defer os.RemoveAll(opts.DataPath)
  572. defer nsqd.Exit()
  573. em := ErrMessage{}
  574. url := fmt.Sprintf("http://%s/topic/delete", httpAddr)
  575. resp, err := http.Post(url, "application/json", nil)
  576. test.Nil(t, err)
  577. test.Equal(t, 400, resp.StatusCode)
  578. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  579. body, _ := ioutil.ReadAll(resp.Body)
  580. resp.Body.Close()
  581. t.Logf("%s", body)
  582. err = json.Unmarshal(body, &em)
  583. test.Nil(t, err)
  584. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  585. topicName := "test_http_delete_topic" + strconv.Itoa(int(time.Now().Unix()))
  586. url = fmt.Sprintf("http://%s/topic/delete?topic=%s", httpAddr, topicName)
  587. resp, err = http.Post(url, "application/json", nil)
  588. test.Nil(t, err)
  589. test.Equal(t, 404, resp.StatusCode)
  590. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  591. body, _ = ioutil.ReadAll(resp.Body)
  592. resp.Body.Close()
  593. t.Logf("%s", body)
  594. err = json.Unmarshal(body, &em)
  595. test.Nil(t, err)
  596. test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
  597. nsqd.GetTopic(topicName)
  598. resp, err = http.Post(url, "application/json", nil)
  599. test.Nil(t, err)
  600. test.Equal(t, 200, resp.StatusCode)
  601. body, _ = ioutil.ReadAll(resp.Body)
  602. resp.Body.Close()
  603. t.Logf("%s", body)
  604. test.Equal(t, []byte(""), body)
  605. }
  606. func TestEmptyTopic(t *testing.T) {
  607. opts := NewOptions()
  608. opts.Logger = test.NewTestLogger(t)
  609. _, httpAddr, nsqd := mustStartNSQD(opts)
  610. defer os.RemoveAll(opts.DataPath)
  611. defer nsqd.Exit()
  612. em := ErrMessage{}
  613. url := fmt.Sprintf("http://%s/topic/empty", httpAddr)
  614. resp, err := http.Post(url, "application/json", nil)
  615. test.Nil(t, err)
  616. test.Equal(t, 400, resp.StatusCode)
  617. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  618. body, _ := ioutil.ReadAll(resp.Body)
  619. resp.Body.Close()
  620. t.Logf("%s", body)
  621. err = json.Unmarshal(body, &em)
  622. test.Nil(t, err)
  623. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  624. topicName := "test_http_empty_topic" + strconv.Itoa(int(time.Now().Unix()))
  625. url = fmt.Sprintf("http://%s/topic/empty?topic=%s", httpAddr, topicName+"$")
  626. resp, err = http.Post(url, "application/json", nil)
  627. test.Nil(t, err)
  628. test.Equal(t, 400, resp.StatusCode)
  629. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  630. body, _ = ioutil.ReadAll(resp.Body)
  631. resp.Body.Close()
  632. t.Logf("%s", body)
  633. err = json.Unmarshal(body, &em)
  634. test.Nil(t, err)
  635. test.Equal(t, "INVALID_TOPIC", em.Message)
  636. url = fmt.Sprintf("http://%s/topic/empty?topic=%s", httpAddr, topicName)
  637. resp, err = http.Post(url, "application/json", nil)
  638. test.Nil(t, err)
  639. test.Equal(t, 404, resp.StatusCode)
  640. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  641. body, _ = ioutil.ReadAll(resp.Body)
  642. resp.Body.Close()
  643. t.Logf("%s", body)
  644. err = json.Unmarshal(body, &em)
  645. test.Nil(t, err)
  646. test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
  647. nsqd.GetTopic(topicName)
  648. resp, err = http.Post(url, "application/json", nil)
  649. test.Nil(t, err)
  650. test.Equal(t, 200, resp.StatusCode)
  651. body, _ = ioutil.ReadAll(resp.Body)
  652. resp.Body.Close()
  653. t.Logf("%s", body)
  654. test.Equal(t, []byte(""), body)
  655. }
  656. func TestEmptyChannel(t *testing.T) {
  657. opts := NewOptions()
  658. opts.Logger = test.NewTestLogger(t)
  659. _, httpAddr, nsqd := mustStartNSQD(opts)
  660. defer os.RemoveAll(opts.DataPath)
  661. defer nsqd.Exit()
  662. em := ErrMessage{}
  663. url := fmt.Sprintf("http://%s/channel/empty", httpAddr)
  664. resp, err := http.Post(url, "application/json", nil)
  665. test.Nil(t, err)
  666. test.Equal(t, 400, resp.StatusCode)
  667. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  668. body, _ := ioutil.ReadAll(resp.Body)
  669. resp.Body.Close()
  670. t.Logf("%s", body)
  671. err = json.Unmarshal(body, &em)
  672. test.Nil(t, err)
  673. test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
  674. topicName := "test_http_empty_channel" + strconv.Itoa(int(time.Now().Unix()))
  675. url = fmt.Sprintf("http://%s/channel/empty?topic=%s", httpAddr, topicName)
  676. resp, err = http.Post(url, "application/json", nil)
  677. test.Nil(t, err)
  678. test.Equal(t, 400, resp.StatusCode)
  679. test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
  680. body, _ = ioutil.ReadAll(resp.Body)
  681. resp.Body.Close()
  682. t.Logf("%s", body)
  683. err = json.Unmarshal(body, &em)
  684. test.Nil(t, err)
  685. test.Equal(t, "MISSING_ARG_CHANNEL", em.Message)
  686. channelName := "ch"
  687. url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
  688. resp, err = http.Post(url, "application/json", nil)
  689. test.Nil(t, err)
  690. test.Equal(t, 404, resp.StatusCode)
  691. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  692. body, _ = ioutil.ReadAll(resp.Body)
  693. resp.Body.Close()
  694. t.Logf("%s", body)
  695. err = json.Unmarshal(body, &em)
  696. test.Nil(t, err)
  697. test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
  698. topic := nsqd.GetTopic(topicName)
  699. url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
  700. resp, err = http.Post(url, "application/json", nil)
  701. test.Nil(t, err)
  702. test.Equal(t, 404, resp.StatusCode)
  703. test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
  704. body, _ = ioutil.ReadAll(resp.Body)
  705. resp.Body.Close()
  706. t.Logf("%s", body)
  707. err = json.Unmarshal(body, &em)
  708. test.Nil(t, err)
  709. test.Equal(t, "CHANNEL_NOT_FOUND", em.Message)
  710. topic.GetChannel(channelName)
  711. resp, err = http.Post(url, "application/json", nil)
  712. test.Nil(t, err)
  713. test.Equal(t, 200, resp.StatusCode)
  714. body, _ = ioutil.ReadAll(resp.Body)
  715. resp.Body.Close()
  716. t.Logf("%s", body)
  717. test.Equal(t, []byte(""), body)
  718. }
  719. func TestInfo(t *testing.T) {
  720. opts := NewOptions()
  721. opts.Logger = test.NewTestLogger(t)
  722. _, httpAddr, nsqd := mustStartNSQD(opts)
  723. defer os.RemoveAll(opts.DataPath)
  724. defer nsqd.Exit()
  725. info := InfoDoc{}
  726. url := fmt.Sprintf("http://%s/info", httpAddr)
  727. resp, err := http.Get(url)
  728. test.Nil(t, err)
  729. test.Equal(t, 200, resp.StatusCode)
  730. body, _ := ioutil.ReadAll(resp.Body)
  731. resp.Body.Close()
  732. t.Logf("%s", body)
  733. err = json.Unmarshal(body, &info)
  734. test.Nil(t, err)
  735. test.Equal(t, version.Binary, info.Version)
  736. }
  737. func BenchmarkHTTPpub(b *testing.B) {
  738. var wg sync.WaitGroup
  739. b.StopTimer()
  740. opts := NewOptions()
  741. opts.Logger = test.NewTestLogger(b)
  742. opts.MemQueueSize = int64(b.N)
  743. _, httpAddr, nsqd := mustStartNSQD(opts)
  744. defer os.RemoveAll(opts.DataPath)
  745. msg := make([]byte, 256)
  746. topicName := "bench_http_pub" + strconv.Itoa(int(time.Now().Unix()))
  747. url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
  748. client := &http.Client{}
  749. b.SetBytes(int64(len(msg)))
  750. b.StartTimer()
  751. for j := 0; j < runtime.GOMAXPROCS(0); j++ {
  752. wg.Add(1)
  753. go func() {
  754. num := b.N / runtime.GOMAXPROCS(0)
  755. for i := 0; i < num; i++ {
  756. buf := bytes.NewBuffer(msg)
  757. req, _ := http.NewRequest("POST", url, buf)
  758. resp, err := client.Do(req)
  759. if err != nil {
  760. panic(err.Error())
  761. }
  762. body, _ := ioutil.ReadAll(resp.Body)
  763. if !bytes.Equal(body, []byte("OK")) {
  764. panic("bad response")
  765. }
  766. resp.Body.Close()
  767. }
  768. wg.Done()
  769. }()
  770. }
  771. wg.Wait()
  772. b.StopTimer()
  773. nsqd.Exit()
  774. }