tcp.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package nsqd
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "github.com/nsqio/nsq/internal/protocol"
  7. )
  8. const (
  9. typeConsumer = iota
  10. typeProducer
  11. )
  12. type Client interface {
  13. Type() int
  14. Stats(string) ClientStats
  15. }
  16. type tcpServer struct {
  17. nsqd *NSQD
  18. conns sync.Map
  19. }
  20. func (p *tcpServer) Handle(conn net.Conn) {
  21. p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())
  22. // The client should initialize itself by sending a 4 byte sequence indicating
  23. // the version of the protocol that it intends to communicate, this will allow us
  24. // to gracefully upgrade the protocol away from text/line oriented to whatever...
  25. buf := make([]byte, 4)
  26. _, err := io.ReadFull(conn, buf)
  27. if err != nil {
  28. p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
  29. conn.Close()
  30. return
  31. }
  32. protocolMagic := string(buf)
  33. p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
  34. conn.RemoteAddr(), protocolMagic)
  35. var prot protocol.Protocol
  36. switch protocolMagic {
  37. case " V2":
  38. prot = &protocolV2{nsqd: p.nsqd}
  39. default:
  40. protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL"))
  41. conn.Close()
  42. p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
  43. conn.RemoteAddr(), protocolMagic)
  44. return
  45. }
  46. client := prot.NewClient(conn)
  47. p.conns.Store(conn.RemoteAddr(), client)
  48. err = prot.IOLoop(client)
  49. if err != nil {
  50. p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
  51. }
  52. p.conns.Delete(conn.RemoteAddr())
  53. client.Close()
  54. }
  55. func (p *tcpServer) Close() {
  56. p.conns.Range(func(k, v interface{}) bool {
  57. v.(protocol.Client).Close()
  58. return true
  59. })
  60. }