in_flight_pqueue.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package nsqd
  2. type inFlightPqueue []*Message
  3. func newInFlightPqueue(capacity int) inFlightPqueue {
  4. return make(inFlightPqueue, 0, capacity)
  5. }
  6. func (pq inFlightPqueue) Swap(i, j int) {
  7. pq[i], pq[j] = pq[j], pq[i]
  8. pq[i].index = i
  9. pq[j].index = j
  10. }
  11. func (pq *inFlightPqueue) Push(x *Message) {
  12. n := len(*pq)
  13. c := cap(*pq)
  14. if n+1 > c {
  15. npq := make(inFlightPqueue, n, c*2)
  16. copy(npq, *pq)
  17. *pq = npq
  18. }
  19. *pq = (*pq)[0 : n+1]
  20. x.index = n
  21. (*pq)[n] = x
  22. pq.up(n)
  23. }
  24. func (pq *inFlightPqueue) Pop() *Message {
  25. n := len(*pq)
  26. c := cap(*pq)
  27. pq.Swap(0, n-1)
  28. pq.down(0, n-1)
  29. if n < (c/2) && c > 25 {
  30. npq := make(inFlightPqueue, n, c/2)
  31. copy(npq, *pq)
  32. *pq = npq
  33. }
  34. x := (*pq)[n-1]
  35. x.index = -1
  36. *pq = (*pq)[0 : n-1]
  37. return x
  38. }
  39. func (pq *inFlightPqueue) Remove(i int) *Message {
  40. n := len(*pq)
  41. if n-1 != i {
  42. pq.Swap(i, n-1)
  43. pq.down(i, n-1)
  44. pq.up(i)
  45. }
  46. x := (*pq)[n-1]
  47. x.index = -1
  48. *pq = (*pq)[0 : n-1]
  49. return x
  50. }
  51. func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
  52. if len(*pq) == 0 {
  53. return nil, 0
  54. }
  55. x := (*pq)[0]
  56. if x.pri > max {
  57. return nil, x.pri - max
  58. }
  59. pq.Pop()
  60. return x, 0
  61. }
  62. func (pq *inFlightPqueue) up(j int) {
  63. for {
  64. i := (j - 1) / 2 // parent
  65. if i == j || (*pq)[j].pri >= (*pq)[i].pri {
  66. break
  67. }
  68. pq.Swap(i, j)
  69. j = i
  70. }
  71. }
  72. func (pq *inFlightPqueue) down(i, n int) {
  73. for {
  74. j1 := 2*i + 1
  75. if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
  76. break
  77. }
  78. j := j1 // left child
  79. if j2 := j1 + 1; j2 < n && (*pq)[j1].pri >= (*pq)[j2].pri {
  80. j = j2 // = 2*i + 2 // right child
  81. }
  82. if (*pq)[j].pri >= (*pq)[i].pri {
  83. break
  84. }
  85. pq.Swap(i, j)
  86. i = j
  87. }
  88. }