123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- package nsqd
- import (
- "bytes"
- "encoding/json"
- "net"
- "os"
- "strconv"
- "time"
- "github.com/nsqio/go-nsq"
- "github.com/nsqio/nsq/internal/version"
- )
- func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
- return func(lp *lookupPeer) {
- ci := make(map[string]interface{})
- ci["version"] = version.Binary
- ci["tcp_port"] = n.getOpts().BroadcastTCPPort
- ci["http_port"] = n.getOpts().BroadcastHTTPPort
- ci["hostname"] = hostname
- ci["broadcast_address"] = n.getOpts().BroadcastAddress
- cmd, err := nsq.Identify(ci)
- if err != nil {
- lp.Close()
- return
- }
- resp, err := lp.Command(cmd)
- if err != nil {
- n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
- return
- } else if bytes.Equal(resp, []byte("E_INVALID")) {
- n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
- lp.Close()
- return
- } else {
- err = json.Unmarshal(resp, &lp.Info)
- if err != nil {
- n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
- lp.Close()
- return
- } else {
- n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
- if lp.Info.BroadcastAddress == "" {
- n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
- }
- }
- }
- // build all the commands first so we exit the lock(s) as fast as possible
- var commands []*nsq.Command
- n.RLock()
- for _, topic := range n.topicMap {
- topic.RLock()
- if len(topic.channelMap) == 0 {
- commands = append(commands, nsq.Register(topic.name, ""))
- } else {
- for _, channel := range topic.channelMap {
- commands = append(commands, nsq.Register(channel.topicName, channel.name))
- }
- }
- topic.RUnlock()
- }
- n.RUnlock()
- for _, cmd := range commands {
- n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd)
- _, err := lp.Command(cmd)
- if err != nil {
- n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
- return
- }
- }
- }
- }
- func (n *NSQD) lookupLoop() {
- var lookupPeers []*lookupPeer
- var lookupAddrs []string
- connect := true
- hostname, err := os.Hostname()
- if err != nil {
- n.logf(LOG_FATAL, "failed to get hostname - %s", err)
- os.Exit(1)
- }
- // for announcements, lookupd determines the host automatically
- ticker := time.Tick(15 * time.Second)
- for {
- if connect {
- for _, host := range n.getOpts().NSQLookupdTCPAddresses {
- if in(host, lookupAddrs) {
- continue
- }
- n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
- lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
- connectCallback(n, hostname))
- lookupPeer.Command(nil) // start the connection
- lookupPeers = append(lookupPeers, lookupPeer)
- lookupAddrs = append(lookupAddrs, host)
- }
- n.lookupPeers.Store(lookupPeers)
- connect = false
- }
- select {
- case <-ticker:
- // send a heartbeat and read a response (read detects closed conns)
- for _, lookupPeer := range lookupPeers {
- n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
- cmd := nsq.Ping()
- _, err := lookupPeer.Command(cmd)
- if err != nil {
- n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
- }
- }
- case val := <-n.notifyChan:
- var cmd *nsq.Command
- var branch string
- switch val.(type) {
- case *Channel:
- // notify all nsqlookupds that a new channel exists, or that it's removed
- branch = "channel"
- channel := val.(*Channel)
- if channel.Exiting() == true {
- cmd = nsq.UnRegister(channel.topicName, channel.name)
- } else {
- cmd = nsq.Register(channel.topicName, channel.name)
- }
- case *Topic:
- // notify all nsqlookupds that a new topic exists, or that it's removed
- branch = "topic"
- topic := val.(*Topic)
- if topic.Exiting() == true {
- cmd = nsq.UnRegister(topic.name, "")
- } else {
- cmd = nsq.Register(topic.name, "")
- }
- }
- for _, lookupPeer := range lookupPeers {
- n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
- _, err := lookupPeer.Command(cmd)
- if err != nil {
- n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
- }
- }
- case <-n.optsNotificationChan:
- var tmpPeers []*lookupPeer
- var tmpAddrs []string
- for _, lp := range lookupPeers {
- if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
- tmpPeers = append(tmpPeers, lp)
- tmpAddrs = append(tmpAddrs, lp.addr)
- continue
- }
- n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
- lp.Close()
- }
- lookupPeers = tmpPeers
- lookupAddrs = tmpAddrs
- connect = true
- case <-n.exitChan:
- goto exit
- }
- }
- exit:
- n.logf(LOG_INFO, "LOOKUP: closing")
- }
- func in(s string, lst []string) bool {
- for _, v := range lst {
- if s == v {
- return true
- }
- }
- return false
- }
- func (n *NSQD) lookupdHTTPAddrs() []string {
- var lookupHTTPAddrs []string
- lookupPeers := n.lookupPeers.Load()
- if lookupPeers == nil {
- return nil
- }
- for _, lp := range lookupPeers.([]*lookupPeer) {
- if len(lp.Info.BroadcastAddress) <= 0 {
- continue
- }
- addr := net.JoinHostPort(lp.Info.BroadcastAddress, strconv.Itoa(lp.Info.HTTPPort))
- lookupHTTPAddrs = append(lookupHTTPAddrs, addr)
- }
- return lookupHTTPAddrs
- }
|