123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- package nsqd
- import (
- "container/heap"
- "errors"
- "fmt"
- "math"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nsqio/go-diskqueue"
- "github.com/nsqio/nsq/internal/lg"
- "github.com/nsqio/nsq/internal/pqueue"
- "github.com/nsqio/nsq/internal/quantile"
- )
- type Consumer interface {
- UnPause()
- Pause()
- Close() error
- TimedOutMessage()
- Stats(string) ClientStats
- Empty()
- }
- // Channel represents the concrete type for a NSQ channel (and also
- // implements the Queue interface)
- //
- // There can be multiple channels per topic, each with there own unique set
- // of subscribers (clients).
- //
- // Channels maintain all client and message metadata, orchestrating in-flight
- // messages, timeouts, requeuing, etc.
- type Channel struct {
- // 64bit atomic vars need to be first for proper alignment on 32bit platforms
- requeueCount uint64
- messageCount uint64
- timeoutCount uint64
- sync.RWMutex
- topicName string
- name string
- nsqd *NSQD
- backend BackendQueue
- memoryMsgChan chan *Message
- exitFlag int32
- exitMutex sync.RWMutex
- // state tracking
- clients map[int64]Consumer
- paused int32
- ephemeral bool
- deleteCallback func(*Channel)
- deleter sync.Once
- // Stats tracking
- e2eProcessingLatencyStream *quantile.Quantile
- // TODO: these can be DRYd up
- deferredMessages map[MessageID]*pqueue.Item
- deferredPQ pqueue.PriorityQueue
- deferredMutex sync.Mutex
- inFlightMessages map[MessageID]*Message
- inFlightPQ inFlightPqueue
- inFlightMutex sync.Mutex
- }
- // NewChannel creates a new instance of the Channel type and returns a pointer
- func NewChannel(topicName string, channelName string, nsqd *NSQD,
- deleteCallback func(*Channel)) *Channel {
- c := &Channel{
- topicName: topicName,
- name: channelName,
- memoryMsgChan: nil,
- clients: make(map[int64]Consumer),
- deleteCallback: deleteCallback,
- nsqd: nsqd,
- ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
- }
- // avoid mem-queue if size == 0 for more consistent ordering
- if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
- c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
- }
- if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
- c.e2eProcessingLatencyStream = quantile.New(
- nsqd.getOpts().E2EProcessingLatencyWindowTime,
- nsqd.getOpts().E2EProcessingLatencyPercentiles,
- )
- }
- c.initPQ()
- if c.ephemeral {
- c.backend = newDummyBackendQueue()
- } else {
- dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
- opts := nsqd.getOpts()
- lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
- }
- // backend names, for uniqueness, automatically include the topic...
- backendName := getBackendName(topicName, channelName)
- c.backend = diskqueue.New(
- backendName,
- nsqd.getOpts().DataPath,
- nsqd.getOpts().MaxBytesPerFile,
- int32(minValidMsgLength),
- int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
- nsqd.getOpts().SyncEvery,
- nsqd.getOpts().SyncTimeout,
- dqLogf,
- )
- }
- c.nsqd.Notify(c, !c.ephemeral)
- return c
- }
- func (c *Channel) initPQ() {
- pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10))
- c.inFlightMutex.Lock()
- c.inFlightMessages = make(map[MessageID]*Message)
- c.inFlightPQ = newInFlightPqueue(pqSize)
- c.inFlightMutex.Unlock()
- c.deferredMutex.Lock()
- c.deferredMessages = make(map[MessageID]*pqueue.Item)
- c.deferredPQ = pqueue.New(pqSize)
- c.deferredMutex.Unlock()
- }
- // Exiting returns a boolean indicating if this channel is closed/exiting
- func (c *Channel) Exiting() bool {
- return atomic.LoadInt32(&c.exitFlag) == 1
- }
- // Delete empties the channel and closes
- func (c *Channel) Delete() error {
- return c.exit(true)
- }
- // Close cleanly closes the Channel
- func (c *Channel) Close() error {
- return c.exit(false)
- }
- func (c *Channel) exit(deleted bool) error {
- c.exitMutex.Lock()
- defer c.exitMutex.Unlock()
- if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
- return errors.New("exiting")
- }
- if deleted {
- c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)
- // since we are explicitly deleting a channel (not just at system exit time)
- // de-register this from the lookupd
- c.nsqd.Notify(c, !c.ephemeral)
- } else {
- c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
- }
- // this forceably closes client connections
- c.RLock()
- for _, client := range c.clients {
- client.Close()
- }
- c.RUnlock()
- if deleted {
- // empty the queue (deletes the backend files, too)
- c.Empty()
- return c.backend.Delete()
- }
- // write anything leftover to disk
- c.flush()
- return c.backend.Close()
- }
- func (c *Channel) Empty() error {
- c.Lock()
- defer c.Unlock()
- c.initPQ()
- for _, client := range c.clients {
- client.Empty()
- }
- for {
- select {
- case <-c.memoryMsgChan:
- default:
- goto finish
- }
- }
- finish:
- return c.backend.Empty()
- }
- // flush persists all the messages in internal memory buffers to the backend
- // it does not drain inflight/deferred because it is only called in Close()
- func (c *Channel) flush() error {
- if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
- c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
- c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
- }
- for {
- select {
- case msg := <-c.memoryMsgChan:
- err := writeMessageToBackend(msg, c.backend)
- if err != nil {
- c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
- }
- default:
- goto finish
- }
- }
- finish:
- c.inFlightMutex.Lock()
- for _, msg := range c.inFlightMessages {
- err := writeMessageToBackend(msg, c.backend)
- if err != nil {
- c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
- }
- }
- c.inFlightMutex.Unlock()
- c.deferredMutex.Lock()
- for _, item := range c.deferredMessages {
- msg := item.Value.(*Message)
- err := writeMessageToBackend(msg, c.backend)
- if err != nil {
- c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
- }
- }
- c.deferredMutex.Unlock()
- return nil
- }
- func (c *Channel) Depth() int64 {
- return int64(len(c.memoryMsgChan)) + c.backend.Depth()
- }
- func (c *Channel) Pause() error {
- return c.doPause(true)
- }
- func (c *Channel) UnPause() error {
- return c.doPause(false)
- }
- func (c *Channel) doPause(pause bool) error {
- if pause {
- atomic.StoreInt32(&c.paused, 1)
- } else {
- atomic.StoreInt32(&c.paused, 0)
- }
- c.RLock()
- for _, client := range c.clients {
- if pause {
- client.Pause()
- } else {
- client.UnPause()
- }
- }
- c.RUnlock()
- return nil
- }
- func (c *Channel) IsPaused() bool {
- return atomic.LoadInt32(&c.paused) == 1
- }
- // PutMessage writes a Message to the queue
- func (c *Channel) PutMessage(m *Message) error {
- c.exitMutex.RLock()
- defer c.exitMutex.RUnlock()
- if c.Exiting() {
- return errors.New("exiting")
- }
- err := c.put(m)
- if err != nil {
- return err
- }
- atomic.AddUint64(&c.messageCount, 1)
- return nil
- }
- func (c *Channel) put(m *Message) error {
- select {
- case c.memoryMsgChan <- m:
- default:
- err := writeMessageToBackend(m, c.backend)
- c.nsqd.SetHealth(err)
- if err != nil {
- c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
- c.name, err)
- return err
- }
- }
- return nil
- }
- func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
- atomic.AddUint64(&c.messageCount, 1)
- c.StartDeferredTimeout(msg, timeout)
- }
- // TouchMessage resets the timeout for an in-flight message
- func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error {
- msg, err := c.popInFlightMessage(clientID, id)
- if err != nil {
- return err
- }
- c.removeFromInFlightPQ(msg)
- newTimeout := time.Now().Add(clientMsgTimeout)
- if newTimeout.Sub(msg.deliveryTS) >=
- c.nsqd.getOpts().MaxMsgTimeout {
- // we would have gone over, set to the max
- newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout)
- }
- msg.pri = newTimeout.UnixNano()
- err = c.pushInFlightMessage(msg)
- if err != nil {
- return err
- }
- c.addToInFlightPQ(msg)
- return nil
- }
- // FinishMessage successfully discards an in-flight message
- func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
- msg, err := c.popInFlightMessage(clientID, id)
- if err != nil {
- return err
- }
- c.removeFromInFlightPQ(msg)
- if c.e2eProcessingLatencyStream != nil {
- c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
- }
- return nil
- }
- // RequeueMessage requeues a message based on `time.Duration`, ie:
- //
- // `timeoutMs` == 0 - requeue a message immediately
- // `timeoutMs` > 0 - asynchronously wait for the specified timeout
- // and requeue a message (aka "deferred requeue")
- //
- func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
- // remove from inflight first
- msg, err := c.popInFlightMessage(clientID, id)
- if err != nil {
- return err
- }
- c.removeFromInFlightPQ(msg)
- atomic.AddUint64(&c.requeueCount, 1)
- if timeout == 0 {
- c.exitMutex.RLock()
- if c.Exiting() {
- c.exitMutex.RUnlock()
- return errors.New("exiting")
- }
- err := c.put(msg)
- c.exitMutex.RUnlock()
- return err
- }
- // deferred requeue
- return c.StartDeferredTimeout(msg, timeout)
- }
- // AddClient adds a client to the Channel's client list
- func (c *Channel) AddClient(clientID int64, client Consumer) error {
- c.exitMutex.RLock()
- defer c.exitMutex.RUnlock()
- if c.Exiting() {
- return errors.New("exiting")
- }
- c.RLock()
- _, ok := c.clients[clientID]
- numClients := len(c.clients)
- c.RUnlock()
- if ok {
- return nil
- }
- maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers
- if maxChannelConsumers != 0 && numClients >= maxChannelConsumers {
- return fmt.Errorf("consumers for %s:%s exceeds limit of %d",
- c.topicName, c.name, maxChannelConsumers)
- }
- c.Lock()
- c.clients[clientID] = client
- c.Unlock()
- return nil
- }
- // RemoveClient removes a client from the Channel's client list
- func (c *Channel) RemoveClient(clientID int64) {
- c.exitMutex.RLock()
- defer c.exitMutex.RUnlock()
- if c.Exiting() {
- return
- }
- c.RLock()
- _, ok := c.clients[clientID]
- c.RUnlock()
- if !ok {
- return
- }
- c.Lock()
- delete(c.clients, clientID)
- c.Unlock()
- if len(c.clients) == 0 && c.ephemeral == true {
- go c.deleter.Do(func() { c.deleteCallback(c) })
- }
- }
- func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
- now := time.Now()
- msg.clientID = clientID
- msg.deliveryTS = now
- msg.pri = now.Add(timeout).UnixNano()
- err := c.pushInFlightMessage(msg)
- if err != nil {
- return err
- }
- c.addToInFlightPQ(msg)
- return nil
- }
- func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
- absTs := time.Now().Add(timeout).UnixNano()
- item := &pqueue.Item{Value: msg, Priority: absTs}
- err := c.pushDeferredMessage(item)
- if err != nil {
- return err
- }
- c.addToDeferredPQ(item)
- return nil
- }
- // pushInFlightMessage atomically adds a message to the in-flight dictionary
- func (c *Channel) pushInFlightMessage(msg *Message) error {
- c.inFlightMutex.Lock()
- _, ok := c.inFlightMessages[msg.ID]
- if ok {
- c.inFlightMutex.Unlock()
- return errors.New("ID already in flight")
- }
- c.inFlightMessages[msg.ID] = msg
- c.inFlightMutex.Unlock()
- return nil
- }
- // popInFlightMessage atomically removes a message from the in-flight dictionary
- func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) {
- c.inFlightMutex.Lock()
- msg, ok := c.inFlightMessages[id]
- if !ok {
- c.inFlightMutex.Unlock()
- return nil, errors.New("ID not in flight")
- }
- if msg.clientID != clientID {
- c.inFlightMutex.Unlock()
- return nil, errors.New("client does not own message")
- }
- delete(c.inFlightMessages, id)
- c.inFlightMutex.Unlock()
- return msg, nil
- }
- func (c *Channel) addToInFlightPQ(msg *Message) {
- c.inFlightMutex.Lock()
- c.inFlightPQ.Push(msg)
- c.inFlightMutex.Unlock()
- }
- func (c *Channel) removeFromInFlightPQ(msg *Message) {
- c.inFlightMutex.Lock()
- if msg.index == -1 {
- // this item has already been popped off the pqueue
- c.inFlightMutex.Unlock()
- return
- }
- c.inFlightPQ.Remove(msg.index)
- c.inFlightMutex.Unlock()
- }
- func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
- c.deferredMutex.Lock()
- // TODO: these map lookups are costly
- id := item.Value.(*Message).ID
- _, ok := c.deferredMessages[id]
- if ok {
- c.deferredMutex.Unlock()
- return errors.New("ID already deferred")
- }
- c.deferredMessages[id] = item
- c.deferredMutex.Unlock()
- return nil
- }
- func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) {
- c.deferredMutex.Lock()
- // TODO: these map lookups are costly
- item, ok := c.deferredMessages[id]
- if !ok {
- c.deferredMutex.Unlock()
- return nil, errors.New("ID not deferred")
- }
- delete(c.deferredMessages, id)
- c.deferredMutex.Unlock()
- return item, nil
- }
- func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
- c.deferredMutex.Lock()
- heap.Push(&c.deferredPQ, item)
- c.deferredMutex.Unlock()
- }
- func (c *Channel) processDeferredQueue(t int64) bool {
- c.exitMutex.RLock()
- defer c.exitMutex.RUnlock()
- if c.Exiting() {
- return false
- }
- dirty := false
- for {
- c.deferredMutex.Lock()
- item, _ := c.deferredPQ.PeekAndShift(t)
- c.deferredMutex.Unlock()
- if item == nil {
- goto exit
- }
- dirty = true
- msg := item.Value.(*Message)
- _, err := c.popDeferredMessage(msg.ID)
- if err != nil {
- goto exit
- }
- c.put(msg)
- }
- exit:
- return dirty
- }
- func (c *Channel) processInFlightQueue(t int64) bool {
- c.exitMutex.RLock()
- defer c.exitMutex.RUnlock()
- if c.Exiting() {
- return false
- }
- dirty := false
- for {
- c.inFlightMutex.Lock()
- msg, _ := c.inFlightPQ.PeekAndShift(t)
- c.inFlightMutex.Unlock()
- if msg == nil {
- goto exit
- }
- dirty = true
- _, err := c.popInFlightMessage(msg.clientID, msg.ID)
- if err != nil {
- goto exit
- }
- atomic.AddUint64(&c.timeoutCount, 1)
- c.RLock()
- client, ok := c.clients[msg.clientID]
- c.RUnlock()
- if ok {
- client.TimedOutMessage()
- }
- c.put(msg)
- }
- exit:
- return dirty
- }
|