123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529 |
- package nsqd
- import (
- "errors"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nsqio/go-diskqueue"
- "github.com/nsqio/nsq/internal/lg"
- "github.com/nsqio/nsq/internal/quantile"
- "github.com/nsqio/nsq/internal/util"
- )
- type Topic struct {
- // 64bit atomic vars need to be first for proper alignment on 32bit platforms
- messageCount uint64
- messageBytes uint64
- sync.RWMutex
- name string
- channelMap map[string]*Channel
- backend BackendQueue
- memoryMsgChan chan *Message
- startChan chan int
- exitChan chan int
- channelUpdateChan chan int
- waitGroup util.WaitGroupWrapper
- exitFlag int32
- idFactory *guidFactory
- ephemeral bool
- deleteCallback func(*Topic)
- deleter sync.Once
- paused int32
- pauseChan chan int
- nsqd *NSQD
- }
- // Topic constructor
- func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
- t := &Topic{
- name: topicName,
- channelMap: make(map[string]*Channel),
- memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
- startChan: make(chan int, 1),
- exitChan: make(chan int),
- channelUpdateChan: make(chan int),
- nsqd: nsqd,
- paused: 0,
- pauseChan: make(chan int),
- deleteCallback: deleteCallback,
- idFactory: NewGUIDFactory(nsqd.getOpts().ID),
- }
- if strings.HasSuffix(topicName, "#ephemeral") {
- t.ephemeral = true
- t.backend = newDummyBackendQueue()
- } else {
- dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
- opts := nsqd.getOpts()
- lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
- }
- t.backend = diskqueue.New(
- topicName,
- nsqd.getOpts().DataPath,
- nsqd.getOpts().MaxBytesPerFile,
- int32(minValidMsgLength),
- int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
- nsqd.getOpts().SyncEvery,
- nsqd.getOpts().SyncTimeout,
- dqLogf,
- )
- }
- t.waitGroup.Wrap(t.messagePump)
- t.nsqd.Notify(t, !t.ephemeral)
- return t
- }
- func (t *Topic) Start() {
- select {
- case t.startChan <- 1:
- default:
- }
- }
- // Exiting returns a boolean indicating if this topic is closed/exiting
- func (t *Topic) Exiting() bool {
- return atomic.LoadInt32(&t.exitFlag) == 1
- }
- // GetChannel performs a thread safe operation
- // to return a pointer to a Channel object (potentially new)
- // for the given Topic
- func (t *Topic) GetChannel(channelName string) *Channel {
- t.Lock()
- channel, isNew := t.getOrCreateChannel(channelName)
- t.Unlock()
- if isNew {
- // update messagePump state
- select {
- case t.channelUpdateChan <- 1:
- case <-t.exitChan:
- }
- }
- return channel
- }
- // this expects the caller to handle locking
- func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
- channel, ok := t.channelMap[channelName]
- if !ok {
- deleteCallback := func(c *Channel) {
- t.DeleteExistingChannel(c.name)
- }
- channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
- t.channelMap[channelName] = channel
- t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
- return channel, true
- }
- return channel, false
- }
- func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
- t.RLock()
- defer t.RUnlock()
- channel, ok := t.channelMap[channelName]
- if !ok {
- return nil, errors.New("channel does not exist")
- }
- return channel, nil
- }
- // DeleteExistingChannel removes a channel from the topic only if it exists
- func (t *Topic) DeleteExistingChannel(channelName string) error {
- t.RLock()
- channel, ok := t.channelMap[channelName]
- t.RUnlock()
- if !ok {
- return errors.New("channel does not exist")
- }
- t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
- // delete empties the channel before closing
- // (so that we dont leave any messages around)
- //
- // we do this before removing the channel from map below (with no lock)
- // so that any incoming subs will error and not create a new channel
- // to enforce ordering
- channel.Delete()
- t.Lock()
- delete(t.channelMap, channelName)
- numChannels := len(t.channelMap)
- t.Unlock()
- // update messagePump state
- select {
- case t.channelUpdateChan <- 1:
- case <-t.exitChan:
- }
- if numChannels == 0 && t.ephemeral == true {
- go t.deleter.Do(func() { t.deleteCallback(t) })
- }
- return nil
- }
- // PutMessage writes a Message to the queue
- func (t *Topic) PutMessage(m *Message) error {
- t.RLock()
- defer t.RUnlock()
- if atomic.LoadInt32(&t.exitFlag) == 1 {
- return errors.New("exiting")
- }
- err := t.put(m)
- if err != nil {
- return err
- }
- atomic.AddUint64(&t.messageCount, 1)
- atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
- return nil
- }
- // PutMessages writes multiple Messages to the queue
- func (t *Topic) PutMessages(msgs []*Message) error {
- // t.nsqd.logf(LOG_INFO, "receive mpub message - %s", msgs)
- if !t.nsqd.getOpts().FifoEnabled {
- if t.nsqd.getOpts().QosEnabled {
- t.nsqd.logf(LOG_INFO, "QosEnabled")
- // 从小到大排序(稳定排序)
- sort.SliceStable(msgs, func(i, j int) bool {
- p1, _ := msgs[i].getPriority()
- p2, _ := msgs[j].getPriority()
- return p1 < p2
- })
- } else {
- t.nsqd.logf(LOG_INFO, "RtEnabled")
- // 从小到大排序(稳定排序)
- sort.SliceStable(msgs, func(i, j int) bool {
- p1, _ := msgs[i].getLatency()
- p2, _ := msgs[j].getLatency()
- t.nsqd.logf(LOG_INFO, "msg:%s(latency:%d) msg:%s(latency:%d)", string(msgs[i].Body), p1, string(msgs[j].Body), p2)
- return p1 < p2
- })
- }
- }
- t.RLock()
- defer t.RUnlock()
- if atomic.LoadInt32(&t.exitFlag) == 1 {
- return errors.New("exiting")
- }
- messageTotalBytes := 0
- for i, m := range msgs {
- t.nsqd.logf(LOG_INFO, "put %d message - %s", i, (string)(m.Body))
- err := t.put(m)
- if err != nil {
- atomic.AddUint64(&t.messageCount, uint64(i))
- atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
- return err
- }
- messageTotalBytes += len(m.Body)
- }
- atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
- atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
- return nil
- }
- func (t *Topic) put(m *Message) error {
- // If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
- // but try to use memory chan for deferred messages (they lose deferred timer
- // in backend queue) or if topic is ephemeral (there is no backend queue).
- if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
- select {
- case t.memoryMsgChan <- m:
- return nil
- default:
- break // write to backend
- }
- }
- err := writeMessageToBackend(m, t.backend)
- t.nsqd.SetHealth(err)
- if err != nil {
- t.nsqd.logf(LOG_ERROR,
- "TOPIC(%s) ERROR: failed to write message to backend - %s",
- t.name, err)
- return err
- }
- return nil
- }
- func (t *Topic) Depth() int64 {
- return int64(len(t.memoryMsgChan)) + t.backend.Depth()
- }
- // messagePump selects over the in-memory and backend queue and
- // writes messages to every channel for this topic
- func (t *Topic) messagePump() {
- var msg *Message
- var buf []byte
- var err error
- var chans []*Channel
- var memoryMsgChan chan *Message
- var backendChan <-chan []byte
- // do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
- for {
- select {
- case <-t.channelUpdateChan:
- continue
- case <-t.pauseChan:
- continue
- case <-t.exitChan:
- goto exit
- case <-t.startChan:
- }
- break
- }
- t.RLock()
- for _, c := range t.channelMap {
- chans = append(chans, c)
- }
- t.RUnlock()
- if len(chans) > 0 && !t.IsPaused() {
- memoryMsgChan = t.memoryMsgChan
- backendChan = t.backend.ReadChan()
- }
- // main message loop
- for {
- select {
- case msg = <-memoryMsgChan:
- case buf = <-backendChan:
- msg, err = decodeMessage(buf)
- if err != nil {
- t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
- continue
- }
- case <-t.channelUpdateChan:
- chans = chans[:0]
- t.RLock()
- for _, c := range t.channelMap {
- chans = append(chans, c)
- }
- t.RUnlock()
- if len(chans) == 0 || t.IsPaused() {
- memoryMsgChan = nil
- backendChan = nil
- } else {
- memoryMsgChan = t.memoryMsgChan
- backendChan = t.backend.ReadChan()
- }
- continue
- case <-t.pauseChan:
- if len(chans) == 0 || t.IsPaused() {
- memoryMsgChan = nil
- backendChan = nil
- } else {
- memoryMsgChan = t.memoryMsgChan
- backendChan = t.backend.ReadChan()
- }
- continue
- case <-t.exitChan:
- goto exit
- }
- for i, channel := range chans {
- chanMsg := msg
- // copy the message because each channel
- // needs a unique instance but...
- // fastpath to avoid copy if its the first channel
- // (the topic already created the first copy)
- if i > 0 {
- chanMsg = NewMessage(msg.ID, msg.Body)
- chanMsg.Timestamp = msg.Timestamp
- chanMsg.deferred = msg.deferred
- }
- if chanMsg.deferred != 0 {
- channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
- continue
- }
- err := channel.PutMessage(chanMsg)
- if err != nil {
- t.nsqd.logf(LOG_ERROR,
- "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
- t.name, msg.ID, channel.name, err)
- }
- }
- }
- exit:
- t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
- }
- // Delete empties the topic and all its channels and closes
- func (t *Topic) Delete() error {
- return t.exit(true)
- }
- // Close persists all outstanding topic data and closes all its channels
- func (t *Topic) Close() error {
- return t.exit(false)
- }
- func (t *Topic) exit(deleted bool) error {
- if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
- return errors.New("exiting")
- }
- if deleted {
- t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
- // since we are explicitly deleting a topic (not just at system exit time)
- // de-register this from the lookupd
- t.nsqd.Notify(t, !t.ephemeral)
- } else {
- t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
- }
- close(t.exitChan)
- // synchronize the close of messagePump()
- t.waitGroup.Wait()
- if deleted {
- t.Lock()
- for _, channel := range t.channelMap {
- delete(t.channelMap, channel.name)
- channel.Delete()
- }
- t.Unlock()
- // empty the queue (deletes the backend files, too)
- t.Empty()
- return t.backend.Delete()
- }
- // close all the channels
- t.RLock()
- for _, channel := range t.channelMap {
- err := channel.Close()
- if err != nil {
- // we need to continue regardless of error to close all the channels
- t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
- }
- }
- t.RUnlock()
- // write anything leftover to disk
- t.flush()
- return t.backend.Close()
- }
- func (t *Topic) Empty() error {
- for {
- select {
- case <-t.memoryMsgChan:
- default:
- goto finish
- }
- }
- finish:
- return t.backend.Empty()
- }
- func (t *Topic) flush() error {
- if len(t.memoryMsgChan) > 0 {
- t.nsqd.logf(LOG_INFO,
- "TOPIC(%s): flushing %d memory messages to backend",
- t.name, len(t.memoryMsgChan))
- }
- for {
- select {
- case msg := <-t.memoryMsgChan:
- err := writeMessageToBackend(msg, t.backend)
- if err != nil {
- t.nsqd.logf(LOG_ERROR,
- "ERROR: failed to write message to backend - %s", err)
- }
- default:
- goto finish
- }
- }
- finish:
- return nil
- }
- func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
- var latencyStream *quantile.Quantile
- t.RLock()
- realChannels := make([]*Channel, 0, len(t.channelMap))
- for _, c := range t.channelMap {
- realChannels = append(realChannels, c)
- }
- t.RUnlock()
- for _, c := range realChannels {
- if c.e2eProcessingLatencyStream == nil {
- continue
- }
- if latencyStream == nil {
- latencyStream = quantile.New(
- t.nsqd.getOpts().E2EProcessingLatencyWindowTime,
- t.nsqd.getOpts().E2EProcessingLatencyPercentiles)
- }
- latencyStream.Merge(c.e2eProcessingLatencyStream)
- }
- return latencyStream
- }
- func (t *Topic) Pause() error {
- return t.doPause(true)
- }
- func (t *Topic) UnPause() error {
- return t.doPause(false)
- }
- func (t *Topic) doPause(pause bool) error {
- if pause {
- atomic.StoreInt32(&t.paused, 1)
- } else {
- atomic.StoreInt32(&t.paused, 0)
- }
- select {
- case t.pauseChan <- 1:
- case <-t.exitChan:
- }
- return nil
- }
- func (t *Topic) IsPaused() bool {
- return atomic.LoadInt32(&t.paused) == 1
- }
- func (t *Topic) GenerateID() MessageID {
- var i int64 = 0
- for {
- id, err := t.idFactory.NewGUID()
- if err == nil {
- return id.Hex()
- }
- if i%10000 == 0 {
- t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err)
- }
- time.Sleep(time.Millisecond)
- i++
- }
- }
|