123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- package nsqd
- import (
- "fmt"
- "io/ioutil"
- "net/http"
- "os"
- "strconv"
- "testing"
- "time"
- "github.com/nsqio/nsq/internal/test"
- )
- // ensure that we can push a message through a topic and get it out of a channel
- func TestPutMessage(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel1 := topic.GetChannel("ch")
- var id MessageID
- msg := NewMessage(id, []byte("test"))
- topic.PutMessage(msg)
- outputMsg := <-channel1.memoryMsgChan
- test.Equal(t, msg.ID, outputMsg.ID)
- test.Equal(t, msg.Body, outputMsg.Body)
- }
- // ensure that both channels get the same message
- func TestPutMessage2Chan(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel1 := topic.GetChannel("ch1")
- channel2 := topic.GetChannel("ch2")
- var id MessageID
- msg := NewMessage(id, []byte("test"))
- topic.PutMessage(msg)
- outputMsg1 := <-channel1.memoryMsgChan
- test.Equal(t, msg.ID, outputMsg1.ID)
- test.Equal(t, msg.Body, outputMsg1.Body)
- outputMsg2 := <-channel2.memoryMsgChan
- test.Equal(t, msg.ID, outputMsg2.ID)
- test.Equal(t, msg.Body, outputMsg2.Body)
- }
- func TestInFlightWorker(t *testing.T) {
- count := 250
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.MsgTimeout = 100 * time.Millisecond
- opts.QueueScanRefreshInterval = 100 * time.Millisecond
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel := topic.GetChannel("channel")
- for i := 0; i < count; i++ {
- msg := NewMessage(topic.GenerateID(), []byte("test"))
- channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
- }
- channel.Lock()
- inFlightMsgs := len(channel.inFlightMessages)
- channel.Unlock()
- test.Equal(t, count, inFlightMsgs)
- channel.inFlightMutex.Lock()
- inFlightPQMsgs := len(channel.inFlightPQ)
- channel.inFlightMutex.Unlock()
- test.Equal(t, count, inFlightPQMsgs)
- // the in flight worker has a resolution of 100ms so we need to wait
- // at least that much longer than our msgTimeout (in worst case)
- time.Sleep(4 * opts.MsgTimeout)
- channel.Lock()
- inFlightMsgs = len(channel.inFlightMessages)
- channel.Unlock()
- test.Equal(t, 0, inFlightMsgs)
- channel.inFlightMutex.Lock()
- inFlightPQMsgs = len(channel.inFlightPQ)
- channel.inFlightMutex.Unlock()
- test.Equal(t, 0, inFlightPQMsgs)
- }
- func TestChannelEmpty(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel := topic.GetChannel("channel")
- msgs := make([]*Message, 0, 25)
- for i := 0; i < 25; i++ {
- msg := NewMessage(topic.GenerateID(), []byte("test"))
- channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
- msgs = append(msgs, msg)
- }
- channel.RequeueMessage(0, msgs[len(msgs)-1].ID, 100*time.Millisecond)
- test.Equal(t, 24, len(channel.inFlightMessages))
- test.Equal(t, 24, len(channel.inFlightPQ))
- test.Equal(t, 1, len(channel.deferredMessages))
- test.Equal(t, 1, len(channel.deferredPQ))
- channel.Empty()
- test.Equal(t, 0, len(channel.inFlightMessages))
- test.Equal(t, 0, len(channel.inFlightPQ))
- test.Equal(t, 0, len(channel.deferredMessages))
- test.Equal(t, 0, len(channel.deferredPQ))
- test.Equal(t, int64(0), channel.Depth())
- }
- func TestChannelEmptyConsumer(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- tcpAddr, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- conn, _ := mustConnectNSQD(tcpAddr)
- defer conn.Close()
- topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel := topic.GetChannel("channel")
- client := newClientV2(0, conn, nsqd)
- client.SetReadyCount(25)
- err := channel.AddClient(client.ID, client)
- test.Equal(t, err, nil)
- for i := 0; i < 25; i++ {
- msg := NewMessage(topic.GenerateID(), []byte("test"))
- channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout)
- client.SendingMessage()
- }
- for _, cl := range channel.clients {
- stats := cl.Stats("").(ClientV2Stats)
- test.Equal(t, int64(25), stats.InFlightCount)
- }
- channel.Empty()
- for _, cl := range channel.clients {
- stats := cl.Stats("").(ClientV2Stats)
- test.Equal(t, int64(0), stats.InFlightCount)
- }
- }
- func TestMaxChannelConsumers(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.MaxChannelConsumers = 1
- tcpAddr, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- conn, _ := mustConnectNSQD(tcpAddr)
- defer conn.Close()
- topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- channel := topic.GetChannel("channel")
- client1 := newClientV2(1, conn, nsqd)
- client1.SetReadyCount(25)
- err := channel.AddClient(client1.ID, client1)
- test.Equal(t, err, nil)
- client2 := newClientV2(2, conn, nsqd)
- client2.SetReadyCount(25)
- err = channel.AddClient(client2.ID, client2)
- test.NotEqual(t, err, nil)
- }
- func TestChannelHealth(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.MemQueueSize = 2
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic := nsqd.GetTopic("test")
- channel := topic.GetChannel("channel")
- channel.backend = &errorBackendQueue{}
- msg := NewMessage(topic.GenerateID(), make([]byte, 100))
- err := channel.PutMessage(msg)
- test.Nil(t, err)
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = channel.PutMessage(msg)
- test.Nil(t, err)
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = channel.PutMessage(msg)
- test.NotNil(t, err)
- url := fmt.Sprintf("http://%s/ping", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- test.Equal(t, 500, resp.StatusCode)
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "NOK - never gonna happen", string(body))
- channel.backend = &errorRecoveredBackendQueue{}
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = channel.PutMessage(msg)
- test.Nil(t, err)
- resp, err = http.Get(url)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "OK", string(body))
- }
|