nsqadmin.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package nsqadmin
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. "log"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "os"
  15. "path"
  16. "sync"
  17. "sync/atomic"
  18. "github.com/nsqio/nsq/internal/http_api"
  19. "github.com/nsqio/nsq/internal/util"
  20. "github.com/nsqio/nsq/internal/version"
  21. )
  22. type NSQAdmin struct {
  23. sync.RWMutex
  24. opts atomic.Value
  25. httpListener net.Listener
  26. waitGroup util.WaitGroupWrapper
  27. notifications chan *AdminAction
  28. graphiteURL *url.URL
  29. httpClientTLSConfig *tls.Config
  30. }
  31. func New(opts *Options) (*NSQAdmin, error) {
  32. if opts.Logger == nil {
  33. opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  34. }
  35. n := &NSQAdmin{
  36. notifications: make(chan *AdminAction),
  37. }
  38. n.swapOpts(opts)
  39. if len(opts.NSQDHTTPAddresses) == 0 && len(opts.NSQLookupdHTTPAddresses) == 0 {
  40. return nil, errors.New("--nsqd-http-address or --lookupd-http-address required")
  41. }
  42. if len(opts.NSQDHTTPAddresses) != 0 && len(opts.NSQLookupdHTTPAddresses) != 0 {
  43. return nil, errors.New("use --nsqd-http-address or --lookupd-http-address not both")
  44. }
  45. if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey == "" {
  46. return nil, errors.New("--http-client-tls-key must be specified with --http-client-tls-cert")
  47. }
  48. if opts.HTTPClientTLSKey != "" && opts.HTTPClientTLSCert == "" {
  49. return nil, errors.New("--http-client-tls-cert must be specified with --http-client-tls-key")
  50. }
  51. n.httpClientTLSConfig = &tls.Config{
  52. InsecureSkipVerify: opts.HTTPClientTLSInsecureSkipVerify,
  53. }
  54. if opts.HTTPClientTLSCert != "" && opts.HTTPClientTLSKey != "" {
  55. cert, err := tls.LoadX509KeyPair(opts.HTTPClientTLSCert, opts.HTTPClientTLSKey)
  56. if err != nil {
  57. return nil, fmt.Errorf("failed to LoadX509KeyPair %s, %s - %s",
  58. opts.HTTPClientTLSCert, opts.HTTPClientTLSKey, err)
  59. }
  60. n.httpClientTLSConfig.Certificates = []tls.Certificate{cert}
  61. }
  62. if opts.HTTPClientTLSRootCAFile != "" {
  63. tlsCertPool := x509.NewCertPool()
  64. caCertFile, err := ioutil.ReadFile(opts.HTTPClientTLSRootCAFile)
  65. if err != nil {
  66. return nil, fmt.Errorf("failed to read TLS root CA file %s - %s",
  67. opts.HTTPClientTLSRootCAFile, err)
  68. }
  69. if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
  70. return nil, fmt.Errorf("failed to AppendCertsFromPEM %s", opts.HTTPClientTLSRootCAFile)
  71. }
  72. n.httpClientTLSConfig.RootCAs = tlsCertPool
  73. }
  74. for _, address := range opts.NSQLookupdHTTPAddresses {
  75. _, err := net.ResolveTCPAddr("tcp", address)
  76. if err != nil {
  77. return nil, fmt.Errorf("failed to resolve --lookupd-http-address (%s) - %s", address, err)
  78. }
  79. }
  80. for _, address := range opts.NSQDHTTPAddresses {
  81. _, err := net.ResolveTCPAddr("tcp", address)
  82. if err != nil {
  83. return nil, fmt.Errorf("failed to resolve --nsqd-http-address (%s) - %s", address, err)
  84. }
  85. }
  86. if opts.ProxyGraphite {
  87. url, err := url.Parse(opts.GraphiteURL)
  88. if err != nil {
  89. return nil, fmt.Errorf("failed to parse --graphite-url (%s) - %s", opts.GraphiteURL, err)
  90. }
  91. n.graphiteURL = url
  92. }
  93. if opts.AllowConfigFromCIDR != "" {
  94. _, _, err := net.ParseCIDR(opts.AllowConfigFromCIDR)
  95. if err != nil {
  96. return nil, fmt.Errorf("failed to parse --allow-config-from-cidr (%s) - %s", opts.AllowConfigFromCIDR, err)
  97. }
  98. }
  99. opts.BasePath = normalizeBasePath(opts.BasePath)
  100. n.logf(LOG_INFO, version.String("smqadmin"))
  101. var err error
  102. n.httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
  103. if err != nil {
  104. return nil, fmt.Errorf("listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
  105. }
  106. return n, nil
  107. }
  108. func normalizeBasePath(p string) string {
  109. if len(p) == 0 {
  110. return "/"
  111. }
  112. // add leading slash
  113. if p[0] != '/' {
  114. p = "/" + p
  115. }
  116. return path.Clean(p)
  117. }
  118. func (n *NSQAdmin) getOpts() *Options {
  119. return n.opts.Load().(*Options)
  120. }
  121. func (n *NSQAdmin) swapOpts(opts *Options) {
  122. n.opts.Store(opts)
  123. }
  124. func (n *NSQAdmin) RealHTTPAddr() *net.TCPAddr {
  125. return n.httpListener.Addr().(*net.TCPAddr)
  126. }
  127. func (n *NSQAdmin) handleAdminActions() {
  128. for action := range n.notifications {
  129. content, err := json.Marshal(action)
  130. if err != nil {
  131. n.logf(LOG_ERROR, "failed to serialize admin action - %s", err)
  132. }
  133. httpclient := &http.Client{
  134. Transport: http_api.NewDeadlineTransport(n.getOpts().HTTPClientConnectTimeout, n.getOpts().HTTPClientRequestTimeout),
  135. }
  136. n.logf(LOG_INFO, "POSTing notification to %s", n.getOpts().NotificationHTTPEndpoint)
  137. resp, err := httpclient.Post(n.getOpts().NotificationHTTPEndpoint,
  138. "application/json", bytes.NewBuffer(content))
  139. if err != nil {
  140. n.logf(LOG_ERROR, "failed to POST notification - %s", err)
  141. }
  142. resp.Body.Close()
  143. }
  144. }
  145. func (n *NSQAdmin) Main() error {
  146. exitCh := make(chan error)
  147. var once sync.Once
  148. exitFunc := func(err error) {
  149. once.Do(func() {
  150. if err != nil {
  151. n.logf(LOG_FATAL, "%s", err)
  152. }
  153. exitCh <- err
  154. })
  155. }
  156. httpServer := NewHTTPServer(n)
  157. n.waitGroup.Wrap(func() {
  158. exitFunc(http_api.Serve(n.httpListener, http_api.CompressHandler(httpServer), "HTTP", n.logf))
  159. })
  160. n.waitGroup.Wrap(n.handleAdminActions)
  161. err := <-exitCh
  162. return err
  163. }
  164. func (n *NSQAdmin) Exit() {
  165. if n.httpListener != nil {
  166. n.httpListener.Close()
  167. }
  168. close(n.notifications)
  169. n.waitGroup.Wait()
  170. }