tcp.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package nsqlookupd
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "github.com/nsqio/nsq/internal/protocol"
  7. )
  8. type tcpServer struct {
  9. nsqlookupd *NSQLookupd
  10. conns sync.Map
  11. }
  12. func (p *tcpServer) Handle(conn net.Conn) {
  13. p.nsqlookupd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())
  14. // The client should initialize itself by sending a 4 byte sequence indicating
  15. // the version of the protocol that it intends to communicate, this will allow us
  16. // to gracefully upgrade the protocol away from text/line oriented to whatever...
  17. buf := make([]byte, 4)
  18. _, err := io.ReadFull(conn, buf)
  19. if err != nil {
  20. p.nsqlookupd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
  21. conn.Close()
  22. return
  23. }
  24. protocolMagic := string(buf)
  25. p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
  26. conn.RemoteAddr(), protocolMagic)
  27. var prot protocol.Protocol
  28. switch protocolMagic {
  29. case " V1":
  30. prot = &LookupProtocolV1{nsqlookupd: p.nsqlookupd}
  31. default:
  32. protocol.SendResponse(conn, []byte("E_BAD_PROTOCOL"))
  33. conn.Close()
  34. p.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
  35. conn.RemoteAddr(), protocolMagic)
  36. return
  37. }
  38. client := prot.NewClient(conn)
  39. p.conns.Store(conn.RemoteAddr(), client)
  40. err = prot.IOLoop(client)
  41. if err != nil {
  42. p.nsqlookupd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
  43. }
  44. p.conns.Delete(conn.RemoteAddr())
  45. client.Close()
  46. }
  47. func (p *tcpServer) Close() {
  48. p.conns.Range(func(k, v interface{}) bool {
  49. v.(protocol.Client).Close()
  50. return true
  51. })
  52. }