topic.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. package nsqd
  2. import (
  3. "errors"
  4. "sort"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/nsqio/go-diskqueue"
  10. "github.com/nsqio/nsq/internal/lg"
  11. "github.com/nsqio/nsq/internal/quantile"
  12. "github.com/nsqio/nsq/internal/util"
  13. )
// Topic holds the state of a single named topic: its channels, its in-memory
// and backend message queues, and the signalling channels that drive
// messagePump.
type Topic struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messageCount uint64 // messages accepted by PutMessage/PutMessages; updated atomically
	messageBytes uint64 // total body bytes of accepted messages; updated atomically

	// guards channelMap; also read-locked for the duration of a publish
	sync.RWMutex

	name              string
	channelMap        map[string]*Channel // channels of this topic keyed by channel name
	backend           BackendQueue        // overflow queue used when the memory queue is not used or is full
	memoryMsgChan     chan *Message       // in-memory queue, sized from the MemQueueSize option
	startChan         chan int            // signalled by Start to let messagePump begin delivering
	exitChan          chan int            // closed by exit to stop messagePump and unblock senders
	channelUpdateChan chan int            // signalled after a channel is created or deleted
	waitGroup         util.WaitGroupWrapper // used by exit to wait for messagePump to return
	exitFlag          int32               // set to 1 (atomically) once exit has begun
	idFactory         *guidFactory        // source of message IDs for GenerateID

	ephemeral      bool         // true when the topic name ends in "#ephemeral"
	deleteCallback func(*Topic) // invoked when the last channel of an ephemeral topic is deleted
	deleter        sync.Once    // ensures deleteCallback is triggered at most once

	paused    int32    // 1 while the topic is paused; accessed atomically
	pauseChan chan int // signalled by doPause so messagePump re-evaluates its state

	nsqd *NSQD // owning daemon; provides options, logging, health and notifications
}
  36. // Topic constructor
  37. func NewTopic(topicName string, nsqd *NSQD, deleteCallback func(*Topic)) *Topic {
  38. t := &Topic{
  39. name: topicName,
  40. channelMap: make(map[string]*Channel),
  41. memoryMsgChan: make(chan *Message, nsqd.getOpts().MemQueueSize),
  42. startChan: make(chan int, 1),
  43. exitChan: make(chan int),
  44. channelUpdateChan: make(chan int),
  45. nsqd: nsqd,
  46. paused: 0,
  47. pauseChan: make(chan int),
  48. deleteCallback: deleteCallback,
  49. idFactory: NewGUIDFactory(nsqd.getOpts().ID),
  50. }
  51. if strings.HasSuffix(topicName, "#ephemeral") {
  52. t.ephemeral = true
  53. t.backend = newDummyBackendQueue()
  54. } else {
  55. dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
  56. opts := nsqd.getOpts()
  57. lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
  58. }
  59. t.backend = diskqueue.New(
  60. topicName,
  61. nsqd.getOpts().DataPath,
  62. nsqd.getOpts().MaxBytesPerFile,
  63. int32(minValidMsgLength),
  64. int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
  65. nsqd.getOpts().SyncEvery,
  66. nsqd.getOpts().SyncTimeout,
  67. dqLogf,
  68. )
  69. }
  70. t.waitGroup.Wrap(t.messagePump)
  71. t.nsqd.Notify(t, !t.ephemeral)
  72. return t
  73. }
  74. func (t *Topic) Start() {
  75. select {
  76. case t.startChan <- 1:
  77. default:
  78. }
  79. }
  80. // Exiting returns a boolean indicating if this topic is closed/exiting
  81. func (t *Topic) Exiting() bool {
  82. return atomic.LoadInt32(&t.exitFlag) == 1
  83. }
  84. // GetChannel performs a thread safe operation
  85. // to return a pointer to a Channel object (potentially new)
  86. // for the given Topic
  87. func (t *Topic) GetChannel(channelName string) *Channel {
  88. t.Lock()
  89. channel, isNew := t.getOrCreateChannel(channelName)
  90. t.Unlock()
  91. if isNew {
  92. // update messagePump state
  93. select {
  94. case t.channelUpdateChan <- 1:
  95. case <-t.exitChan:
  96. }
  97. }
  98. return channel
  99. }
  100. // this expects the caller to handle locking
  101. func (t *Topic) getOrCreateChannel(channelName string) (*Channel, bool) {
  102. channel, ok := t.channelMap[channelName]
  103. if !ok {
  104. deleteCallback := func(c *Channel) {
  105. t.DeleteExistingChannel(c.name)
  106. }
  107. channel = NewChannel(t.name, channelName, t.nsqd, deleteCallback)
  108. t.channelMap[channelName] = channel
  109. t.nsqd.logf(LOG_INFO, "TOPIC(%s): new channel(%s)", t.name, channel.name)
  110. return channel, true
  111. }
  112. return channel, false
  113. }
  114. func (t *Topic) GetExistingChannel(channelName string) (*Channel, error) {
  115. t.RLock()
  116. defer t.RUnlock()
  117. channel, ok := t.channelMap[channelName]
  118. if !ok {
  119. return nil, errors.New("channel does not exist")
  120. }
  121. return channel, nil
  122. }
  123. // DeleteExistingChannel removes a channel from the topic only if it exists
  124. func (t *Topic) DeleteExistingChannel(channelName string) error {
  125. t.RLock()
  126. channel, ok := t.channelMap[channelName]
  127. t.RUnlock()
  128. if !ok {
  129. return errors.New("channel does not exist")
  130. }
  131. t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting channel %s", t.name, channel.name)
  132. // delete empties the channel before closing
  133. // (so that we dont leave any messages around)
  134. //
  135. // we do this before removing the channel from map below (with no lock)
  136. // so that any incoming subs will error and not create a new channel
  137. // to enforce ordering
  138. channel.Delete()
  139. t.Lock()
  140. delete(t.channelMap, channelName)
  141. numChannels := len(t.channelMap)
  142. t.Unlock()
  143. // update messagePump state
  144. select {
  145. case t.channelUpdateChan <- 1:
  146. case <-t.exitChan:
  147. }
  148. if numChannels == 0 && t.ephemeral == true {
  149. go t.deleter.Do(func() { t.deleteCallback(t) })
  150. }
  151. return nil
  152. }
  153. // PutMessage writes a Message to the queue
  154. func (t *Topic) PutMessage(m *Message) error {
  155. t.RLock()
  156. defer t.RUnlock()
  157. if atomic.LoadInt32(&t.exitFlag) == 1 {
  158. return errors.New("exiting")
  159. }
  160. err := t.put(m)
  161. if err != nil {
  162. return err
  163. }
  164. atomic.AddUint64(&t.messageCount, 1)
  165. atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
  166. return nil
  167. }
  168. // PutMessages writes multiple Messages to the queue
  169. func (t *Topic) PutMessages(msgs []*Message) error {
  170. // t.nsqd.logf(LOG_INFO, "receive mpub message - %s", msgs)
  171. if !t.nsqd.getOpts().FifoEnabled {
  172. if t.nsqd.getOpts().QosEnabled {
  173. t.nsqd.logf(LOG_INFO, "QosEnabled")
  174. // 从小到大排序(稳定排序)
  175. sort.SliceStable(msgs, func(i, j int) bool {
  176. p1, _ := msgs[i].getPriority()
  177. p2, _ := msgs[j].getPriority()
  178. return p1 < p2
  179. })
  180. } else {
  181. t.nsqd.logf(LOG_INFO, "RtEnabled")
  182. // 从小到大排序(稳定排序)
  183. sort.SliceStable(msgs, func(i, j int) bool {
  184. p1, _ := msgs[i].getLatency()
  185. p2, _ := msgs[j].getLatency()
  186. t.nsqd.logf(LOG_INFO, "msg:%s(latency:%d) msg:%s(latency:%d)", string(msgs[i].Body), p1, string(msgs[j].Body), p2)
  187. return p1 < p2
  188. })
  189. }
  190. }
  191. t.RLock()
  192. defer t.RUnlock()
  193. if atomic.LoadInt32(&t.exitFlag) == 1 {
  194. return errors.New("exiting")
  195. }
  196. messageTotalBytes := 0
  197. for i, m := range msgs {
  198. t.nsqd.logf(LOG_INFO, "put %d message - %s", i, (string)(m.Body))
  199. err := t.put(m)
  200. if err != nil {
  201. atomic.AddUint64(&t.messageCount, uint64(i))
  202. atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
  203. return err
  204. }
  205. messageTotalBytes += len(m.Body)
  206. }
  207. atomic.AddUint64(&t.messageBytes, uint64(messageTotalBytes))
  208. atomic.AddUint64(&t.messageCount, uint64(len(msgs)))
  209. return nil
  210. }
  211. func (t *Topic) put(m *Message) error {
  212. // If mem-queue-size == 0, avoid memory chan, for more consistent ordering,
  213. // but try to use memory chan for deferred messages (they lose deferred timer
  214. // in backend queue) or if topic is ephemeral (there is no backend queue).
  215. if cap(t.memoryMsgChan) > 0 || t.ephemeral || m.deferred != 0 {
  216. select {
  217. case t.memoryMsgChan <- m:
  218. return nil
  219. default:
  220. break // write to backend
  221. }
  222. }
  223. err := writeMessageToBackend(m, t.backend)
  224. t.nsqd.SetHealth(err)
  225. if err != nil {
  226. t.nsqd.logf(LOG_ERROR,
  227. "TOPIC(%s) ERROR: failed to write message to backend - %s",
  228. t.name, err)
  229. return err
  230. }
  231. return nil
  232. }
  233. func (t *Topic) Depth() int64 {
  234. return int64(len(t.memoryMsgChan)) + t.backend.Depth()
  235. }
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic.
//
// It first waits for a signal on startChan, then loops until exitChan is
// closed. While the topic has no channels or is paused, the local queue
// channels are nil, so no messages are received.
func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	// Local copies of the two message sources. A nil channel never becomes
	// ready in a select, so leaving these nil disables message delivery.
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	for {
		select {
		case <-t.channelUpdateChan:
			// drained only; the channel list is built after the start signal
			continue
		case <-t.pauseChan:
			// drained only; the paused flag is consulted after the start signal
			continue
		case <-t.exitChan:
			goto exit
		case <-t.startChan:
		}
		// reached only via the startChan case above
		break
	}
	// take a snapshot of the current channels
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan:
		case buf = <-backendChan:
			msg, err = decodeMessage(buf)
			if err != nil {
				// skip messages that cannot be decoded
				t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			// a channel was added or removed: rebuild the snapshot, reusing
			// the slice's backing array
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
			// pause state changed: enable or disable the message sources
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
			goto exit
		}

		// fan the received message out to every channel in the snapshot
		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
			// a failed put is logged; the remaining channels are still served
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
  330. // Delete empties the topic and all its channels and closes
  331. func (t *Topic) Delete() error {
  332. return t.exit(true)
  333. }
  334. // Close persists all outstanding topic data and closes all its channels
  335. func (t *Topic) Close() error {
  336. return t.exit(false)
  337. }
  338. func (t *Topic) exit(deleted bool) error {
  339. if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
  340. return errors.New("exiting")
  341. }
  342. if deleted {
  343. t.nsqd.logf(LOG_INFO, "TOPIC(%s): deleting", t.name)
  344. // since we are explicitly deleting a topic (not just at system exit time)
  345. // de-register this from the lookupd
  346. t.nsqd.Notify(t, !t.ephemeral)
  347. } else {
  348. t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing", t.name)
  349. }
  350. close(t.exitChan)
  351. // synchronize the close of messagePump()
  352. t.waitGroup.Wait()
  353. if deleted {
  354. t.Lock()
  355. for _, channel := range t.channelMap {
  356. delete(t.channelMap, channel.name)
  357. channel.Delete()
  358. }
  359. t.Unlock()
  360. // empty the queue (deletes the backend files, too)
  361. t.Empty()
  362. return t.backend.Delete()
  363. }
  364. // close all the channels
  365. t.RLock()
  366. for _, channel := range t.channelMap {
  367. err := channel.Close()
  368. if err != nil {
  369. // we need to continue regardless of error to close all the channels
  370. t.nsqd.logf(LOG_ERROR, "channel(%s) close - %s", channel.name, err)
  371. }
  372. }
  373. t.RUnlock()
  374. // write anything leftover to disk
  375. t.flush()
  376. return t.backend.Close()
  377. }
  378. func (t *Topic) Empty() error {
  379. for {
  380. select {
  381. case <-t.memoryMsgChan:
  382. default:
  383. goto finish
  384. }
  385. }
  386. finish:
  387. return t.backend.Empty()
  388. }
  389. func (t *Topic) flush() error {
  390. if len(t.memoryMsgChan) > 0 {
  391. t.nsqd.logf(LOG_INFO,
  392. "TOPIC(%s): flushing %d memory messages to backend",
  393. t.name, len(t.memoryMsgChan))
  394. }
  395. for {
  396. select {
  397. case msg := <-t.memoryMsgChan:
  398. err := writeMessageToBackend(msg, t.backend)
  399. if err != nil {
  400. t.nsqd.logf(LOG_ERROR,
  401. "ERROR: failed to write message to backend - %s", err)
  402. }
  403. default:
  404. goto finish
  405. }
  406. }
  407. finish:
  408. return nil
  409. }
  410. func (t *Topic) AggregateChannelE2eProcessingLatency() *quantile.Quantile {
  411. var latencyStream *quantile.Quantile
  412. t.RLock()
  413. realChannels := make([]*Channel, 0, len(t.channelMap))
  414. for _, c := range t.channelMap {
  415. realChannels = append(realChannels, c)
  416. }
  417. t.RUnlock()
  418. for _, c := range realChannels {
  419. if c.e2eProcessingLatencyStream == nil {
  420. continue
  421. }
  422. if latencyStream == nil {
  423. latencyStream = quantile.New(
  424. t.nsqd.getOpts().E2EProcessingLatencyWindowTime,
  425. t.nsqd.getOpts().E2EProcessingLatencyPercentiles)
  426. }
  427. latencyStream.Merge(c.e2eProcessingLatencyStream)
  428. }
  429. return latencyStream
  430. }
  431. func (t *Topic) Pause() error {
  432. return t.doPause(true)
  433. }
  434. func (t *Topic) UnPause() error {
  435. return t.doPause(false)
  436. }
  437. func (t *Topic) doPause(pause bool) error {
  438. if pause {
  439. atomic.StoreInt32(&t.paused, 1)
  440. } else {
  441. atomic.StoreInt32(&t.paused, 0)
  442. }
  443. select {
  444. case t.pauseChan <- 1:
  445. case <-t.exitChan:
  446. }
  447. return nil
  448. }
  449. func (t *Topic) IsPaused() bool {
  450. return atomic.LoadInt32(&t.paused) == 1
  451. }
  452. func (t *Topic) GenerateID() MessageID {
  453. var i int64 = 0
  454. for {
  455. id, err := t.idFactory.NewGUID()
  456. if err == nil {
  457. return id.Hex()
  458. }
  459. if i%10000 == 0 {
  460. t.nsqd.logf(LOG_ERROR, "TOPIC(%s): failed to create guid - %s", t.name, err)
  461. }
  462. time.Sleep(time.Millisecond)
  463. i++
  464. }
  465. }