123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package nsqlookupd
- import (
- "fmt"
- "log"
- "net"
- "os"
- "sync"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/protocol"
- "github.com/nsqio/nsq/internal/util"
- "github.com/nsqio/nsq/internal/version"
- )
// NSQLookupd holds the state of one lookup daemon instance: its options, its
// TCP and HTTP listeners, the TCP protocol handler and the registration
// database.
type NSQLookupd struct {
	// NOTE(review): what this embedded lock guards is not visible in this
	// file — confirm against the callers that take it.
	sync.RWMutex

	// opts is the configuration handed to New.
	opts *Options

	// tcpListener is bound to opts.TCPAddress by New and closed by Exit.
	tcpListener net.Listener

	// httpListener is bound to opts.HTTPAddress by New and closed by Exit.
	httpListener net.Listener

	// tcpServer is the handler passed to protocol.TCPServer in Main.
	tcpServer *tcpServer

	// waitGroup wraps the server functions started by Main; Exit waits on it.
	waitGroup util.WaitGroupWrapper

	// DB is the registration database created by NewRegistrationDB in New.
	DB *RegistrationDB
}
- func New(opts *Options) (*NSQLookupd, error) {
- var err error
- if opts.Logger == nil {
- opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
- }
- l := &NSQLookupd{
- opts: opts,
- DB: NewRegistrationDB(),
- }
- l.logf(LOG_INFO, version.String("smqlookupd"))
- l.tcpServer = &tcpServer{nsqlookupd: l}
- l.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
- if err != nil {
- return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
- }
- l.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
- if err != nil {
- return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
- }
- return l, nil
- }
- // Main starts an instance of nsqlookupd and returns an
- // error if there was a problem starting up.
- func (l *NSQLookupd) Main() error {
- exitCh := make(chan error)
- var once sync.Once
- exitFunc := func(err error) {
- once.Do(func() {
- if err != nil {
- l.logf(LOG_FATAL, "%s", err)
- }
- exitCh <- err
- })
- }
- l.waitGroup.Wrap(func() {
- exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
- })
- httpServer := newHTTPServer(l)
- l.waitGroup.Wrap(func() {
- exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
- })
- err := <-exitCh
- return err
- }
- func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
- return l.tcpListener.Addr().(*net.TCPAddr)
- }
- func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
- return l.httpListener.Addr().(*net.TCPAddr)
- }
- func (l *NSQLookupd) Exit() {
- if l.tcpListener != nil {
- l.tcpListener.Close()
- }
- if l.tcpServer != nil {
- l.tcpServer.Close()
- }
- if l.httpListener != nil {
- l.httpListener.Close()
- }
- l.waitGroup.Wait()
- }
|