nsqd.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767
  1. package nsqd
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. "log"
  11. "math/rand"
  12. "net"
  13. "os"
  14. "path"
  15. "strings"
  16. "sync"
  17. "sync/atomic"
  18. "time"
  19. "github.com/nsqio/nsq/internal/clusterinfo"
  20. "github.com/nsqio/nsq/internal/dirlock"
  21. "github.com/nsqio/nsq/internal/http_api"
  22. "github.com/nsqio/nsq/internal/protocol"
  23. "github.com/nsqio/nsq/internal/statsd"
  24. "github.com/nsqio/nsq/internal/util"
  25. "github.com/nsqio/nsq/internal/version"
  26. )
// Levels stored in Options.TLSRequired. The values are assigned by iota, so
// TLSNotRequired is 0 (the zero value of the option).
const (
	// TLSNotRequired is the level New treats as "no TLS enforcement": it is
	// the only level that may be combined with a missing TLS key/cert.
	TLSNotRequired = iota
	// TLSRequiredExceptHTTP is the intermediate level.
	// NOTE(review): presumably TLS is enforced everywhere except the plain
	// HTTP listener - confirm against newHTTPServer and the TCP protocol code.
	TLSRequiredExceptHTTP
	// TLSRequired is the level New selects automatically when a client auth
	// policy is configured, and the level Main compares against when creating
	// the plain HTTP server.
	TLSRequired
)
// errStore wraps an error so that the health state can be kept in an
// atomic.Value (NSQD.errValue). An atomic.Value cannot hold nil and must
// always hold the same concrete type, so a "healthy" state is represented by
// an errStore whose err field is nil.
type errStore struct {
	err error
}
// NSQD is the daemon instance: it owns the listeners, the topic map, the
// current options and the background goroutines started by Main.
//
// The embedded RWMutex is taken around accesses to topicMap (see GetTopic,
// GetExistingTopic, DeleteExistingTopic and channels).
type NSQD struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	clientIDSequence int64

	sync.RWMutex

	ctx context.Context
	// ctxCancel cancels a context that main() is waiting on
	ctxCancel context.CancelFunc

	// opts holds the current *Options; read with getOpts, replaced with swapOpts.
	opts atomic.Value

	// dl is the lock on the data path, acquired in New and released in Exit.
	dl *dirlock.DirLock
	// isLoading is 1 while LoadMetadata is running (accessed atomically).
	isLoading int32
	// isExiting is flipped from 0 to 1 by the first call to Exit (accessed atomically).
	isExiting int32
	// errValue holds an errStore describing the current health state.
	errValue  atomic.Value
	startTime time.Time

	topicMap map[string]*Topic

	// lookupPeers holds a []*lookupPeer (initialised to an empty slice in New).
	lookupPeers atomic.Value

	tcpServer     *tcpServer
	tcpListener   net.Listener
	httpListener  net.Listener
	httpsListener net.Listener
	tlsConfig     *tls.Config

	// poolSize is the number of queueScanWorker goroutines tracked by resizePool.
	poolSize int

	notifyChan chan interface{}
	// optsNotificationChan has a buffer of one (see New), so a single pending
	// notification can be queued without blocking the sender.
	optsNotificationChan chan struct{}
	// exitChan is closed by Exit to signal background goroutines to stop.
	exitChan  chan int
	waitGroup util.WaitGroupWrapper

	ci *clusterinfo.ClusterInfo
}
  62. func New(opts *Options) (*NSQD, error) {
  63. var err error
  64. dataPath := opts.DataPath
  65. if opts.DataPath == "" {
  66. cwd, _ := os.Getwd()
  67. dataPath = cwd
  68. }
  69. if opts.Logger == nil {
  70. opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  71. }
  72. n := &NSQD{
  73. startTime: time.Now(),
  74. topicMap: make(map[string]*Topic),
  75. exitChan: make(chan int),
  76. notifyChan: make(chan interface{}),
  77. optsNotificationChan: make(chan struct{}, 1),
  78. dl: dirlock.New(dataPath),
  79. }
  80. n.ctx, n.ctxCancel = context.WithCancel(context.Background())
  81. httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
  82. n.ci = clusterinfo.New(n.logf, httpcli)
  83. n.lookupPeers.Store([]*lookupPeer{})
  84. n.swapOpts(opts)
  85. n.errValue.Store(errStore{})
  86. err = n.dl.Lock()
  87. if err != nil {
  88. return nil, fmt.Errorf("failed to lock data-path: %v", err)
  89. }
  90. if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
  91. return nil, errors.New("--max-deflate-level must be [1,9]")
  92. }
  93. if opts.ID < 0 || opts.ID >= 1024 {
  94. return nil, errors.New("--node-id must be [0,1024)")
  95. }
  96. if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
  97. opts.TLSRequired = TLSRequired
  98. }
  99. tlsConfig, err := buildTLSConfig(opts)
  100. if err != nil {
  101. return nil, fmt.Errorf("failed to build TLS config - %s", err)
  102. }
  103. if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
  104. return nil, errors.New("cannot require TLS client connections without TLS key and cert")
  105. }
  106. n.tlsConfig = tlsConfig
  107. for _, v := range opts.E2EProcessingLatencyPercentiles {
  108. if v <= 0 || v > 1 {
  109. return nil, fmt.Errorf("invalid E2E processing latency percentile: %v", v)
  110. }
  111. }
  112. n.logf(LOG_INFO, version.String("smqd"))
  113. n.logf(LOG_INFO, "ID: %d", opts.ID)
  114. n.tcpServer = &tcpServer{nsqd: n}
  115. n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
  116. if err != nil {
  117. return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
  118. }
  119. if opts.HTTPAddress != "" {
  120. n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
  121. if err != nil {
  122. return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
  123. }
  124. }
  125. if n.tlsConfig != nil && opts.HTTPSAddress != "" {
  126. n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)
  127. if err != nil {
  128. return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPSAddress, err)
  129. }
  130. }
  131. if opts.BroadcastHTTPPort == 0 {
  132. opts.BroadcastHTTPPort = n.RealHTTPAddr().Port
  133. }
  134. if opts.BroadcastTCPPort == 0 {
  135. opts.BroadcastTCPPort = n.RealTCPAddr().Port
  136. }
  137. if opts.StatsdPrefix != "" {
  138. var port string = fmt.Sprint(opts.BroadcastHTTPPort)
  139. statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
  140. prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
  141. if prefixWithHost[len(prefixWithHost)-1] != '.' {
  142. prefixWithHost += "."
  143. }
  144. opts.StatsdPrefix = prefixWithHost
  145. }
  146. return n, nil
  147. }
  148. func (n *NSQD) getOpts() *Options {
  149. return n.opts.Load().(*Options)
  150. }
  151. func (n *NSQD) swapOpts(opts *Options) {
  152. n.opts.Store(opts)
  153. }
  154. func (n *NSQD) triggerOptsNotification() {
  155. select {
  156. case n.optsNotificationChan <- struct{}{}:
  157. default:
  158. }
  159. }
  160. func (n *NSQD) RealTCPAddr() *net.TCPAddr {
  161. if n.tcpListener == nil {
  162. return &net.TCPAddr{}
  163. }
  164. return n.tcpListener.Addr().(*net.TCPAddr)
  165. }
  166. func (n *NSQD) RealHTTPAddr() *net.TCPAddr {
  167. if n.httpListener == nil {
  168. return &net.TCPAddr{}
  169. }
  170. return n.httpListener.Addr().(*net.TCPAddr)
  171. }
  172. func (n *NSQD) RealHTTPSAddr() *net.TCPAddr {
  173. if n.httpsListener == nil {
  174. return &net.TCPAddr{}
  175. }
  176. return n.httpsListener.Addr().(*net.TCPAddr)
  177. }
  178. func (n *NSQD) SetHealth(err error) {
  179. n.errValue.Store(errStore{err: err})
  180. }
  181. func (n *NSQD) IsHealthy() bool {
  182. return n.GetError() == nil
  183. }
  184. func (n *NSQD) GetError() error {
  185. errValue := n.errValue.Load()
  186. return errValue.(errStore).err
  187. }
  188. func (n *NSQD) GetHealth() string {
  189. err := n.GetError()
  190. if err != nil {
  191. return fmt.Sprintf("NOK - %s", err)
  192. }
  193. return "OK"
  194. }
  195. func (n *NSQD) GetStartTime() time.Time {
  196. return n.startTime
  197. }
  198. func (n *NSQD) Main() error {
  199. exitCh := make(chan error)
  200. var once sync.Once
  201. exitFunc := func(err error) {
  202. once.Do(func() {
  203. if err != nil {
  204. n.logf(LOG_FATAL, "%s", err)
  205. }
  206. exitCh <- err
  207. })
  208. }
  209. n.waitGroup.Wrap(func() {
  210. exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
  211. })
  212. if n.httpListener != nil {
  213. httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
  214. n.waitGroup.Wrap(func() {
  215. exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
  216. })
  217. }
  218. if n.httpsListener != nil {
  219. httpsServer := newHTTPServer(n, true, true)
  220. n.waitGroup.Wrap(func() {
  221. exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
  222. })
  223. }
  224. n.waitGroup.Wrap(n.queueScanLoop)
  225. n.waitGroup.Wrap(n.lookupLoop)
  226. if n.getOpts().StatsdAddress != "" {
  227. n.waitGroup.Wrap(n.statsdLoop)
  228. }
  229. err := <-exitCh
  230. return err
  231. }
