package nsqlookupd import ( "fmt" "log" "net" "os" "sync" "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/protocol" "github.com/nsqio/nsq/internal/util" "github.com/nsqio/nsq/internal/version" ) type NSQLookupd struct { sync.RWMutex opts *Options tcpListener net.Listener httpListener net.Listener tcpServer *tcpServer waitGroup util.WaitGroupWrapper DB *RegistrationDB } func New(opts *Options) (*NSQLookupd, error) { var err error if opts.Logger == nil { opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) } l := &NSQLookupd{ opts: opts, DB: NewRegistrationDB(), } l.logf(LOG_INFO, version.String("smqlookupd")) l.tcpServer = &tcpServer{nsqlookupd: l} l.tcpListener, err = net.Listen("tcp", opts.TCPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err) } l.httpListener, err = net.Listen("tcp", opts.HTTPAddress) if err != nil { return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err) } return l, nil } // Main starts an instance of nsqlookupd and returns an // error if there was a problem starting up. func (l *NSQLookupd) Main() error { exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { l.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } l.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf)) }) httpServer := newHTTPServer(l) l.waitGroup.Wrap(func() { exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf)) }) err := <-exitCh return err } func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr { return l.tcpListener.Addr().(*net.TCPAddr) } func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr { return l.httpListener.Addr().(*net.TCPAddr) } func (l *NSQLookupd) Exit() { if l.tcpListener != nil { l.tcpListener.Close() } if l.tcpServer != nil { l.tcpServer.Close() } if l.httpListener != nil { l.httpListener.Close() } l.waitGroup.Wait() }