nsqlookupd.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package nsqlookupd
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "os"
  7. "sync"
  8. "github.com/nsqio/nsq/internal/http_api"
  9. "github.com/nsqio/nsq/internal/protocol"
  10. "github.com/nsqio/nsq/internal/util"
  11. "github.com/nsqio/nsq/internal/version"
  12. )
  13. type NSQLookupd struct {
  14. sync.RWMutex
  15. opts *Options
  16. tcpListener net.Listener
  17. httpListener net.Listener
  18. tcpServer *tcpServer
  19. waitGroup util.WaitGroupWrapper
  20. DB *RegistrationDB
  21. }
  22. func New(opts *Options) (*NSQLookupd, error) {
  23. var err error
  24. if opts.Logger == nil {
  25. opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  26. }
  27. l := &NSQLookupd{
  28. opts: opts,
  29. DB: NewRegistrationDB(),
  30. }
  31. l.logf(LOG_INFO, version.String("smqlookupd"))
  32. l.tcpServer = &tcpServer{nsqlookupd: l}
  33. l.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
  34. if err != nil {
  35. return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
  36. }
  37. l.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
  38. if err != nil {
  39. return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
  40. }
  41. return l, nil
  42. }
  43. // Main starts an instance of nsqlookupd and returns an
  44. // error if there was a problem starting up.
  45. func (l *NSQLookupd) Main() error {
  46. exitCh := make(chan error)
  47. var once sync.Once
  48. exitFunc := func(err error) {
  49. once.Do(func() {
  50. if err != nil {
  51. l.logf(LOG_FATAL, "%s", err)
  52. }
  53. exitCh <- err
  54. })
  55. }
  56. l.waitGroup.Wrap(func() {
  57. exitFunc(protocol.TCPServer(l.tcpListener, l.tcpServer, l.logf))
  58. })
  59. httpServer := newHTTPServer(l)
  60. l.waitGroup.Wrap(func() {
  61. exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf))
  62. })
  63. err := <-exitCh
  64. return err
  65. }
  66. func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
  67. return l.tcpListener.Addr().(*net.TCPAddr)
  68. }
  69. func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
  70. return l.httpListener.Addr().(*net.TCPAddr)
  71. }
  72. func (l *NSQLookupd) Exit() {
  73. if l.tcpListener != nil {
  74. l.tcpListener.Close()
  75. }
  76. if l.tcpServer != nil {
  77. l.tcpServer.Close()
  78. }
  79. if l.httpListener != nil {
  80. l.httpListener.Close()
  81. }
  82. l.waitGroup.Wait()
  83. }