package nsqd import ( "fmt" "io/ioutil" "net/http" "os" "strconv" "testing" "time" "github.com/nsqio/nsq/internal/test" ) // ensure that we can push a message through a topic and get it out of a channel func TestPutMessage(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel1 := topic.GetChannel("ch") var id MessageID msg := NewMessage(id, []byte("test")) topic.PutMessage(msg) outputMsg := <-channel1.memoryMsgChan test.Equal(t, msg.ID, outputMsg.ID) test.Equal(t, msg.Body, outputMsg.Body) } // ensure that both channels get the same message func TestPutMessage2Chan(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel1 := topic.GetChannel("ch1") channel2 := topic.GetChannel("ch2") var id MessageID msg := NewMessage(id, []byte("test")) topic.PutMessage(msg) outputMsg1 := <-channel1.memoryMsgChan test.Equal(t, msg.ID, outputMsg1.ID) test.Equal(t, msg.Body, outputMsg1.Body) outputMsg2 := <-channel2.memoryMsgChan test.Equal(t, msg.ID, outputMsg2.ID) test.Equal(t, msg.Body, outputMsg2.Body) } func TestInFlightWorker(t *testing.T) { count := 250 opts := NewOptions() opts.Logger = test.NewTestLogger(t) opts.MsgTimeout = 100 * time.Millisecond opts.QueueScanRefreshInterval = 100 * time.Millisecond _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") for i := 0; i < count; i++ { msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) } channel.Lock() inFlightMsgs := len(channel.inFlightMessages) channel.Unlock() test.Equal(t, count, inFlightMsgs) channel.inFlightMutex.Lock() inFlightPQMsgs := len(channel.inFlightPQ) channel.inFlightMutex.Unlock() test.Equal(t, count, inFlightPQMsgs) // the in flight worker has a resolution of 100ms so we need to wait // at least that much longer than our msgTimeout (in worst case) time.Sleep(4 * opts.MsgTimeout) channel.Lock() inFlightMsgs = len(channel.inFlightMessages) channel.Unlock() test.Equal(t, 0, inFlightMsgs) channel.inFlightMutex.Lock() inFlightPQMsgs = len(channel.inFlightPQ) channel.inFlightMutex.Unlock() test.Equal(t, 0, inFlightPQMsgs) } func TestChannelEmpty(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) _, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") msgs := make([]*Message, 0, 25) for i := 0; i < 25; i++ { msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) msgs = append(msgs, msg) } channel.RequeueMessage(0, msgs[len(msgs)-1].ID, 100*time.Millisecond) test.Equal(t, 24, len(channel.inFlightMessages)) test.Equal(t, 24, len(channel.inFlightPQ)) test.Equal(t, 1, len(channel.deferredMessages)) test.Equal(t, 1, len(channel.deferredPQ)) channel.Empty() test.Equal(t, 0, len(channel.inFlightMessages)) test.Equal(t, 0, len(channel.inFlightPQ)) test.Equal(t, 0, len(channel.deferredMessages)) test.Equal(t, 0, len(channel.deferredPQ)) test.Equal(t, int64(0), channel.Depth()) } func TestChannelEmptyConsumer(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() conn, _ := mustConnectNSQD(tcpAddr) defer conn.Close() topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") client := newClientV2(0, conn, nsqd) client.SetReadyCount(25) err := channel.AddClient(client.ID, client) test.Equal(t, err, nil) for i := 0; i < 25; i++ { msg := NewMessage(topic.GenerateID(), []byte("test")) channel.StartInFlightTimeout(msg, 0, opts.MsgTimeout) client.SendingMessage() } for _, cl := range channel.clients { stats := cl.Stats("").(ClientV2Stats) test.Equal(t, int64(25), stats.InFlightCount) } channel.Empty() for _, cl := range channel.clients { stats := cl.Stats("").(ClientV2Stats) test.Equal(t, int64(0), stats.InFlightCount) } } func TestMaxChannelConsumers(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) opts.MaxChannelConsumers = 1 tcpAddr, _, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() conn, _ := mustConnectNSQD(tcpAddr) defer conn.Close() topicName := "test_max_channel_consumers" + strconv.Itoa(int(time.Now().Unix())) topic := nsqd.GetTopic(topicName) channel := topic.GetChannel("channel") client1 := newClientV2(1, conn, nsqd) client1.SetReadyCount(25) err := channel.AddClient(client1.ID, client1) test.Equal(t, err, nil) client2 := newClientV2(2, conn, nsqd) client2.SetReadyCount(25) err = channel.AddClient(client2.ID, client2) test.NotEqual(t, err, nil) } func TestChannelHealth(t *testing.T) { opts := NewOptions() opts.Logger = test.NewTestLogger(t) opts.MemQueueSize = 2 _, httpAddr, nsqd := mustStartNSQD(opts) defer os.RemoveAll(opts.DataPath) defer nsqd.Exit() topic := nsqd.GetTopic("test") channel := topic.GetChannel("channel") channel.backend = &errorBackendQueue{} msg := NewMessage(topic.GenerateID(), make([]byte, 100)) err := channel.PutMessage(msg) test.Nil(t, err) msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.NotNil(t, err) url := fmt.Sprintf("http://%s/ping", httpAddr) resp, err := http.Get(url) test.Nil(t, err) test.Equal(t, 500, resp.StatusCode) body, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() test.Equal(t, "NOK - never gonna happen", string(body)) channel.backend = &errorRecoveredBackendQueue{} msg = NewMessage(topic.GenerateID(), make([]byte, 100)) err = channel.PutMessage(msg) test.Nil(t, err) resp, err = http.Get(url) test.Nil(t, err) test.Equal(t, 200, resp.StatusCode) body, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() test.Equal(t, "OK", string(body)) }