// Metadata is the collection of persistent information about the current NSQD.
// It is the top-level document that PersistMetadata writes as JSON and that
// LoadMetadata parses on startup.
type Metadata struct {
	// Topics lists the topics known to the daemon.
	Topics []TopicMetadata `json:"topics"`
	// Version is set to version.Binary by GetMetadata.
	Version string `json:"version"`
}
// TopicMetadata is the collection of persistent information about a topic:
// its name, whether it is paused, and its channels.
type TopicMetadata struct {
	Name     string            `json:"name"`
	Paused   bool              `json:"paused"`
	Channels []ChannelMetadata `json:"channels"`
}
// ChannelMetadata is the collection of persistent information about a channel:
// its name and whether it is paused.
type ChannelMetadata struct {
	Name   string `json:"name"`
	Paused bool   `json:"paused"`
}
  248. func newMetadataFile(opts *Options) string {
  249. return path.Join(opts.DataPath, "smqd.dat")
  250. }
  251. func readOrEmpty(fn string) ([]byte, error) {
  252. data, err := ioutil.ReadFile(fn)
  253. if err != nil {
  254. if !os.IsNotExist(err) {
  255. return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err)
  256. }
  257. }
  258. return data, nil
  259. }
  260. func writeSyncFile(fn string, data []byte) error {
  261. f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
  262. if err != nil {
  263. return err
  264. }
  265. _, err = f.Write(data)
  266. if err == nil {
  267. err = f.Sync()
  268. }
  269. f.Close()
  270. return err
  271. }
  272. func (n *NSQD) LoadMetadata() error {
  273. atomic.StoreInt32(&n.isLoading, 1)
  274. defer atomic.StoreInt32(&n.isLoading, 0)
  275. fn := newMetadataFile(n.getOpts())
  276. data, err := readOrEmpty(fn)
  277. if err != nil {
  278. return err
  279. }
  280. if data == nil {
  281. return nil // fresh start
  282. }
  283. var m Metadata
  284. err = json.Unmarshal(data, &m)
  285. if err != nil {
  286. return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
  287. }
  288. for _, t := range m.Topics {
  289. if !protocol.IsValidTopicName(t.Name) {
  290. n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
  291. continue
  292. }
  293. topic := n.GetTopic(t.Name)
  294. if t.Paused {
  295. topic.Pause()
  296. }
  297. for _, c := range t.Channels {
  298. if !protocol.IsValidChannelName(c.Name) {
  299. n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
  300. continue
  301. }
  302. channel := topic.GetChannel(c.Name)
  303. if c.Paused {
  304. channel.Pause()
  305. }
  306. }
  307. topic.Start()
  308. }
  309. return nil
  310. }
  311. // GetMetadata retrieves the current topic and channel set of the NSQ daemon. If
  312. // the ephemeral flag is set, ephemeral topics are also returned even though these
  313. // are not saved to disk.
  314. func (n *NSQD) GetMetadata(ephemeral bool) *Metadata {
  315. meta := &Metadata{
  316. Version: version.Binary,
  317. }
  318. for _, topic := range n.topicMap {
  319. if topic.ephemeral && !ephemeral {
  320. continue
  321. }
  322. topicData := TopicMetadata{
  323. Name: topic.name,
  324. Paused: topic.IsPaused(),
  325. }
  326. topic.Lock()
  327. for _, channel := range topic.channelMap {
  328. if channel.ephemeral {
  329. continue
  330. }
  331. topicData.Channels = append(topicData.Channels, ChannelMetadata{
  332. Name: channel.name,
  333. Paused: channel.IsPaused(),
  334. })
  335. }
  336. topic.Unlock()
  337. meta.Topics = append(meta.Topics, topicData)
  338. }
  339. return meta
  340. }
  341. func (n *NSQD) PersistMetadata() error {
  342. // persist metadata about what topics/channels we have, across restarts
  343. fileName := newMetadataFile(n.getOpts())
  344. n.logf(LOG_INFO, "smq: persisting topic/channel metadata to %s", fileName)
  345. data, err := json.Marshal(n.GetMetadata(false))
  346. if err != nil {
  347. return err
  348. }
  349. tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
  350. err = writeSyncFile(tmpFileName, data)
  351. if err != nil {
  352. return err
  353. }
  354. err = os.Rename(tmpFileName, fileName)
  355. if err != nil {
  356. return err
  357. }
  358. // technically should fsync DataPath here
  359. return nil
  360. }
