// lookup_peer.go

  1. package nsqd
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "net"
  7. "time"
  8. "github.com/nsqio/go-nsq"
  9. "github.com/nsqio/nsq/internal/lg"
  10. )
  11. // lookupPeer is a low-level type for connecting/reading/writing to nsqlookupd
  12. //
  13. // A lookupPeer instance is designed to connect lazily to nsqlookupd and reconnect
  14. // gracefully (i.e. it is all handled by the library). Clients can simply use the
  15. // Command interface to perform a round-trip.
  16. type lookupPeer struct {
  17. logf lg.AppLogFunc
  18. addr string
  19. conn net.Conn
  20. state int32
  21. connectCallback func(*lookupPeer)
  22. maxBodySize int64
  23. Info peerInfo
  24. }
  25. // peerInfo contains metadata for a lookupPeer instance (and is JSON marshalable)
  26. type peerInfo struct {
  27. TCPPort int `json:"tcp_port"`
  28. HTTPPort int `json:"http_port"`
  29. Version string `json:"version"`
  30. BroadcastAddress string `json:"broadcast_address"`
  31. }
  32. // newLookupPeer creates a new lookupPeer instance connecting to the supplied address.
  33. //
  34. // The supplied connectCallback will be called *every* time the instance connects.
  35. func newLookupPeer(addr string, maxBodySize int64, l lg.AppLogFunc, connectCallback func(*lookupPeer)) *lookupPeer {
  36. return &lookupPeer{
  37. logf: l,
  38. addr: addr,
  39. state: stateDisconnected,
  40. maxBodySize: maxBodySize,
  41. connectCallback: connectCallback,
  42. }
  43. }
  44. // Connect will Dial the specified address, with timeouts
  45. func (lp *lookupPeer) Connect() error {
  46. lp.logf(lg.INFO, "LOOKUP connecting to %s", lp.addr)
  47. conn, err := net.DialTimeout("tcp", lp.addr, time.Second)
  48. if err != nil {
  49. return err
  50. }
  51. lp.conn = conn
  52. return nil
  53. }
  54. // String returns the specified address
  55. func (lp *lookupPeer) String() string {
  56. return lp.addr
  57. }
  58. // Read implements the io.Reader interface, adding deadlines
  59. func (lp *lookupPeer) Read(data []byte) (int, error) {
  60. lp.conn.SetReadDeadline(time.Now().Add(time.Second))
  61. return lp.conn.Read(data)
  62. }
  63. // Write implements the io.Writer interface, adding deadlines
  64. func (lp *lookupPeer) Write(data []byte) (int, error) {
  65. lp.conn.SetWriteDeadline(time.Now().Add(time.Second))
  66. return lp.conn.Write(data)
  67. }
  68. // Close implements the io.Closer interface
  69. func (lp *lookupPeer) Close() error {
  70. lp.state = stateDisconnected
  71. if lp.conn != nil {
  72. return lp.conn.Close()
  73. }
  74. return nil
  75. }
  76. // Command performs a round-trip for the specified Command.
  77. //
  78. // It will lazily connect to nsqlookupd and gracefully handle
  79. // reconnecting in the event of a failure.
  80. //
  81. // It returns the response from nsqlookupd as []byte
  82. func (lp *lookupPeer) Command(cmd *nsq.Command) ([]byte, error) {
  83. initialState := lp.state
  84. if lp.state != stateConnected {
  85. err := lp.Connect()
  86. if err != nil {
  87. return nil, err
  88. }
  89. lp.state = stateConnected
  90. _, err = lp.Write(nsq.MagicV1)
  91. if err != nil {
  92. lp.Close()
  93. return nil, err
  94. }
  95. if initialState == stateDisconnected {
  96. lp.connectCallback(lp)
  97. }
  98. if lp.state != stateConnected {
  99. return nil, fmt.Errorf("lookupPeer connectCallback() failed")
  100. }
  101. }
  102. if cmd == nil {
  103. return nil, nil
  104. }
  105. _, err := cmd.WriteTo(lp)
  106. if err != nil {
  107. lp.Close()
  108. return nil, err
  109. }
  110. resp, err := readResponseBounded(lp, lp.maxBodySize)
  111. if err != nil {
  112. lp.Close()
  113. return nil, err
  114. }
  115. return resp, nil
  116. }
  117. func readResponseBounded(r io.Reader, limit int64) ([]byte, error) {
  118. var msgSize int32
  119. // message size
  120. err := binary.Read(r, binary.BigEndian, &msgSize)
  121. if err != nil {
  122. return nil, err
  123. }
  124. if int64(msgSize) > limit {
  125. return nil, fmt.Errorf("response body size (%d) is greater than limit (%d)",
  126. msgSize, limit)
  127. }
  128. // message binary data
  129. buf := make([]byte, msgSize)
  130. _, err = io.ReadFull(r, buf)
  131. if err != nil {
  132. return nil, err
  133. }
  134. return buf, nil
  135. }