123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939 |
- package nsqd
- import (
- "bytes"
- "crypto/tls"
- "encoding/json"
- "fmt"
- "io/ioutil"
- "net"
- "net/http"
- "os"
- "runtime"
- "strconv"
- "sync"
- "testing"
- "time"
- "strings"
- "github.com/nsqio/go-nsq"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/test"
- "github.com/nsqio/nsq/internal/version"
- "github.com/nsqio/nsq/nsqlookupd"
- )
- type ErrMessage struct {
- Message string `json:"message"`
- }
- type InfoDoc struct {
- Version string `json:"version"`
- BroadcastAddress string `json:"broadcast_address"`
- Hostname string `json:"hostname"`
- HTTPPort int `json:"http_port"`
- TCPPort int `json:"tcp_port"`
- StartTime int64 `json:"start_time"`
- }
- func TestHTTPpub(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_pub" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- buf := bytes.NewBuffer([]byte("test message"))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(1), topic.Depth())
- }
- func TestHTTPpubEmpty(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_pub_empty" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- buf := bytes.NewBuffer([]byte(""))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, `{"message":"MSG_EMPTY"}`, string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(0), topic.Depth())
- }
- func TestHTTPmpub(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_mpub" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- msg := []byte("test message")
- msgs := make([][]byte, 4)
- for i := range msgs {
- msgs[i] = msg
- }
- buf := bytes.NewBuffer(bytes.Join(msgs, []byte("\n")))
- url := fmt.Sprintf("http://%s/mpub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(4), topic.Depth())
- }
- func TestHTTPmpubEmpty(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_mpub_empty" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- msg := []byte("test message")
- msgs := make([][]byte, 4)
- for i := range msgs {
- msgs[i] = msg
- }
- buf := bytes.NewBuffer(bytes.Join(msgs, []byte("\n")))
- _, err := buf.Write([]byte("\n"))
- test.Nil(t, err)
- url := fmt.Sprintf("http://%s/mpub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(4), topic.Depth())
- }
- func TestHTTPmpubBinary(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- mpub := make([][]byte, 5)
- for i := range mpub {
- mpub[i] = make([]byte, 100)
- }
- cmd, _ := nsq.MultiPublish(topicName, mpub)
- buf := bytes.NewBuffer(cmd.Body)
- url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=true", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(5), topic.Depth())
- }
- func TestHTTPmpubForNonNormalizedBinaryParam(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_mpub_bin" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- mpub := make([][]byte, 5)
- for i := range mpub {
- mpub[i] = make([]byte, 100)
- }
- cmd, _ := nsq.MultiPublish(topicName, mpub)
- buf := bytes.NewBuffer(cmd.Body)
- url := fmt.Sprintf("http://%s/mpub?topic=%s&binary=non_normalized_binary_param", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(5), topic.Depth())
- }
- func TestHTTPpubDefer(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_pub_defer" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- ch := topic.GetChannel("ch")
- buf := bytes.NewBuffer([]byte("test message"))
- url := fmt.Sprintf("http://%s/pub?topic=%s&defer=%d", httpAddr, topicName, 1000)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- ch.deferredMutex.Lock()
- numDef := len(ch.deferredMessages)
- ch.deferredMutex.Unlock()
- test.Equal(t, 1, numDef)
- }
- func TestHTTPSRequire(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.LogLevel = LOG_DEBUG
- opts.TLSCert = "./test/certs/server.pem"
- opts.TLSKey = "./test/certs/server.key"
- opts.TLSClientAuthPolicy = "require"
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_pub_req" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- buf := bytes.NewBuffer([]byte("test message"))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Equal(t, 403, resp.StatusCode)
- httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
- cert, err := tls.LoadX509KeyPair("./test/certs/cert.pem", "./test/certs/key.pem")
- test.Nil(t, err)
- tlsConfig := &tls.Config{
- Certificates: []tls.Certificate{cert},
- InsecureSkipVerify: true,
- MinVersion: 0,
- }
- transport := &http.Transport{
- TLSClientConfig: tlsConfig,
- }
- client := &http.Client{Transport: transport}
- buf = bytes.NewBuffer([]byte("test message"))
- url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
- resp, err = client.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(1), topic.Depth())
- }
- func TestHTTPSRequireVerify(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.LogLevel = LOG_DEBUG
- opts.TLSCert = "./test/certs/server.pem"
- opts.TLSKey = "./test/certs/server.key"
- opts.TLSRootCAFile = "./test/certs/ca.pem"
- opts.TLSClientAuthPolicy = "require-verify"
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
- topicName := "test_http_pub_req_verf" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- // no cert
- buf := bytes.NewBuffer([]byte("test message"))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Equal(t, 403, resp.StatusCode)
- // unsigned cert
- cert, err := tls.LoadX509KeyPair("./test/certs/cert.pem", "./test/certs/key.pem")
- test.Nil(t, err)
- tlsConfig := &tls.Config{
- Certificates: []tls.Certificate{cert},
- InsecureSkipVerify: true,
- }
- transport := &http.Transport{
- TLSClientConfig: tlsConfig,
- }
- client := &http.Client{Transport: transport}
- buf = bytes.NewBuffer([]byte("test message"))
- url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
- resp, err = client.Post(url, "application/octet-stream", buf)
- test.NotNil(t, err)
- // signed cert
- cert, err = tls.LoadX509KeyPair("./test/certs/client.pem", "./test/certs/client.key")
- test.Nil(t, err)
- tlsConfig = &tls.Config{
- Certificates: []tls.Certificate{cert},
- InsecureSkipVerify: true,
- }
- transport = &http.Transport{
- TLSClientConfig: tlsConfig,
- }
- client = &http.Client{Transport: transport}
- buf = bytes.NewBuffer([]byte("test message"))
- url = fmt.Sprintf("https://%s/pub?topic=%s", httpsAddr, topicName)
- resp, err = client.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(1), topic.Depth())
- }
- func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.LogLevel = LOG_DEBUG
- opts.TLSCert = "./test/certs/server.pem"
- opts.TLSKey = "./test/certs/server.key"
- opts.TLSRootCAFile = "./test/certs/ca.pem"
- opts.TLSClientAuthPolicy = "require-verify"
- opts.TLSRequired = TLSRequiredExceptHTTP
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- // no cert
- buf := bytes.NewBuffer([]byte("test message"))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/octet-stream", buf)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, "OK", string(body))
- time.Sleep(5 * time.Millisecond)
- test.Equal(t, int64(1), topic.Depth())
- }
- func TestHTTPV1TopicChannel(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_http_topic_channel2" + strconv.Itoa(int(time.Now().Unix()))
- channelName := "ch2"
- url := fmt.Sprintf("http://%s/topic/create?topic=%s", httpAddr, topicName)
- resp, err := http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- url = fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- topic, err := nsqd.GetExistingTopic(topicName)
- test.Nil(t, err)
- test.NotNil(t, topic)
- channel, err := topic.GetExistingChannel(channelName)
- test.Nil(t, err)
- test.NotNil(t, channel)
- em := ErrMessage{}
- url = fmt.Sprintf("http://%s/topic/pause", httpAddr)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
- url = fmt.Sprintf("http://%s/topic/pause?topic=%s", httpAddr, topicName+"abc")
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
- url = fmt.Sprintf("http://%s/topic/pause?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- test.Equal(t, true, topic.IsPaused())
- url = fmt.Sprintf("http://%s/topic/unpause?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- test.Equal(t, false, topic.IsPaused())
- url = fmt.Sprintf("http://%s/channel/pause?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- test.Equal(t, true, channel.IsPaused())
- url = fmt.Sprintf("http://%s/channel/unpause?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- test.Equal(t, false, channel.IsPaused())
- url = fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- _, err = topic.GetExistingChannel(channelName)
- test.NotNil(t, err)
- url = fmt.Sprintf("http://%s/topic/delete?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "", string(body))
- test.Equal(t, "smq; version=1.0", resp.Header.Get("X-SMQ-Content-Type"))
- _, err = nsqd.GetExistingTopic(topicName)
- test.NotNil(t, err)
- }
- func TestHTTPClientStats(t *testing.T) {
- topicName := "test_http_client_stats" + strconv.Itoa(int(time.Now().Unix()))
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- tcpAddr, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- conn, err := mustConnectNSQD(tcpAddr)
- test.Nil(t, err)
- defer conn.Close()
- identify(t, conn, nil, frameTypeResponse)
- sub(t, conn, topicName, "ch")
- var d struct {
- Topics []struct {
- Channels []struct {
- ClientCount int `json:"client_count"`
- Clients []struct {
- } `json:"clients"`
- } `json:"channels"`
- } `json:"topics"`
- Memory *struct{} `json:"memory,omitempty"`
- }
- endpoint := fmt.Sprintf("http://127.0.0.1:%d/stats?format=json", httpAddr.Port)
- err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
- test.Nil(t, err)
- test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
- test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
- test.NotNil(t, d.Memory)
- endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=true", httpAddr.Port)
- err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
- test.Nil(t, err)
- test.Equal(t, 1, len(d.Topics[0].Channels[0].Clients))
- test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
- endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_clients=false", httpAddr.Port)
- err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
- test.Nil(t, err)
- test.Equal(t, 0, len(d.Topics[0].Channels[0].Clients))
- test.Equal(t, 1, d.Topics[0].Channels[0].ClientCount)
- endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=true", httpAddr.Port)
- err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
- test.Nil(t, err)
- test.NotNil(t, d.Memory)
- d.Memory = nil
- endpoint = fmt.Sprintf("http://127.0.0.1:%d/stats?format=json&include_mem=false", httpAddr.Port)
- err = http_api.NewClient(nil, ConnectTimeout, RequestTimeout).GETV1(endpoint, &d)
- test.Nil(t, err)
- test.Nil(t, d.Memory)
- }
- func TestHTTPgetStatusJSON(t *testing.T) {
- testTime := time.Now()
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- nsqd.startTime = testTime
- expectedJSON := fmt.Sprintf(`{"version":"%v","health":"OK","start_time":%v,"topics":[],"memory":{`, version.Binary, testTime.Unix())
- url := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, 200, resp.StatusCode)
- test.Equal(t, true, strings.HasPrefix(string(body), expectedJSON))
- }
- func TestHTTPgetStatusText(t *testing.T) {
- testTime := time.Now()
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- nsqd.startTime = testTime
- url := fmt.Sprintf("http://%s/stats?format=text", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, 200, resp.StatusCode)
- test.NotNil(t, body)
- }
- func TestHTTPconfig(t *testing.T) {
- lopts := nsqlookupd.NewOptions()
- lopts.Logger = test.NewTestLogger(t)
- lopts1 := *lopts
- _, _, lookupd1 := mustStartNSQLookupd(&lopts1)
- defer lookupd1.Exit()
- lopts2 := *lopts
- _, _, lookupd2 := mustStartNSQLookupd(&lopts2)
- defer lookupd2.Exit()
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- url := fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, 200, resp.StatusCode)
- test.Equal(t, "[]", string(body))
- client := http.Client{}
- addrs := fmt.Sprintf(`["%s","%s"]`, lookupd1.RealTCPAddr().String(), lookupd2.RealTCPAddr().String())
- url = fmt.Sprintf("http://%s/config/nsqlookupd_tcp_addresses", httpAddr)
- req, err := http.NewRequest("PUT", url, bytes.NewBuffer([]byte(addrs)))
- test.Nil(t, err)
- resp, err = client.Do(req)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ = ioutil.ReadAll(resp.Body)
- test.Equal(t, 200, resp.StatusCode)
- test.Equal(t, addrs, string(body))
- url = fmt.Sprintf("http://%s/config/log_level", httpAddr)
- req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`fatal`)))
- test.Nil(t, err)
- resp, err = client.Do(req)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ = ioutil.ReadAll(resp.Body)
- test.Equal(t, 200, resp.StatusCode)
- test.Equal(t, LOG_FATAL, nsqd.getOpts().LogLevel)
- url = fmt.Sprintf("http://%s/config/log_level", httpAddr)
- req, err = http.NewRequest("PUT", url, bytes.NewBuffer([]byte(`bad`)))
- test.Nil(t, err)
- resp, err = client.Do(req)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ = ioutil.ReadAll(resp.Body)
- test.Equal(t, 400, resp.StatusCode)
- }
- func TestHTTPerrors(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- url := fmt.Sprintf("http://%s/stats", httpAddr)
- resp, err := http.Post(url, "text/plain", nil)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
- test.Equal(t, 405, resp.StatusCode)
- test.Equal(t, `{"message":"METHOD_NOT_ALLOWED"}`, string(body))
- url = fmt.Sprintf("http://%s/not_found", httpAddr)
- resp, err = http.Get(url)
- test.Nil(t, err)
- defer resp.Body.Close()
- body, _ = ioutil.ReadAll(resp.Body)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, `{"message":"NOT_FOUND"}`, string(body))
- }
- func TestDeleteTopic(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- em := ErrMessage{}
- url := fmt.Sprintf("http://%s/topic/delete", httpAddr)
- resp, err := http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
- topicName := "test_http_delete_topic" + strconv.Itoa(int(time.Now().Unix()))
- url = fmt.Sprintf("http://%s/topic/delete?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
- nsqd.GetTopic(topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- test.Equal(t, []byte(""), body)
- }
- func TestEmptyTopic(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- em := ErrMessage{}
- url := fmt.Sprintf("http://%s/topic/empty", httpAddr)
- resp, err := http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
- topicName := "test_http_empty_topic" + strconv.Itoa(int(time.Now().Unix()))
- url = fmt.Sprintf("http://%s/topic/empty?topic=%s", httpAddr, topicName+"$")
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "INVALID_TOPIC", em.Message)
- url = fmt.Sprintf("http://%s/topic/empty?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
- nsqd.GetTopic(topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- test.Equal(t, []byte(""), body)
- }
- func TestEmptyChannel(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- em := ErrMessage{}
- url := fmt.Sprintf("http://%s/channel/empty", httpAddr)
- resp, err := http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "MISSING_ARG_TOPIC", em.Message)
- topicName := "test_http_empty_channel" + strconv.Itoa(int(time.Now().Unix()))
- url = fmt.Sprintf("http://%s/channel/empty?topic=%s", httpAddr, topicName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 400, resp.StatusCode)
- test.Equal(t, "Bad Request", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "MISSING_ARG_CHANNEL", em.Message)
- channelName := "ch"
- url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "TOPIC_NOT_FOUND", em.Message)
- topic := nsqd.GetTopic(topicName)
- url = fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", httpAddr, topicName, channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 404, resp.StatusCode)
- test.Equal(t, "Not Found", http.StatusText(resp.StatusCode))
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &em)
- test.Nil(t, err)
- test.Equal(t, "CHANNEL_NOT_FOUND", em.Message)
- topic.GetChannel(channelName)
- resp, err = http.Post(url, "application/json", nil)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- test.Equal(t, []byte(""), body)
- }
- func TestInfo(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- info := InfoDoc{}
- url := fmt.Sprintf("http://%s/info", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- t.Logf("%s", body)
- err = json.Unmarshal(body, &info)
- test.Nil(t, err)
- test.Equal(t, version.Binary, info.Version)
- }
- func BenchmarkHTTPpub(b *testing.B) {
- var wg sync.WaitGroup
- b.StopTimer()
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(b)
- opts.MemQueueSize = int64(b.N)
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- msg := make([]byte, 256)
- topicName := "bench_http_pub" + strconv.Itoa(int(time.Now().Unix()))
- url := fmt.Sprintf("http://%s/pub?topic=%s", httpAddr, topicName)
- client := &http.Client{}
- b.SetBytes(int64(len(msg)))
- b.StartTimer()
- for j := 0; j < runtime.GOMAXPROCS(0); j++ {
- wg.Add(1)
- go func() {
- num := b.N / runtime.GOMAXPROCS(0)
- for i := 0; i < num; i++ {
- buf := bytes.NewBuffer(msg)
- req, _ := http.NewRequest("POST", url, buf)
- resp, err := client.Do(req)
- if err != nil {
- panic(err.Error())
- }
- body, _ := ioutil.ReadAll(resp.Body)
- if !bytes.Equal(body, []byte("OK")) {
- panic("bad response")
- }
- resp.Body.Close()
- }
- wg.Done()
- }()
- }
- wg.Wait()
- b.StopTimer()
- nsqd.Exit()
- }
|