lookup_protocol_v1_test.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package nsqlookupd
  2. import (
  3. "errors"
  4. "testing"
  5. "time"
  6. "github.com/nsqio/nsq/internal/protocol"
  7. "github.com/nsqio/nsq/internal/test"
  8. )
  9. func TestIOLoopReturnsClientErrWhenSendFails(t *testing.T) {
  10. fakeConn := test.NewFakeNetConn()
  11. fakeConn.WriteFunc = func(b []byte) (int, error) {
  12. return 0, errors.New("write error")
  13. }
  14. testIOLoopReturnsClientErr(t, fakeConn)
  15. }
  16. func TestIOLoopReturnsClientErrWhenSendSucceeds(t *testing.T) {
  17. fakeConn := test.NewFakeNetConn()
  18. fakeConn.WriteFunc = func(b []byte) (int, error) {
  19. return len(b), nil
  20. }
  21. testIOLoopReturnsClientErr(t, fakeConn)
  22. }
  23. func testIOLoopReturnsClientErr(t *testing.T, fakeConn test.FakeNetConn) {
  24. fakeConn.ReadFunc = func(b []byte) (int, error) {
  25. return copy(b, []byte("INVALID_COMMAND\n")), nil
  26. }
  27. opts := NewOptions()
  28. opts.Logger = test.NewTestLogger(t)
  29. opts.LogLevel = LOG_DEBUG
  30. opts.TCPAddress = "127.0.0.1:0"
  31. opts.HTTPAddress = "127.0.0.1:0"
  32. nsqlookupd, err := New(opts)
  33. test.Nil(t, err)
  34. prot := &LookupProtocolV1{nsqlookupd: nsqlookupd}
  35. nsqlookupd.tcpServer = &tcpServer{nsqlookupd: prot.nsqlookupd}
  36. errChan := make(chan error)
  37. testIOLoop := func() {
  38. client := prot.NewClient(fakeConn)
  39. errChan <- prot.IOLoop(client)
  40. defer prot.nsqlookupd.Exit()
  41. }
  42. go testIOLoop()
  43. var timeout bool
  44. select {
  45. case err = <-errChan:
  46. case <-time.After(2 * time.Second):
  47. timeout = true
  48. }
  49. test.Equal(t, false, timeout)
  50. test.NotNil(t, err)
  51. test.Equal(t, "E_INVALID invalid command INVALID_COMMAND", err.Error())
  52. test.NotNil(t, err.(*protocol.FatalClientErr))
  53. }