12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package nsqd
- import (
- "io"
- "net"
- "sync"
- "github.com/nsqio/nsq/internal/protocol"
- )
- const (
- typeConsumer = iota
- typeProducer
- )
- type Client interface {
- Type() int
- Stats(string) ClientStats
- }
- type tcpServer struct {
- nsqd *NSQD
- conns sync.Map
- }
- func (p *tcpServer) Handle(conn net.Conn) {
- p.nsqd.logf(LOG_INFO, "TCP: new client(%s)", conn.RemoteAddr())
- // The client should initialize itself by sending a 4 byte sequence indicating
- // the version of the protocol that it intends to communicate, this will allow us
- // to gracefully upgrade the protocol away from text/line oriented to whatever...
- buf := make([]byte, 4)
- _, err := io.ReadFull(conn, buf)
- if err != nil {
- p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
- conn.Close()
- return
- }
- protocolMagic := string(buf)
- p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
- conn.RemoteAddr(), protocolMagic)
- var prot protocol.Protocol
- switch protocolMagic {
- case " V2":
- prot = &protocolV2{nsqd: p.nsqd}
- default:
- protocol.SendFramedResponse(conn, frameTypeError, []byte("E_BAD_PROTOCOL"))
- conn.Close()
- p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
- conn.RemoteAddr(), protocolMagic)
- return
- }
- client := prot.NewClient(conn)
- p.conns.Store(conn.RemoteAddr(), client)
- err = prot.IOLoop(client)
- if err != nil {
- p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
- }
- p.conns.Delete(conn.RemoteAddr())
- client.Close()
- }
- func (p *tcpServer) Close() {
- p.conns.Range(func(k, v interface{}) bool {
- v.(protocol.Client).Close()
- return true
- })
- }
|