123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- package nsqd
- import (
- "errors"
- "fmt"
- "io/ioutil"
- "net/http"
- "os"
- "runtime"
- "strconv"
- "testing"
- "time"
- "github.com/nsqio/nsq/internal/test"
- )
- func TestGetTopic(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic1 := nsqd.GetTopic("test")
- test.NotNil(t, topic1)
- test.Equal(t, "test", topic1.name)
- topic2 := nsqd.GetTopic("test")
- test.Equal(t, topic1, topic2)
- topic3 := nsqd.GetTopic("test2")
- test.Equal(t, "test2", topic3.name)
- test.NotEqual(t, topic2, topic3)
- }
- func TestGetChannel(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic := nsqd.GetTopic("test")
- channel1 := topic.GetChannel("ch1")
- test.NotNil(t, channel1)
- test.Equal(t, "ch1", channel1.name)
- channel2 := topic.GetChannel("ch2")
- test.Equal(t, channel1, topic.channelMap["ch1"])
- test.Equal(t, channel2, topic.channelMap["ch2"])
- }
// errorBackendQueue is a stub backend queue used by TestHealth: Put always
// fails with a fixed error, and every other method is a no-op returning its
// zero value.
type errorBackendQueue struct{}

// Put always reports failure.
func (*errorBackendQueue) Put(_ []byte) error {
	return errors.New("never gonna happen")
}

// ReadChan returns a nil channel.
func (*errorBackendQueue) ReadChan() <-chan []byte {
	return nil
}

// Close does nothing and reports success.
func (*errorBackendQueue) Close() error {
	return nil
}

// Delete does nothing and reports success.
func (*errorBackendQueue) Delete() error {
	return nil
}

// Depth always reports zero.
func (*errorBackendQueue) Depth() int64 {
	return 0
}

// Empty does nothing and reports success.
func (*errorBackendQueue) Empty() error {
	return nil
}
- type errorRecoveredBackendQueue struct{ errorBackendQueue }
- func (d *errorRecoveredBackendQueue) Put([]byte) error { return nil }
- func TestHealth(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- opts.MemQueueSize = 2
- _, httpAddr, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic := nsqd.GetTopic("test")
- topic.backend = &errorBackendQueue{}
- msg := NewMessage(topic.GenerateID(), make([]byte, 100))
- err := topic.PutMessage(msg)
- test.Nil(t, err)
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = topic.PutMessages([]*Message{msg})
- test.Nil(t, err)
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = topic.PutMessage(msg)
- test.NotNil(t, err)
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = topic.PutMessages([]*Message{msg})
- test.NotNil(t, err)
- url := fmt.Sprintf("http://%s/ping", httpAddr)
- resp, err := http.Get(url)
- test.Nil(t, err)
- test.Equal(t, 500, resp.StatusCode)
- body, _ := ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "NOK - never gonna happen", string(body))
- topic.backend = &errorRecoveredBackendQueue{}
- msg = NewMessage(topic.GenerateID(), make([]byte, 100))
- err = topic.PutMessages([]*Message{msg})
- test.Nil(t, err)
- resp, err = http.Get(url)
- test.Nil(t, err)
- test.Equal(t, 200, resp.StatusCode)
- body, _ = ioutil.ReadAll(resp.Body)
- resp.Body.Close()
- test.Equal(t, "OK", string(body))
- }
- func TestDeletes(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic := nsqd.GetTopic("test")
- channel1 := topic.GetChannel("ch1")
- test.NotNil(t, channel1)
- err := topic.DeleteExistingChannel("ch1")
- test.Nil(t, err)
- test.Equal(t, 0, len(topic.channelMap))
- channel2 := topic.GetChannel("ch2")
- test.NotNil(t, channel2)
- err = nsqd.DeleteExistingTopic("test")
- test.Nil(t, err)
- test.Equal(t, 0, len(topic.channelMap))
- test.Equal(t, 0, len(nsqd.topicMap))
- }
- func TestDeleteLast(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topic := nsqd.GetTopic("test")
- channel1 := topic.GetChannel("ch1")
- test.NotNil(t, channel1)
- err := topic.DeleteExistingChannel("ch1")
- test.Nil(t, err)
- test.Equal(t, 0, len(topic.channelMap))
- msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
- err = topic.PutMessage(msg)
- time.Sleep(100 * time.Millisecond)
- test.Nil(t, err)
- test.Equal(t, int64(1), topic.Depth())
- }
- func TestPause(t *testing.T) {
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(t)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- topicName := "test_topic_pause" + strconv.Itoa(int(time.Now().Unix()))
- topic := nsqd.GetTopic(topicName)
- err := topic.Pause()
- test.Nil(t, err)
- channel := topic.GetChannel("ch1")
- test.NotNil(t, channel)
- msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
- err = topic.PutMessage(msg)
- test.Nil(t, err)
- time.Sleep(15 * time.Millisecond)
- test.Equal(t, int64(1), topic.Depth())
- test.Equal(t, int64(0), channel.Depth())
- err = topic.UnPause()
- test.Nil(t, err)
- time.Sleep(15 * time.Millisecond)
- test.Equal(t, int64(0), topic.Depth())
- test.Equal(t, int64(1), channel.Depth())
- }
- func BenchmarkTopicPut(b *testing.B) {
- b.StopTimer()
- topicName := "bench_topic_put" + strconv.Itoa(b.N)
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(b)
- opts.MemQueueSize = int64(b.N)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- b.StartTimer()
- for i := 0; i <= b.N; i++ {
- topic := nsqd.GetTopic(topicName)
- msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
- topic.PutMessage(msg)
- }
- }
- func BenchmarkTopicToChannelPut(b *testing.B) {
- b.StopTimer()
- topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N)
- channelName := "bench"
- opts := NewOptions()
- opts.Logger = test.NewTestLogger(b)
- opts.MemQueueSize = int64(b.N)
- _, _, nsqd := mustStartNSQD(opts)
- defer os.RemoveAll(opts.DataPath)
- defer nsqd.Exit()
- channel := nsqd.GetTopic(topicName).GetChannel(channelName)
- b.StartTimer()
- for i := 0; i <= b.N; i++ {
- topic := nsqd.GetTopic(topicName)
- msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
- topic.PutMessage(msg)
- }
- for {
- if len(channel.memoryMsgChan) == b.N {
- break
- }
- runtime.Gosched()
- }
- }
|