12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767 |
- package nsqd
- import (
- "context"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "math/rand"
- "net"
- "os"
- "path"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nsqio/nsq/internal/clusterinfo"
- "github.com/nsqio/nsq/internal/dirlock"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/protocol"
- "github.com/nsqio/nsq/internal/statsd"
- "github.com/nsqio/nsq/internal/util"
- "github.com/nsqio/nsq/internal/version"
- )
- const ( // TLS requirement levels, compared against Options.TLSRequired
- TLSNotRequired = iota // TLS is not enforced
- TLSRequiredExceptHTTP // presumably TLS is enforced everywhere except the plain HTTP listener - TODO confirm against the HTTP server
- TLSRequired // TLS is enforced; New upgrades TLSNotRequired to this level when a TLS client auth policy is set
- )
- type errStore struct { // wrapper stored in NSQD.errValue so the atomic.Value always holds the same concrete type
- err error // nil means healthy (IsHealthy compares it against nil)
- }
- type NSQD struct { // NSQD holds the state of one daemon instance: options, listeners, topics and background-loop plumbing
- // 64bit atomic vars need to be first for proper alignment on 32bit platforms
- clientIDSequence int64
- sync.RWMutex // guards topicMap
- ctx context.Context // returned by Context(); canceled as the last step of Exit
- // ctxCancel cancels a context that main() is waiting on
- ctxCancel context.CancelFunc
- opts atomic.Value // holds *Options; accessed through getOpts/swapOpts
- dl *dirlock.DirLock // data-path lock taken in New and released in Exit
- isLoading int32 // 1 while LoadMetadata is running; accessed atomically
- isExiting int32 // flipped 0 -> 1 by Exit so repeated calls are no-ops
- errValue atomic.Value // holds an errStore; see SetHealth/GetError
- startTime time.Time // set once in New
- topicMap map[string]*Topic // topics keyed by name
- lookupPeers atomic.Value // initialized in New with an empty []*lookupPeer
- tcpServer *tcpServer
- tcpListener net.Listener
- httpListener net.Listener // nil when Options.HTTPAddress is empty
- httpsListener net.Listener // nil unless TLS is configured and Options.HTTPSAddress is set
- tlsConfig *tls.Config // nil when no TLS cert/key is configured
- poolSize int // number of queueScanWorker goroutines tracked by resizePool
- notifyChan chan interface{} // unbuffered; Notify sends on it
- optsNotificationChan chan struct{} // buffer of 1; triggerOptsNotification does a non-blocking send
- exitChan chan int // closed by Exit to stop background goroutines
- waitGroup util.WaitGroupWrapper // tracks goroutines started via Wrap; Exit waits on it
- ci *clusterinfo.ClusterInfo // used by GetTopic to query lookupd for a topic's channels
- }
- func New(opts *Options) (*NSQD, error) {
- var err error
- dataPath := opts.DataPath
- if opts.DataPath == "" {
- cwd, _ := os.Getwd()
- dataPath = cwd
- }
- if opts.Logger == nil {
- opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
- }
- n := &NSQD{
- startTime: time.Now(),
- topicMap: make(map[string]*Topic),
- exitChan: make(chan int),
- notifyChan: make(chan interface{}),
- optsNotificationChan: make(chan struct{}, 1),
- dl: dirlock.New(dataPath),
- }
- n.ctx, n.ctxCancel = context.WithCancel(context.Background())
- httpcli := http_api.NewClient(nil, opts.HTTPClientConnectTimeout, opts.HTTPClientRequestTimeout)
- n.ci = clusterinfo.New(n.logf, httpcli)
- n.lookupPeers.Store([]*lookupPeer{})
- n.swapOpts(opts)
- n.errValue.Store(errStore{})
- err = n.dl.Lock()
- if err != nil {
- return nil, fmt.Errorf("failed to lock data-path: %v", err)
- }
- if opts.MaxDeflateLevel < 1 || opts.MaxDeflateLevel > 9 {
- return nil, errors.New("--max-deflate-level must be [1,9]")
- }
- if opts.ID < 0 || opts.ID >= 1024 {
- return nil, errors.New("--node-id must be [0,1024)")
- }
- if opts.TLSClientAuthPolicy != "" && opts.TLSRequired == TLSNotRequired {
- opts.TLSRequired = TLSRequired
- }
- tlsConfig, err := buildTLSConfig(opts)
- if err != nil {
- return nil, fmt.Errorf("failed to build TLS config - %s", err)
- }
- if tlsConfig == nil && opts.TLSRequired != TLSNotRequired {
- return nil, errors.New("cannot require TLS client connections without TLS key and cert")
- }
- n.tlsConfig = tlsConfig
- for _, v := range opts.E2EProcessingLatencyPercentiles {
- if v <= 0 || v > 1 {
- return nil, fmt.Errorf("invalid E2E processing latency percentile: %v", v)
- }
- }
- n.logf(LOG_INFO, version.String("smqd"))
- n.logf(LOG_INFO, "ID: %d", opts.ID)
- n.tcpServer = &tcpServer{nsqd: n}
- n.tcpListener, err = net.Listen("tcp", opts.TCPAddress)
- if err != nil {
- return nil, fmt.Errorf("listen (%s) failed - %s", opts.TCPAddress, err)
- }
- if opts.HTTPAddress != "" {
- n.httpListener, err = net.Listen("tcp", opts.HTTPAddress)
- if err != nil {
- return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPAddress, err)
- }
- }
- if n.tlsConfig != nil && opts.HTTPSAddress != "" {
- n.httpsListener, err = tls.Listen("tcp", opts.HTTPSAddress, n.tlsConfig)
- if err != nil {
- return nil, fmt.Errorf("listen (%s) failed - %s", opts.HTTPSAddress, err)
- }
- }
- if opts.BroadcastHTTPPort == 0 {
- opts.BroadcastHTTPPort = n.RealHTTPAddr().Port
- }
- if opts.BroadcastTCPPort == 0 {
- opts.BroadcastTCPPort = n.RealTCPAddr().Port
- }
- if opts.StatsdPrefix != "" {
- var port string = fmt.Sprint(opts.BroadcastHTTPPort)
- statsdHostKey := statsd.HostKey(net.JoinHostPort(opts.BroadcastAddress, port))
- prefixWithHost := strings.Replace(opts.StatsdPrefix, "%s", statsdHostKey, -1)
- if prefixWithHost[len(prefixWithHost)-1] != '.' {
- prefixWithHost += "."
- }
- opts.StatsdPrefix = prefixWithHost
- }
- return n, nil
- }
- func (n *NSQD) getOpts() *Options {
- return n.opts.Load().(*Options)
- }
- func (n *NSQD) swapOpts(opts *Options) {
- n.opts.Store(opts)
- }
- func (n *NSQD) triggerOptsNotification() {
- select {
- case n.optsNotificationChan <- struct{}{}:
- default:
- }
- }
- func (n *NSQD) RealTCPAddr() *net.TCPAddr {
- if n.tcpListener == nil {
- return &net.TCPAddr{}
- }
- return n.tcpListener.Addr().(*net.TCPAddr)
- }
- func (n *NSQD) RealHTTPAddr() *net.TCPAddr {
- if n.httpListener == nil {
- return &net.TCPAddr{}
- }
- return n.httpListener.Addr().(*net.TCPAddr)
- }
- func (n *NSQD) RealHTTPSAddr() *net.TCPAddr {
- if n.httpsListener == nil {
- return &net.TCPAddr{}
- }
- return n.httpsListener.Addr().(*net.TCPAddr)
- }
- func (n *NSQD) SetHealth(err error) {
- n.errValue.Store(errStore{err: err})
- }
- func (n *NSQD) IsHealthy() bool {
- return n.GetError() == nil
- }
- func (n *NSQD) GetError() error {
- errValue := n.errValue.Load()
- return errValue.(errStore).err
- }
- func (n *NSQD) GetHealth() string {
- err := n.GetError()
- if err != nil {
- return fmt.Sprintf("NOK - %s", err)
- }
- return "OK"
- }
- func (n *NSQD) GetStartTime() time.Time {
- return n.startTime
- }
- func (n *NSQD) Main() error {
- exitCh := make(chan error)
- var once sync.Once
- exitFunc := func(err error) {
- once.Do(func() {
- if err != nil {
- n.logf(LOG_FATAL, "%s", err)
- }
- exitCh <- err
- })
- }
- n.waitGroup.Wrap(func() {
- exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
- })
- if n.httpListener != nil {
- httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
- n.waitGroup.Wrap(func() {
- exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
- })
- }
- if n.httpsListener != nil {
- httpsServer := newHTTPServer(n, true, true)
- n.waitGroup.Wrap(func() {
- exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
- })
- }
- n.waitGroup.Wrap(n.queueScanLoop)
- n.waitGroup.Wrap(n.lookupLoop)
- if n.getOpts().StatsdAddress != "" {
- n.waitGroup.Wrap(n.statsdLoop)
- }
- err := <-exitCh
- return err
- }
- // Metadata is the collection of persistent information about the current NSQD.
- type Metadata struct {
- Topics []TopicMetadata `json:"topics"` // one entry per topic; GetMetadata leaves this nil when no topic qualifies
- Version string `json:"version"` // GetMetadata sets this to version.Binary
- }
- // TopicMetadata is the collection of persistent information about a topic.
- type TopicMetadata struct {
- Name string `json:"name"` // LoadMetadata skips entries whose name fails protocol.IsValidTopicName
- Paused bool `json:"paused"` // when true, LoadMetadata pauses the topic after creating it
- Channels []ChannelMetadata `json:"channels"` // non-ephemeral channels of the topic
- }
- // ChannelMetadata is the collection of persistent information about a channel.
- type ChannelMetadata struct {
- Name string `json:"name"` // LoadMetadata skips entries whose name fails protocol.IsValidChannelName
- Paused bool `json:"paused"` // when true, LoadMetadata pauses the channel after creating it
- }
- func newMetadataFile(opts *Options) string {
- return path.Join(opts.DataPath, "smqd.dat")
- }
- func readOrEmpty(fn string) ([]byte, error) {
- data, err := ioutil.ReadFile(fn)
- if err != nil {
- if !os.IsNotExist(err) {
- return nil, fmt.Errorf("failed to read metadata from %s - %s", fn, err)
- }
- }
- return data, nil
- }
- func writeSyncFile(fn string, data []byte) error {
- f, err := os.OpenFile(fn, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
- if err != nil {
- return err
- }
- _, err = f.Write(data)
- if err == nil {
- err = f.Sync()
- }
- f.Close()
- return err
- }
- func (n *NSQD) LoadMetadata() error {
- atomic.StoreInt32(&n.isLoading, 1)
- defer atomic.StoreInt32(&n.isLoading, 0)
- fn := newMetadataFile(n.getOpts())
- data, err := readOrEmpty(fn)
- if err != nil {
- return err
- }
- if data == nil {
- return nil // fresh start
- }
- var m Metadata
- err = json.Unmarshal(data, &m)
- if err != nil {
- return fmt.Errorf("failed to parse metadata in %s - %s", fn, err)
- }
- for _, t := range m.Topics {
- if !protocol.IsValidTopicName(t.Name) {
- n.logf(LOG_WARN, "skipping creation of invalid topic %s", t.Name)
- continue
- }
- topic := n.GetTopic(t.Name)
- if t.Paused {
- topic.Pause()
- }
- for _, c := range t.Channels {
- if !protocol.IsValidChannelName(c.Name) {
- n.logf(LOG_WARN, "skipping creation of invalid channel %s", c.Name)
- continue
- }
- channel := topic.GetChannel(c.Name)
- if c.Paused {
- channel.Pause()
- }
- }
- topic.Start()
- }
- return nil
- }
- // GetMetadata retrieves the current topic and channel set of the NSQ daemon. If
- // the ephemeral flag is set, ephemeral topics are also returned even though these
- // are not saved to disk.
- func (n *NSQD) GetMetadata(ephemeral bool) *Metadata {
- meta := &Metadata{
- Version: version.Binary,
- }
- for _, topic := range n.topicMap {
- if topic.ephemeral && !ephemeral {
- continue
- }
- topicData := TopicMetadata{
- Name: topic.name,
- Paused: topic.IsPaused(),
- }
- topic.Lock()
- for _, channel := range topic.channelMap {
- if channel.ephemeral {
- continue
- }
- topicData.Channels = append(topicData.Channels, ChannelMetadata{
- Name: channel.name,
- Paused: channel.IsPaused(),
- })
- }
- topic.Unlock()
- meta.Topics = append(meta.Topics, topicData)
- }
- return meta
- }
- func (n *NSQD) PersistMetadata() error {
- // persist metadata about what topics/channels we have, across restarts
- fileName := newMetadataFile(n.getOpts())
- n.logf(LOG_INFO, "smq: persisting topic/channel metadata to %s", fileName)
- data, err := json.Marshal(n.GetMetadata(false))
- if err != nil {
- return err
- }
- tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int())
- err = writeSyncFile(tmpFileName, data)
- if err != nil {
- return err
- }
- err = os.Rename(tmpFileName, fileName)
- if err != nil {
- return err
- }
- // technically should fsync DataPath here
- return nil
- }
- func (n *NSQD) Exit() { // Exit shuts down: closes listeners, persists metadata, closes topics, stops background goroutines, releases the data-path lock
- if !atomic.CompareAndSwapInt32(&n.isExiting, 0, 1) {
- // avoid double call
- return
- }
- if n.tcpListener != nil {
- n.tcpListener.Close() // close the listeners first; errors from Close are ignored
- }
- if n.tcpServer != nil {
- n.tcpServer.Close()
- }
- if n.httpListener != nil {
- n.httpListener.Close()
- }
- if n.httpsListener != nil {
- n.httpsListener.Close()
- }
- n.Lock() // held while persisting metadata and closing every topic
- err := n.PersistMetadata()
- if err != nil {
- n.logf(LOG_ERROR, "failed to persist metadata - %s", err) // best effort: shutdown continues
- }
- n.logf(LOG_INFO, "closing topics")
- for _, topic := range n.topicMap {
- topic.Close()
- }
- n.Unlock()
- n.logf(LOG_INFO, "stopping subsystems")
- close(n.exitChan) // wakes goroutines selecting on exitChan (e.g. queueScanLoop, Notify)
- n.waitGroup.Wait() // blocks until every goroutine started through waitGroup.Wrap has returned
- n.dl.Unlock() // release the data-path lock taken in New
- n.logf(LOG_INFO, "bye")
- n.ctxCancel() // cancels the context returned by Context()
- }
- // GetTopic performs a thread safe operation
- // to return a pointer to a Topic object (potentially new)
- func (n *NSQD) GetTopic(topicName string) *Topic {
- // most likely we already have this topic, so try read lock first
- n.RLock()
- t, ok := n.topicMap[topicName]
- n.RUnlock()
- if ok {
- return t
- }
- n.Lock()
- t, ok = n.topicMap[topicName]
- if ok {
- n.Unlock()
- return t
- }
- deleteCallback := func(t *Topic) {
- n.DeleteExistingTopic(t.name)
- }
- t = NewTopic(topicName, n, deleteCallback)
- n.topicMap[topicName] = t
- n.Unlock()
- n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
- // topic is created but messagePump not yet started
- // if this topic was created while loading metadata at startup don't do any further initialization
- // (topic will be "started" after loading completes)
- if atomic.LoadInt32(&n.isLoading) == 1 {
- return t
- }
- // if using lookupd, make a blocking call to get channels and immediately create them
- // to ensure that all channels receive published messages
- lookupdHTTPAddrs := n.lookupdHTTPAddrs()
- if len(lookupdHTTPAddrs) > 0 {
- channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
- if err != nil {
- n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
- }
- for _, channelName := range channelNames {
- if strings.HasSuffix(channelName, "#ephemeral") {
- continue // do not create ephemeral channel with no consumer client
- }
- t.GetChannel(channelName)
- }
- } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
- n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
- }
- // now that all channels are added, start topic messagePump
- t.Start()
- return t
- }
- // GetExistingTopic gets a topic only if it exists
- func (n *NSQD) GetExistingTopic(topicName string) (*Topic, error) {
- n.RLock()
- defer n.RUnlock()
- topic, ok := n.topicMap[topicName]
- if !ok {
- return nil, errors.New("topic does not exist")
- }
- return topic, nil
- }
- // DeleteExistingTopic removes a topic only if it exists
- func (n *NSQD) DeleteExistingTopic(topicName string) error {
- n.RLock()
- topic, ok := n.topicMap[topicName]
- if !ok {
- n.RUnlock()
- return errors.New("topic does not exist")
- }
- n.RUnlock()
- // delete empties all channels and the topic itself before closing
- // (so that we dont leave any messages around)
- //
- // we do this before removing the topic from map below (with no lock)
- // so that any incoming writes will error and not create a new topic
- // to enforce ordering
- topic.Delete()
- n.Lock()
- delete(n.topicMap, topicName)
- n.Unlock()
- return nil
- }
- func (n *NSQD) Notify(v interface{}, persist bool) {
- // since the in-memory metadata is incomplete,
- // should not persist metadata while loading it.
- // nsqd will call `PersistMetadata` it after loading
- loading := atomic.LoadInt32(&n.isLoading) == 1
- n.waitGroup.Wrap(func() {
- // by selecting on exitChan we guarantee that
- // we do not block exit, see issue #123
- select {
- case <-n.exitChan:
- case n.notifyChan <- v:
- if loading || !persist {
- return
- }
- n.Lock()
- err := n.PersistMetadata()
- if err != nil {
- n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
- }
- n.Unlock()
- }
- })
- }
- // channels returns a flat slice of all channels in all topics
- func (n *NSQD) channels() []*Channel {
- var channels []*Channel
- n.RLock()
- for _, t := range n.topicMap {
- t.RLock()
- for _, c := range t.channelMap {
- channels = append(channels, c)
- }
- t.RUnlock()
- }
- n.RUnlock()
- return channels
- }
- // resizePool adjusts the size of the pool of queueScanWorker goroutines
- //
- // 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
- //
- func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
- idealPoolSize := int(float64(num) * 0.25)
- if idealPoolSize < 1 {
- idealPoolSize = 1
- } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
- idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
- }
- for {
- if idealPoolSize == n.poolSize {
- break
- } else if idealPoolSize < n.poolSize {
- // contract
- closeCh <- 1
- n.poolSize--
- } else {
- // expand
- n.waitGroup.Wrap(func() {
- n.queueScanWorker(workCh, responseCh, closeCh)
- })
- n.poolSize++
- }
- }
- }
- // queueScanWorker receives work (in the form of a channel) from queueScanLoop
- // and processes the deferred and in-flight queues
- func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
- for {
- select {
- case c := <-workCh:
- now := time.Now().UnixNano()
- dirty := false
- if c.processInFlightQueue(now) {
- dirty = true
- }
- if c.processDeferredQueue(now) {
- dirty = true
- }
- responseCh <- dirty
- case <-closeCh:
- return
- }
- }
- }
- // queueScanLoop runs in a single goroutine to process in-flight and deferred
- // priority queues. It manages a pool of queueScanWorker (configurable max of
- // QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
- //
- // It copies Redis's probabilistic expiration algorithm: it wakes up every
- // QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
- // (default: 20) channels from a locally cached list (refreshed every
- // QueueScanRefreshInterval (default: 5s)).
- //
- // If either of the queues had work to do the channel is considered "dirty".
- //
- // If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
- // the loop continues without sleep.
- func (n *NSQD) queueScanLoop() {
- workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
- responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
- closeCh := make(chan int)
- workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
- refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
- channels := n.channels()
- n.resizePool(len(channels), workCh, responseCh, closeCh)
- for {
- select {
- case <-workTicker.C:
- if len(channels) == 0 {
- continue
- }
- case <-refreshTicker.C:
- channels = n.channels()
- n.resizePool(len(channels), workCh, responseCh, closeCh)
- continue
- case <-n.exitChan:
- goto exit
- }
- num := n.getOpts().QueueScanSelectionCount
- if num > len(channels) {
- num = len(channels)
- }
- loop:
- for _, i := range util.UniqRands(num, len(channels)) {
- workCh <- channels[i]
- }
- numDirty := 0
- for i := 0; i < num; i++ {
- if <-responseCh {
- numDirty++
- }
- }
- if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
- goto loop
- }
- }
- exit:
- n.logf(LOG_INFO, "QUEUESCAN: closing")
- close(closeCh)
- workTicker.Stop()
- refreshTicker.Stop()
- }
- func buildTLSConfig(opts *Options) (*tls.Config, error) {
- var tlsConfig *tls.Config
- if opts.TLSCert == "" && opts.TLSKey == "" {
- return nil, nil
- }
- tlsClientAuthPolicy := tls.VerifyClientCertIfGiven
- cert, err := tls.LoadX509KeyPair(opts.TLSCert, opts.TLSKey)
- if err != nil {
- return nil, err
- }
- switch opts.TLSClientAuthPolicy {
- case "require":
- tlsClientAuthPolicy = tls.RequireAnyClientCert
- case "require-verify":
- tlsClientAuthPolicy = tls.RequireAndVerifyClientCert
- default:
- tlsClientAuthPolicy = tls.NoClientCert
- }
- tlsConfig = &tls.Config{
- Certificates: []tls.Certificate{cert},
- ClientAuth: tlsClientAuthPolicy,
- MinVersion: opts.TLSMinVersion,
- }
- if opts.TLSRootCAFile != "" {
- tlsCertPool := x509.NewCertPool()
- caCertFile, err := ioutil.ReadFile(opts.TLSRootCAFile)
- if err != nil {
- return nil, err
- }
- if !tlsCertPool.AppendCertsFromPEM(caCertFile) {
- return nil, errors.New("failed to append certificate to pool")
- }
- tlsConfig.ClientCAs = tlsCertPool
- }
- tlsConfig.BuildNameToCertificate()
- return tlsConfig, nil
- }
- func (n *NSQD) IsAuthEnabled() bool {
- return len(n.getOpts().AuthHTTPAddresses) != 0
- }
- // Context returns a context that will be canceled when nsqd initiates the shutdown
- func (n *NSQD) Context() context.Context {
- return n.ctx
- }
|