package nsqlookupd import ( "bufio" "encoding/binary" "encoding/json" "fmt" "io" "log" "net" "os" "strings" "sync/atomic" "time" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/version" ) type LookupProtocolV1 struct { nsqlookupd *NSQLookupd } func (p *LookupProtocolV1) NewClient(conn net.Conn) protocol.Client { return NewClientV1(conn) } func (p *LookupProtocolV1) IOLoop(c protocol.Client) error { var err error var line string client := c.(*ClientV1) reader := bufio.NewReader(client) for { line, err = reader.ReadString('\n') if err != nil { break } line = strings.TrimSpace(line) params := strings.Split(line, " ") var response []byte response, err = p.Exec(client, reader, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { p.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } if response != nil { _, err = protocol.SendResponse(client, response) if err != nil { break } } } p.nsqlookupd.logf(LOG_INFO, "PROTOCOL(V1): [%s] exiting ioloop", client) if client.peerInfo != nil { registrations := p.nsqlookupd.DB.LookupRegistrations(client.peerInfo.id) for _, r := range registrations { if removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id); removed { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, r.Category, r.Key, r.SubKey) } } } return err } func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { switch params[0] { case "PING": return p.PING(client, params) case "IDENTIFY": return p.IDENTIFY(client, reader, params[1:]) case "REGISTER": return p.REGISTER(client, reader, params[1:]) case "UNREGISTER": return p.UNREGISTER(client, reader, params[1:]) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) } func getTopicChan(command string, params []string) (string, string, error) { if len(params) == 0 { return "", "", protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("%s insufficient number of params", command)) } topicName := params[0] var channelName string if len(params) >= 2 { channelName = params[1] } if !protocol.IsValidTopicName(topicName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("%s topic name '%s' is not valid", command, topicName)) } if channelName != "" && !protocol.IsValidChannelName(channelName) { return "", "", protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL", fmt.Sprintf("%s channel name '%s' is not valid", command, channelName)) } return topicName, channelName, nil } func (p *LookupProtocolV1) REGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("REGISTER", params) if err != nil { return nil, err } if channel != "" { key := Registration{"channel", topic, channel} if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } } key := Registration{"topic", topic, ""} if p.nsqlookupd.DB.AddProducer(key, &Producer{peerInfo: client.peerInfo}) { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } return []byte("OK"), nil } func (p *LookupProtocolV1) UNREGISTER(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { if client.peerInfo == nil { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "client must IDENTIFY") } topic, channel, err := getTopicChan("UNREGISTER", params) if err != nil { return nil, err } if channel != "" { key := Registration{"channel", topic, channel} removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, channel) } // for ephemeral channels, remove the channel as well if it has no producers if left == 0 && strings.HasSuffix(channel, "#ephemeral") { p.nsqlookupd.DB.RemoveRegistration(key) } } else { // no channel was specified so this is a topic unregistration // remove all of the channel registrations... // normally this shouldn't happen which is why we print a warning message // if anything is actually removed registrations := p.nsqlookupd.DB.FindRegistrations("channel", topic, "*") for _, r := range registrations { removed, _ := p.nsqlookupd.DB.RemoveProducer(r, client.peerInfo.id) if removed { p.nsqlookupd.logf(LOG_WARN, "client(%s) unexpected UNREGISTER category:%s key:%s subkey:%s", client, "channel", topic, r.SubKey) } } key := Registration{"topic", topic, ""} removed, left := p.nsqlookupd.DB.RemoveProducer(key, client.peerInfo.id) if removed { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) UNREGISTER category:%s key:%s subkey:%s", client, "topic", topic, "") } if left == 0 && strings.HasSuffix(topic, "#ephemeral") { p.nsqlookupd.DB.RemoveRegistration(key) } } return []byte("OK"), nil } func (p *LookupProtocolV1) IDENTIFY(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error) { var err error if client.peerInfo != nil { return nil, protocol.NewFatalClientErr(err, "E_INVALID", "cannot IDENTIFY again") } var bodyLen int32 err = binary.Read(reader, binary.BigEndian, &bodyLen) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body size") } body := make([]byte, bodyLen) _, err = io.ReadFull(reader, body) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to read body") } // body is a json structure with producer information peerInfo := PeerInfo{id: client.RemoteAddr().String()} err = json.Unmarshal(body, &peerInfo) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_BODY", "IDENTIFY failed to decode JSON body") } peerInfo.RemoteAddress = client.RemoteAddr().String() // require all fields if peerInfo.BroadcastAddress == "" || peerInfo.TCPPort == 0 || peerInfo.HTTPPort == 0 || peerInfo.Version == "" { return nil, protocol.NewFatalClientErr(nil, "E_BAD_BODY", "IDENTIFY missing fields") } atomic.StoreInt64(&peerInfo.lastUpdate, time.Now().UnixNano()) p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): IDENTIFY Address:%s TCP:%d HTTP:%d Version:%s", client, peerInfo.BroadcastAddress, peerInfo.TCPPort, peerInfo.HTTPPort, peerInfo.Version) client.peerInfo = &peerInfo if p.nsqlookupd.DB.AddProducer(Registration{"client", "", ""}, &Producer{peerInfo: client.peerInfo}) { p.nsqlookupd.logf(LOG_INFO, "DB: client(%s) REGISTER category:%s key:%s subkey:%s", client, "client", "", "") } // build a response data := make(map[string]interface{}) data["tcp_port"] = p.nsqlookupd.RealTCPAddr().Port data["http_port"] = p.nsqlookupd.RealHTTPAddr().Port data["version"] = version.Binary hostname, err := os.Hostname() if err != nil { log.Fatalf("ERROR: unable to get hostname %s", err) } data["broadcast_address"] = p.nsqlookupd.opts.BroadcastAddress data["hostname"] = hostname response, err := json.Marshal(data) if err != nil { p.nsqlookupd.logf(LOG_ERROR, "marshaling %v", data) return []byte("OK"), nil } return response, nil } func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error) { if client.peerInfo != nil { // we could get a PING before other commands on the same client connection cur := time.Unix(0, atomic.LoadInt64(&client.peerInfo.lastUpdate)) now := time.Now() p.nsqlookupd.logf(LOG_INFO, "CLIENT(%s): pinged (last ping %s)", client.peerInfo.id, now.Sub(cur)) atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) } return []byte("OK"), nil }