channel.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. package nsqd
  2. import (
  3. "container/heap"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/nsqio/go-diskqueue"
  12. "github.com/nsqio/nsq/internal/lg"
  13. "github.com/nsqio/nsq/internal/pqueue"
  14. "github.com/nsqio/nsq/internal/quantile"
  15. )
  16. type Consumer interface {
  17. UnPause()
  18. Pause()
  19. Close() error
  20. TimedOutMessage()
  21. Stats(string) ClientStats
  22. Empty()
  23. }
// Channel represents the concrete type for a NSQ channel (and also
// implements the Queue interface)
//
// There can be multiple channels per topic, each with their own unique set
// of subscribers (clients).
//
// Channels maintain all client and message metadata, orchestrating in-flight
// messages, timeouts, requeuing, etc.
type Channel struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	requeueCount uint64 // incremented by RequeueMessage
	messageCount uint64 // incremented by PutMessage and PutMessageDeferred
	timeoutCount uint64 // incremented by processInFlightQueue per expired in-flight message

	// The embedded lock guards the clients map.
	sync.RWMutex

	topicName string
	name      string
	nsqd      *NSQD

	// backend receives messages that do not fit in memoryMsgChan. NewChannel
	// uses a dummy backend for ephemeral channels and a disk queue otherwise.
	backend BackendQueue
	// memoryMsgChan is the buffered in-memory queue; it stays nil when
	// MemQueueSize is 0 and the channel is not ephemeral.
	memoryMsgChan chan *Message

	exitFlag  int32        // switched from 0 to 1 (atomically) by exit(); read via Exiting
	exitMutex sync.RWMutex // held exclusively by exit(); held shared by operations that must not race with it

	// state tracking
	clients        map[int64]Consumer // guarded by the embedded RWMutex
	paused         int32              // 1 while paused, 0 otherwise; accessed atomically
	ephemeral      bool               // true when the channel name ends in "#ephemeral"
	deleteCallback func(*Channel)     // run (once, via deleter) when an ephemeral channel loses its last client
	deleter        sync.Once

	// Stats tracking
	e2eProcessingLatencyStream *quantile.Quantile // nil unless E2E latency percentiles are configured

	// TODO: these can be DRYd up
	deferredMessages map[MessageID]*pqueue.Item // guarded by deferredMutex
	deferredPQ       pqueue.PriorityQueue       // guarded by deferredMutex
	deferredMutex    sync.Mutex
	inFlightMessages map[MessageID]*Message // guarded by inFlightMutex
	inFlightPQ       inFlightPqueue         // guarded by inFlightMutex
	inFlightMutex    sync.Mutex
}
  61. // NewChannel creates a new instance of the Channel type and returns a pointer
  62. func NewChannel(topicName string, channelName string, nsqd *NSQD,
  63. deleteCallback func(*Channel)) *Channel {
  64. c := &Channel{
  65. topicName: topicName,
  66. name: channelName,
  67. memoryMsgChan: nil,
  68. clients: make(map[int64]Consumer),
  69. deleteCallback: deleteCallback,
  70. nsqd: nsqd,
  71. ephemeral: strings.HasSuffix(channelName, "#ephemeral"),
  72. }
  73. // avoid mem-queue if size == 0 for more consistent ordering
  74. if nsqd.getOpts().MemQueueSize > 0 || c.ephemeral {
  75. c.memoryMsgChan = make(chan *Message, nsqd.getOpts().MemQueueSize)
  76. }
  77. if len(nsqd.getOpts().E2EProcessingLatencyPercentiles) > 0 {
  78. c.e2eProcessingLatencyStream = quantile.New(
  79. nsqd.getOpts().E2EProcessingLatencyWindowTime,
  80. nsqd.getOpts().E2EProcessingLatencyPercentiles,
  81. )
  82. }
  83. c.initPQ()
  84. if c.ephemeral {
  85. c.backend = newDummyBackendQueue()
  86. } else {
  87. dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
  88. opts := nsqd.getOpts()
  89. lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
  90. }
  91. // backend names, for uniqueness, automatically include the topic...
  92. backendName := getBackendName(topicName, channelName)
  93. c.backend = diskqueue.New(
  94. backendName,
  95. nsqd.getOpts().DataPath,
  96. nsqd.getOpts().MaxBytesPerFile,
  97. int32(minValidMsgLength),
  98. int32(nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
  99. nsqd.getOpts().SyncEvery,
  100. nsqd.getOpts().SyncTimeout,
  101. dqLogf,
  102. )
  103. }
  104. c.nsqd.Notify(c, !c.ephemeral)
  105. return c
  106. }
  107. func (c *Channel) initPQ() {
  108. pqSize := int(math.Max(1, float64(c.nsqd.getOpts().MemQueueSize)/10))
  109. c.inFlightMutex.Lock()
  110. c.inFlightMessages = make(map[MessageID]*Message)
  111. c.inFlightPQ = newInFlightPqueue(pqSize)
  112. c.inFlightMutex.Unlock()
  113. c.deferredMutex.Lock()
  114. c.deferredMessages = make(map[MessageID]*pqueue.Item)
  115. c.deferredPQ = pqueue.New(pqSize)
  116. c.deferredMutex.Unlock()
  117. }
  118. // Exiting returns a boolean indicating if this channel is closed/exiting
  119. func (c *Channel) Exiting() bool {
  120. return atomic.LoadInt32(&c.exitFlag) == 1
  121. }
  122. // Delete empties the channel and closes
  123. func (c *Channel) Delete() error {
  124. return c.exit(true)
  125. }
  126. // Close cleanly closes the Channel
  127. func (c *Channel) Close() error {
  128. return c.exit(false)
  129. }
  130. func (c *Channel) exit(deleted bool) error {
  131. c.exitMutex.Lock()
  132. defer c.exitMutex.Unlock()
  133. if !atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  134. return errors.New("exiting")
  135. }
  136. if deleted {
  137. c.nsqd.logf(LOG_INFO, "CHANNEL(%s): deleting", c.name)
  138. // since we are explicitly deleting a channel (not just at system exit time)
  139. // de-register this from the lookupd
  140. c.nsqd.Notify(c, !c.ephemeral)
  141. } else {
  142. c.nsqd.logf(LOG_INFO, "CHANNEL(%s): closing", c.name)
  143. }
  144. // this forceably closes client connections
  145. c.RLock()
  146. for _, client := range c.clients {
  147. client.Close()
  148. }
  149. c.RUnlock()
  150. if deleted {
  151. // empty the queue (deletes the backend files, too)
  152. c.Empty()
  153. return c.backend.Delete()
  154. }
  155. // write anything leftover to disk
  156. c.flush()
  157. return c.backend.Close()
  158. }
  159. func (c *Channel) Empty() error {
  160. c.Lock()
  161. defer c.Unlock()
  162. c.initPQ()
  163. for _, client := range c.clients {
  164. client.Empty()
  165. }
  166. for {
  167. select {
  168. case <-c.memoryMsgChan:
  169. default:
  170. goto finish
  171. }
  172. }
  173. finish:
  174. return c.backend.Empty()
  175. }
  176. // flush persists all the messages in internal memory buffers to the backend
  177. // it does not drain inflight/deferred because it is only called in Close()
  178. func (c *Channel) flush() error {
  179. if len(c.memoryMsgChan) > 0 || len(c.inFlightMessages) > 0 || len(c.deferredMessages) > 0 {
  180. c.nsqd.logf(LOG_INFO, "CHANNEL(%s): flushing %d memory %d in-flight %d deferred messages to backend",
  181. c.name, len(c.memoryMsgChan), len(c.inFlightMessages), len(c.deferredMessages))
  182. }
  183. for {
  184. select {
  185. case msg := <-c.memoryMsgChan:
  186. err := writeMessageToBackend(msg, c.backend)
  187. if err != nil {
  188. c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
  189. }
  190. default:
  191. goto finish
  192. }
  193. }
  194. finish:
  195. c.inFlightMutex.Lock()
  196. for _, msg := range c.inFlightMessages {
  197. err := writeMessageToBackend(msg, c.backend)
  198. if err != nil {
  199. c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
  200. }
  201. }
  202. c.inFlightMutex.Unlock()
  203. c.deferredMutex.Lock()
  204. for _, item := range c.deferredMessages {
  205. msg := item.Value.(*Message)
  206. err := writeMessageToBackend(msg, c.backend)
  207. if err != nil {
  208. c.nsqd.logf(LOG_ERROR, "failed to write message to backend - %s", err)
  209. }
  210. }
  211. c.deferredMutex.Unlock()
  212. return nil
  213. }
  214. func (c *Channel) Depth() int64 {
  215. return int64(len(c.memoryMsgChan)) + c.backend.Depth()
  216. }
  217. func (c *Channel) Pause() error {
  218. return c.doPause(true)
  219. }
  220. func (c *Channel) UnPause() error {
  221. return c.doPause(false)
  222. }
  223. func (c *Channel) doPause(pause bool) error {
  224. if pause {
  225. atomic.StoreInt32(&c.paused, 1)
  226. } else {
  227. atomic.StoreInt32(&c.paused, 0)
  228. }
  229. c.RLock()
  230. for _, client := range c.clients {
  231. if pause {
  232. client.Pause()
  233. } else {
  234. client.UnPause()
  235. }
  236. }
  237. c.RUnlock()
  238. return nil
  239. }
  240. func (c *Channel) IsPaused() bool {
  241. return atomic.LoadInt32(&c.paused) == 1
  242. }
  243. // PutMessage writes a Message to the queue
  244. func (c *Channel) PutMessage(m *Message) error {
  245. c.exitMutex.RLock()
  246. defer c.exitMutex.RUnlock()
  247. if c.Exiting() {
  248. return errors.New("exiting")
  249. }
  250. err := c.put(m)
  251. if err != nil {
  252. return err
  253. }
  254. atomic.AddUint64(&c.messageCount, 1)
  255. return nil
  256. }
  257. func (c *Channel) put(m *Message) error {
  258. select {
  259. case c.memoryMsgChan <- m:
  260. default:
  261. err := writeMessageToBackend(m, c.backend)
  262. c.nsqd.SetHealth(err)
  263. if err != nil {
  264. c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
  265. c.name, err)
  266. return err
  267. }
  268. }
  269. return nil
  270. }
  271. func (c *Channel) PutMessageDeferred(msg *Message, timeout time.Duration) {
  272. atomic.AddUint64(&c.messageCount, 1)
  273. c.StartDeferredTimeout(msg, timeout)
  274. }
  275. // TouchMessage resets the timeout for an in-flight message
  276. func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error {
  277. msg, err := c.popInFlightMessage(clientID, id)
  278. if err != nil {
  279. return err
  280. }
  281. c.removeFromInFlightPQ(msg)
  282. newTimeout := time.Now().Add(clientMsgTimeout)
  283. if newTimeout.Sub(msg.deliveryTS) >=
  284. c.nsqd.getOpts().MaxMsgTimeout {
  285. // we would have gone over, set to the max
  286. newTimeout = msg.deliveryTS.Add(c.nsqd.getOpts().MaxMsgTimeout)
  287. }
  288. msg.pri = newTimeout.UnixNano()
  289. err = c.pushInFlightMessage(msg)
  290. if err != nil {
  291. return err
  292. }
  293. c.addToInFlightPQ(msg)
  294. return nil
  295. }
  296. // FinishMessage successfully discards an in-flight message
  297. func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
  298. msg, err := c.popInFlightMessage(clientID, id)
  299. if err != nil {
  300. return err
  301. }
  302. c.removeFromInFlightPQ(msg)
  303. if c.e2eProcessingLatencyStream != nil {
  304. c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
  305. }
  306. return nil
  307. }
  308. // RequeueMessage requeues a message based on `time.Duration`, ie:
  309. //
  310. // `timeoutMs` == 0 - requeue a message immediately
  311. // `timeoutMs` > 0 - asynchronously wait for the specified timeout
  312. // and requeue a message (aka "deferred requeue")
  313. //
  314. func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
  315. // remove from inflight first
  316. msg, err := c.popInFlightMessage(clientID, id)
  317. if err != nil {
  318. return err
  319. }
  320. c.removeFromInFlightPQ(msg)
  321. atomic.AddUint64(&c.requeueCount, 1)
  322. if timeout == 0 {
  323. c.exitMutex.RLock()
  324. if c.Exiting() {
  325. c.exitMutex.RUnlock()
  326. return errors.New("exiting")
  327. }
  328. err := c.put(msg)
  329. c.exitMutex.RUnlock()
  330. return err
  331. }
  332. // deferred requeue
  333. return c.StartDeferredTimeout(msg, timeout)
  334. }
  335. // AddClient adds a client to the Channel's client list
  336. func (c *Channel) AddClient(clientID int64, client Consumer) error {
  337. c.exitMutex.RLock()
  338. defer c.exitMutex.RUnlock()
  339. if c.Exiting() {
  340. return errors.New("exiting")
  341. }
  342. c.RLock()
  343. _, ok := c.clients[clientID]
  344. numClients := len(c.clients)
  345. c.RUnlock()
  346. if ok {
  347. return nil
  348. }
  349. maxChannelConsumers := c.nsqd.getOpts().MaxChannelConsumers
  350. if maxChannelConsumers != 0 && numClients >= maxChannelConsumers {
  351. return fmt.Errorf("consumers for %s:%s exceeds limit of %d",
  352. c.topicName, c.name, maxChannelConsumers)
  353. }
  354. c.Lock()
  355. c.clients[clientID] = client
  356. c.Unlock()
  357. return nil
  358. }
  359. // RemoveClient removes a client from the Channel's client list
  360. func (c *Channel) RemoveClient(clientID int64) {
  361. c.exitMutex.RLock()
  362. defer c.exitMutex.RUnlock()
  363. if c.Exiting() {
  364. return
  365. }
  366. c.RLock()
  367. _, ok := c.clients[clientID]
  368. c.RUnlock()
  369. if !ok {
  370. return
  371. }
  372. c.Lock()
  373. delete(c.clients, clientID)
  374. c.Unlock()
  375. if len(c.clients) == 0 && c.ephemeral == true {
  376. go c.deleter.Do(func() { c.deleteCallback(c) })
  377. }
  378. }
  379. func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
  380. now := time.Now()
  381. msg.clientID = clientID
  382. msg.deliveryTS = now
  383. msg.pri = now.Add(timeout).UnixNano()
  384. err := c.pushInFlightMessage(msg)
  385. if err != nil {
  386. return err
  387. }
  388. c.addToInFlightPQ(msg)
  389. return nil
  390. }
  391. func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
  392. absTs := time.Now().Add(timeout).UnixNano()
  393. item := &pqueue.Item{Value: msg, Priority: absTs}
  394. err := c.pushDeferredMessage(item)
  395. if err != nil {
  396. return err
  397. }
  398. c.addToDeferredPQ(item)
  399. return nil
  400. }
  401. // pushInFlightMessage atomically adds a message to the in-flight dictionary
  402. func (c *Channel) pushInFlightMessage(msg *Message) error {
  403. c.inFlightMutex.Lock()
  404. _, ok := c.inFlightMessages[msg.ID]
  405. if ok {
  406. c.inFlightMutex.Unlock()
  407. return errors.New("ID already in flight")
  408. }
  409. c.inFlightMessages[msg.ID] = msg
  410. c.inFlightMutex.Unlock()
  411. return nil
  412. }
  413. // popInFlightMessage atomically removes a message from the in-flight dictionary
  414. func (c *Channel) popInFlightMessage(clientID int64, id MessageID) (*Message, error) {
  415. c.inFlightMutex.Lock()
  416. msg, ok := c.inFlightMessages[id]
  417. if !ok {
  418. c.inFlightMutex.Unlock()
  419. return nil, errors.New("ID not in flight")
  420. }
  421. if msg.clientID != clientID {
  422. c.inFlightMutex.Unlock()
  423. return nil, errors.New("client does not own message")
  424. }
  425. delete(c.inFlightMessages, id)
  426. c.inFlightMutex.Unlock()
  427. return msg, nil
  428. }
  429. func (c *Channel) addToInFlightPQ(msg *Message) {
  430. c.inFlightMutex.Lock()
  431. c.inFlightPQ.Push(msg)
  432. c.inFlightMutex.Unlock()
  433. }
  434. func (c *Channel) removeFromInFlightPQ(msg *Message) {
  435. c.inFlightMutex.Lock()
  436. if msg.index == -1 {
  437. // this item has already been popped off the pqueue
  438. c.inFlightMutex.Unlock()
  439. return
  440. }
  441. c.inFlightPQ.Remove(msg.index)
  442. c.inFlightMutex.Unlock()
  443. }
  444. func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
  445. c.deferredMutex.Lock()
  446. // TODO: these map lookups are costly
  447. id := item.Value.(*Message).ID
  448. _, ok := c.deferredMessages[id]
  449. if ok {
  450. c.deferredMutex.Unlock()
  451. return errors.New("ID already deferred")
  452. }
  453. c.deferredMessages[id] = item
  454. c.deferredMutex.Unlock()
  455. return nil
  456. }
  457. func (c *Channel) popDeferredMessage(id MessageID) (*pqueue.Item, error) {
  458. c.deferredMutex.Lock()
  459. // TODO: these map lookups are costly
  460. item, ok := c.deferredMessages[id]
  461. if !ok {
  462. c.deferredMutex.Unlock()
  463. return nil, errors.New("ID not deferred")
  464. }
  465. delete(c.deferredMessages, id)
  466. c.deferredMutex.Unlock()
  467. return item, nil
  468. }
  469. func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
  470. c.deferredMutex.Lock()
  471. heap.Push(&c.deferredPQ, item)
  472. c.deferredMutex.Unlock()
  473. }
  474. func (c *Channel) processDeferredQueue(t int64) bool {
  475. c.exitMutex.RLock()
  476. defer c.exitMutex.RUnlock()
  477. if c.Exiting() {
  478. return false
  479. }
  480. dirty := false
  481. for {
  482. c.deferredMutex.Lock()
  483. item, _ := c.deferredPQ.PeekAndShift(t)
  484. c.deferredMutex.Unlock()
  485. if item == nil {
  486. goto exit
  487. }
  488. dirty = true
  489. msg := item.Value.(*Message)
  490. _, err := c.popDeferredMessage(msg.ID)
  491. if err != nil {
  492. goto exit
  493. }
  494. c.put(msg)
  495. }
  496. exit:
  497. return dirty
  498. }
  499. func (c *Channel) processInFlightQueue(t int64) bool {
  500. c.exitMutex.RLock()
  501. defer c.exitMutex.RUnlock()
  502. if c.Exiting() {
  503. return false
  504. }
  505. dirty := false
  506. for {
  507. c.inFlightMutex.Lock()
  508. msg, _ := c.inFlightPQ.PeekAndShift(t)
  509. c.inFlightMutex.Unlock()
  510. if msg == nil {
  511. goto exit
  512. }
  513. dirty = true
  514. _, err := c.popInFlightMessage(msg.clientID, msg.ID)
  515. if err != nil {
  516. goto exit
  517. }
  518. atomic.AddUint64(&c.timeoutCount, 1)
  519. c.RLock()
  520. client, ok := c.clients[msg.clientID]
  521. c.RUnlock()
  522. if ok {
  523. client.TimedOutMessage()
  524. }
  525. c.put(msg)
  526. }
  527. exit:
  528. return dirty
  529. }