package nsqd

import (
	"errors"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/nsqio/go-diskqueue"
	"github.com/nsqio/nsq/internal/lg"
	"github.com/nsqio/nsq/internal/quantile"
	"github.com/nsqio/nsq/internal/util"
)

type Topic struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messageCount uint64
	messageBytes uint64

	sync.RWMutex

	name              string
	channelMap        map[string]*Channel
	backend           BackendQueue
	memoryMsgChan     chan *Message
	startChan         chan int
	exitChan          chan int
	channelUpdateChan chan int
	waitGroup         util.WaitGroupWrapper
	exitFlag          int32
	idFactory         *guidFactory

	ephemeral      bool
	deleteCallback func(*Topic)
	deleter        sync.Once

	paused    int32
	pauseChan chan int

	nsqd *NSQD
}

// Topic constructor
func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, nsqd.getOpts().MemQueueSize),
		startChan:         make(chan int, 1),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		nsqd:              nsqd,
		paused:            0,
		pauseChan:         make(chan int),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(nsqd.getOpts().ID),
	}

	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
			opts := nsqd.getOpts()
			lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
		}
		t.backend = diskqueue.New(
			topicName,
			nsqd.getOpts().DataPath,
			nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
			nsqd.getOpts().SyncEvery,
			nsqd.getOpts().SyncTimeout,
			dqLogf,
		)
	}

	t.waitGroup.Wrap(t.messagePump)

	t.nsqd.Notify(t, !t.ephemeral)

	return t
}

func (t *Topic) Start() {
	select {
	case t.startChan <- 1:
	default:
	}
}

// Exiting returns a boolean indicating if this topic is closed/exiting
func (t *Topic) Exiting() bool {
	return atomic.LoadInt32(&t.exitFlag) == 1
}

// GetChannel performs a thread safe operation
// to return a pointer to a Channel object (potentially new)
// for the given Topic
func (t *Topic) GetChannel(channelName string) *Channel {
	t.Lock()
	channel, isNew := t.getOrCreateChannel(channelName)
	t.Unlock()

	if isNew {
		// update messagePump state
		select {
		case t.channelUpdateChan <- 1:
		case <-t.exitChan:
		}
	}

	return channel
}

// this expects the caller to handle locking
func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
	channel, ok := t.channelMap[channelName]
	if !ok {
		deleteCallback := func(c *Channel) {
			t.DeleteExistingChannel(c.name)
		}
		channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
		t.channelMap[channelName] = channel
		t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
		return channel, true
	}
	return channel, false
}

func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
	t.RLock()
	defer t.RUnlock()
	channel, ok := t.channelMap[channelName]
	if !ok {
		return nil, errors.New("channel does not exist")
	}
	return channel, nil
}
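
// Hedged usage sketch (illustrative, not part of nsqd): the difference between
// GetChannel, which creates the channel on first use, and GetExistingChannel,
// which only looks one up. The topic `t` is assumed to have been obtained via
// NSQD.GetTopic.
//
//	ch := t.GetChannel("worker")                // creates "worker" on first call
//	same, err := t.GetExistingChannel("worker") // same *Channel, err == nil
//	_, err = t.GetExistingChannel("absent")     // err: "channel does not exist"
//	_, _ = ch, same
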
// DeleteExistingChannel removes a channel from the topic only if it exists
func (t *Topic) DeleteExistingChannel(channelName string) error {
	t.RLock()
	channel, ok := t.channelMap[channelName]
	t.RUnlock()
	if !ok {
		return errors.New("channel does not exist")
	}

	t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)

	// delete empties the channel before closing
	// (so that we don't leave any messages around)
	//
	// we do this before removing the channel from the map below (with no lock)
	// so that any incoming subs will error and not create a new channel
	// to enforce ordering
	channel.Delete()

	t.Lock()
	delete(t.channelMap, channelName)
	numChannels := len(t.channelMap)
	t.Unlock()

	// update messagePump state
	select {
	case t.channelUpdateChan <- 1:
	case <-t.exitChan:
	}

	if numChannels == 0 && t.ephemeral {
		go t.deleter.Do(func() { t.deleteCallback(t) })
	}

	return nil
}

// PutMessage writes a Message to the queue
func (t *Topic) PutMessage(m *Message) error {
	t.RLock()
	defer t.RUnlock()
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
	err := t.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&t.messageCount, 1)
	atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
	return nil
}

// PutMessages writes multiple Messages to the queue
func (t *Topic) PutMessages(msgs []*Message) error {
	// t.nsqd.logf(LOG_INFO, "receive mpub message - %s", msgs)
	if !t.nsqd.getOpts().FifoEnabled {
		if t.nsqd.getOpts().QosEnabled {
			t.nsqd.logf(LOG_INFO, "QosEnabled")
			// sort ascending by priority (stable sort)
			sort.SliceStable(msgs, func(i, j int) bool {
				p1, _ := msgs[i].getPriority()
				p2, _ := msgs[j].getPriority()
				return p1 < p2
			})
		} else {
			t.nsqd.logf(LOG_INFO, "RtEnabled")
			// sort ascending by latency (stable sort)
			sort.SliceStable(msgs, func(i, j int) bool {
				p1, _ := msgs[i].getLatency()
				p2, _ := msgs[j].getLatency()
				t.nsqd.logf(LOG_INFO, "msg:%s(latency:%d) msg:%s(latency:%d)",
					string(msgs[i].Body), p1, string(msgs[j].Body), p2)
				return p1 < p2
			})
		}
	}

	t.RLock()
	defer t.RUnlock()
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}

	messageTotalBytes := 0

	for i, m := range msgs {
		t.nsqd.logf(LOG_INFO, "put %d message - %s", i, string(m.Body))
		err := t.put(m)
		if err != nil {
			atomic.AddUint64(&t.messageCount, uint64(i))
			atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
			return err
		}
		messageTotalBytes += len(m.Body)
	}

	atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
	atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
	return nil
}
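
// Hedged sketch of the ordering behavior above. The JSON body encoding and the
// parsing rules of the getPriority helper are assumptions of this fork, not
// stock nsqd. With FifoEnabled=false and QosEnabled=true, an MPUB batch is
// stably sorted by ascending priority before being written, so equal-priority
// messages keep their original relative order.
//
//	msgs := []*Message{
//		NewMessage(t.GenerateID(), []byte(`{"priority":3}`)), // assumed encoding
//		NewMessage(t.GenerateID(), []byte(`{"priority":1}`)),
//		NewMessage(t.GenerateID(), []byte(`{"priority":2}`)),
//	}
//	_ = t.PutMessages(msgs) // enqueued in order: priority 1, 2, 3
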
func (t *Topic) put(m *Message) error {
	// If mem-queue-size == 0, avoid the memory chan for more consistent ordering,
	// but try to use the memory chan for deferred messages (they lose their
	// deferred timer in the backend queue) or if the topic is ephemeral
	// (there is no backend queue).
	if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
		select {
		case t.memoryMsgChan <- m:
			return nil
		default:
			break // write to backend
		}
	}
	err := writeMessageToBackend(m, t.backend)
	t.nsqd.SetHealth(err)
	if err != nil {
		t.nsqd.logf(LOG_ERROR,
			"TOPIC(%s) ERROR: failed to write message to backend - %s",
			t.name, err)
		return err
	}
	return nil
}

func (t *Topic) Depth() int64 {
	return int64(len(t.memoryMsgChan)) + t.backend.Depth()
}

// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	for {
		select {
		case <-t.channelUpdateChan:
			continue
		case <-t.pauseChan:
			continue
		case <-t.exitChan:
			goto exit
		case <-t.startChan:
		}
		break
	}
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if it's the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
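
// Hedged sketch of messagePump's fan-out (illustrative only): one published
// message is delivered to every channel, and each channel after the first
// receives its own copy, so a requeue or timeout on one channel never affects
// another.
//
//	a := t.GetChannel("analytics")
//	b := t.GetChannel("billing")
//	t.Start() // the pump begins draining the memory/backend queues
//	_ = t.PutMessage(NewMessage(t.GenerateID(), []byte("order#42")))
//	// both a and b now hold independent instances of "order#42"
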
messagePump", t.name) } // Delete empties the topic and all its channels and closes func (t *Topic) Delete() error { return t.exit(true) } // Close persists all outstanding topic data and closes all its channels func (t *Topic) Close() error { return t.exit(false) } func (t *Topic) exit(deleted bool) error { if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) { return errors.New("exiting") } if deleted { t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name) // since we are explicitly deleting a topic (not just at system exit time) // de-register this from the lookupd t.nsqd.Notify(t, !t.ephemeral) } else { t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name) } close(t.exitChan) // synchronize the close of messagePump() t.waitGroup.Wait() if deleted { t.Lock() for _, channel := range t.channelMap { delete(t.channelMap, channel.name) channel.Delete() } t.Unlock() // empty the queue (deletes the backend files, too) t.Empty() return t.backend.Delete() } // close all the channels t.RLock() for _, channel := range t.channelMap { err := channel.Close() if err != nil { // we need to continue regardless of error to close all the channels t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err) } } t.RUnlock() // write anything leftover to disk t.flush() return t.backend.Close() } func (t *Topic) Empty() error { for { select { case <-t.memoryMsgChan: default: goto finish } } finish: return t.backend.Empty() } func (t *Topic) flush() error { if len(t.memoryMsgChan) > 0 { t.nsqd.logf(LOG_INFO, "TOPIC(%s): flushing %d memory messages to backend", t.name, len(t.memoryMsgChan)) } for { select { case msg := <-t.memoryMsgChan: err := writeMessageToBackend(msg, t.backend) if err != nil { t.nsqd.logf(LOG_ERROR, "ERROR: failed to write message to backend - %s", err) } default: goto finish } } finish: return nil } func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile { var latencyStream *quantile.Quantile t.RLock() realChannels := make([]*Channel, 0, len(t.channelMap)) for _, c := range t.channelMap { realChannels = append(realChannels, c) } t.RUnlock() for _, c := range realChannels { if c.e2eProcessingLatencyStream == nil { continue } if latencyStream == nil { latencyStream = quantile.New( t.nsqd.getOpts().E2EProcessingLatencyWindowTime, t.nsqd.getOpts().E2EProcessingLatencyPercentiles) } latencyStream.Merge(c.e2eProcessingLatencyStream) } return latencyStream } func (t *Topic) Pause() error { return t.doPause(true) } func (t *Topic) UnPause() error { return t.doPause(false) } func (t *Topic) doPause(pause bool) error { if pause { atomic.StoreInt32(&t.paused, 1) } else { atomic.StoreInt32(&t.paused, 0) } select { case t.pauseChan <- 1: case <-t.exitChan: } return nil } func (t *Topic) IsPaused() bool { return atomic.LoadInt32(&t.paused) == 1 } func (t *Topic) GenerateID() MessageID { var i int64 = 0 for { id, err := t.idFactory.NewGUID() if err == nil { return id.Hex() } if i%10000 == 0 { t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err) } time.Sleep(time.Millisecond) i++ } }