// Exit shuts the daemon down. Only the first call has any effect.
//
// The sequence is: close the listeners and the TCP server, persist metadata
// and close every topic while holding the NSQD lock, close exitChan to stop
// the background goroutines, wait for the wait group to drain, release the
// data-path lock, and finally cancel the context returned by Context.
func (n *NSQD) Exit() {
	if !atomic.CompareAndSwapInt32(&n.isExiting, 0, 1) {
		// avoid double call
		return
	}
	// stop accepting new connections before touching any state
	if n.tcpListener != nil {
		n.tcpListener.Close()
	}

	if n.tcpServer != nil {
		n.tcpServer.Close()
	}

	if n.httpListener != nil {
		n.httpListener.Close()
	}

	if n.httpsListener != nil {
		n.httpsListener.Close()
	}

	n.Lock()
	// a failure to persist is logged but does not abort the shutdown
	err := n.PersistMetadata()
	if err != nil {
		n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
	}
	n.logf(LOG_INFO, "closing topics")
	for _, topic := range n.topicMap {
		topic.Close()
	}
	n.Unlock()

	n.logf(LOG_INFO, "stopping subsystems")
	// closing exitChan is the stop signal selected on by Notify and queueScanLoop
	close(n.exitChan)
	n.waitGroup.Wait()
	// the data-path lock is only released once all goroutines have finished
	n.dl.Unlock()
	n.logf(LOG_INFO, "bye")
	n.ctxCancel()
}
// GetTopic performs a thread safe operation
// to return a pointer to a Topic object (potentially new)
//
// An existing topic is returned as-is. Otherwise a topic is created and
// registered under the write lock. Unless metadata is currently being loaded,
// the new topic's channels are then pre-created from nsqlookupd (when lookupd
// HTTP addresses are available) and the topic is started before it is
// returned.
func (n *NSQD) GetTopic(topicName string) *Topic {
	// most likely we already have this topic, so try read lock first
	n.RLock()
	t, ok := n.topicMap[topicName]
	n.RUnlock()
	if ok {
		return t
	}

	n.Lock()

	// re-check under the write lock: another goroutine may have created the
	// topic between the RUnlock above and this Lock
	t, ok = n.topicMap[topicName]
	if ok {
		n.Unlock()
		return t
	}
	// callback handed to the topic so that it can remove itself from this NSQD
	deleteCallback := func(t *Topic) {
		n.DeleteExistingTopic(t.name)
	}
	t = NewTopic(topicName, n, deleteCallback)
	n.topicMap[topicName] = t

	// the lock is released before the lookupd query below is made
	n.Unlock()

	n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
	// topic is created but messagePump not yet started

	// if this topic was created while loading metadata at startup don't do any further initialization
	// (topic will be "started" after loading completes)
	if atomic.LoadInt32(&n.isLoading) == 1 {
		return t
	}

	// if using lookupd, make a blocking call to get channels and immediately create them
	// to ensure that all channels receive published messages
	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
	if len(lookupdHTTPAddrs) > 0 {
		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
		if err != nil {
			// only a warning: whatever channel names were returned are still processed
			n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
		}
		for _, channelName := range channelNames {
			if strings.HasSuffix(channelName, "#ephemeral") {
				continue // do not create ephemeral channel with no consumer client
			}
			t.GetChannel(channelName)
		}
	} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
		// lookupd is configured but no HTTP address is currently known
		n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
	}

	// now that all channels are added, start topic messagePump
	t.Start()
	return t
}
  445. // GetExistingTopic gets a topic only if it exists
  446. func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error) {
  447. n.RLock()
  448. defer n.RUnlock()
  449. topic, ok := n.topicMap[topicName]
  450. if !ok {
  451. return nil, errors.New("topic does not exist")
  452. }
  453. return topic, nil
  454. }
  455. // DeleteExistingTopic removes a topic only if it exists
  456. func (n *NSQD) DeleteExistingTopic(topicName string) error {
  457. n.RLock()
  458. topic, ok := n.topicMap[topicName]
  459. if !ok {
  460. n.RUnlock()
  461. return errors.New("topic does not exist")
  462. }
  463. n.RUnlock()
  464. // delete empties all channels and the topic itself before closing
  465. // (so that we dont leave any messages around)
  466. //
  467. // we do this before removing the topic from map below (with no lock)
  468. // so that any incoming writes will error and not create a new topic
  469. // to enforce ordering
  470. topic.Delete()
  471. n.Lock()
  472. delete(n.topicMap, topicName)
  473. n.Unlock()
  474. return nil
  475. }
  476. func (n *NSQD) Notify(v interface{}, persist bool) {
  477. // since the in-memory metadata is incomplete,
  478. // should not persist metadata while loading it.
  479. // nsqd will call `PersistMetadata` it after loading
  480. loading := atomic.LoadInt32(&n.isLoading) == 1
  481. n.waitGroup.Wrap(func() {
  482. // by selecting on exitChan we guarantee that
  483. // we do not block exit, see issue #123
  484. select {
  485. case <-n.exitChan:
  486. case n.notifyChan <- v:
  487. if loading || !persist {
  488. return
  489. }
  490. n.Lock()
  491. err := n.PersistMetadata()
  492. if err != nil {
  493. n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
  494. }
  495. n.Unlock()
  496. }
  497. })
  498. }
  499. // channels returns a flat slice of all channels in all topics
  500. func (n *NSQD) channels() []*Channel {
  501. var channels []*Channel
  502. n.RLock()
  503. for _, t := range n.topicMap {
  504. t.RLock()
  505. for _, c := range t.channelMap {
  506. channels = append(channels, c)
  507. }
  508. t.RUnlock()
  509. }
  510. n.RUnlock()
  511. return channels
  512. }
  513. // resizePool adjusts the size of the pool of queueScanWorker goroutines
  514. //
  515. // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
  516. //
  517. func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
  518. idealPoolSize := int(float64(num) * 0.25)
  519. if idealPoolSize < 1 {
  520. idealPoolSize = 1
  521. } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
  522. idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
  523. }
  524. for {
  525. if idealPoolSize == n.poolSize {
  526. break
  527. } else if idealPoolSize < n.poolSize {
  528. // contract
  529. closeCh <- 1
  530. n.poolSize--
  531. } else {
  532. // expand
  533. n.waitGroup.Wrap(func() {
  534. n.queueScanWorker(workCh, responseCh, closeCh)
  535. })
  536. n.poolSize++
  537. }
  538. }
  539. }
  540. // queueScanWorker receives work (in the form of a channel) from queueScanLoop
  541. // and processes the deferred and in-flight queues
  542. func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
  543. for {
  544. select {
  545. case c := <-workCh:
  546. now := time.Now().UnixNano()
  547. dirty := false
  548. if c.processInFlightQueue(now) {
  549. dirty = true
  550. }
  551. if c.processDeferredQueue(now) {
  552. dirty = true
  553. }
  554. responseCh <- dirty
  555. case <-closeCh:
  556. return
  557. }
  558. }
  559. }
// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
//
// The loop ends when exitChan is closed; closeCh is then closed, which makes
// the receive on closeCh in every queueScanWorker succeed.
func (n *NSQD) queueScanLoop() {
	// both channels are buffered to the selection count configured at startup
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			// nothing to scan yet; wait for the next tick
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			// refresh the cached channel list and adapt the pool to its size
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		// hand the selected channels to the workers
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		// collect exactly one response per dispatched channel
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		// too many dirty channels: scan another selection right away instead
		// of waiting for the next tick
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf(LOG_INFO, "QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}
  618. func buildTLSConfig(opts *Options) (*tls.Config, error) {
  619. var tlsConfig *tls.Config
  620. if opts.TLSCert == "" && opts.TLSKey == "" {
  621. return nil, nil
  622. }
  623. tlsClientAuthPolicy := tls.VerifyClientCertIfGiven
  624. cert, err := tls.LoadX509KeyPair(opts.TLSCert, opts.TLSKey)
  625. if err != nil {
  626. return nil, err
  627. }
  628. switch opts.TLSClientAuthPolicy {
  629. case "require":
  630. tlsClientAuthPolicy = tls.RequireAnyClientCert
  631. case "require-verify":
  632. tlsClientAuthPolicy = tls.RequireAndVerifyClientCert
  633. default:
  634. tlsClientAuthPolicy = tls.NoClientCert
  635. }
  636. tlsConfig = &tls.Config{
  637. Certificates: []tls.Certificate{cert},
  638. ClientAuth: tlsClientAuthPolicy,
  639. MinVersion: opts.TLSMinVersion,
  640. }
  641. if opts.TLSRootCAFile != "" {
  642. tlsCertPool := x509.NewCertPool()
  643. caCertFile, err := ioutil.ReadFile(opts.TLSRootCAFile)
  644. if err != nil {
  645. return nil, err
  646. }
  647. if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
  648. return nil, errors.New("failed to append certificate to pool")
  649. }
  650. tlsConfig.ClientCAs = tlsCertPool
  651. }
  652. tlsConfig.BuildNameToCertificate()
  653. return tlsConfig, nil
  654. }
  655. func (n *NSQD) IsAuthEnabled() bool {
  656. return len(n.getOpts().AuthHTTPAddresses) != 0
  657. }
  658. // Context returns a context that will be canceled when nsqd initiates the shutdown
  659. func (n *NSQD) Context() context.Context {
  660. return n.ctx
  661. }