in_flight_pqueue_test.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package nsqd
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sort"
  6. "testing"
  7. "github.com/nsqio/nsq/internal/test"
  8. )
  9. func TestPriorityQueue(t *testing.T) {
  10. c := 100
  11. pq := newInFlightPqueue(c)
  12. for i := 0; i < c+1; i++ {
  13. pq.Push(&Message{clientID: int64(i), pri: int64(i)})
  14. }
  15. test.Equal(t, c+1, len(pq))
  16. test.Equal(t, c*2, cap(pq))
  17. for i := 0; i < c+1; i++ {
  18. msg := pq.Pop()
  19. test.Equal(t, int64(i), msg.clientID)
  20. }
  21. test.Equal(t, c/4, cap(pq))
  22. }
  23. func TestUnsortedInsert(t *testing.T) {
  24. c := 100
  25. pq := newInFlightPqueue(c)
  26. ints := make([]int, 0, c)
  27. for i := 0; i < c; i++ {
  28. v := rand.Int()
  29. ints = append(ints, v)
  30. pq.Push(&Message{pri: int64(v)})
  31. }
  32. test.Equal(t, c, len(pq))
  33. test.Equal(t, c, cap(pq))
  34. sort.Ints(ints)
  35. for i := 0; i < c; i++ {
  36. msg, _ := pq.PeekAndShift(int64(ints[len(ints)-1]))
  37. test.Equal(t, int64(ints[i]), msg.pri)
  38. }
  39. }
  40. func TestRemove(t *testing.T) {
  41. c := 100
  42. pq := newInFlightPqueue(c)
  43. msgs := make(map[MessageID]*Message)
  44. for i := 0; i < c; i++ {
  45. m := &Message{pri: int64(rand.Intn(100000000))}
  46. copy(m.ID[:], fmt.Sprintf("%016d", m.pri))
  47. msgs[m.ID] = m
  48. pq.Push(m)
  49. }
  50. for i := 0; i < 10; i++ {
  51. idx := rand.Intn((c - 1) - i)
  52. var fm *Message
  53. for _, m := range msgs {
  54. if m.index == idx {
  55. fm = m
  56. break
  57. }
  58. }
  59. rm := pq.Remove(idx)
  60. test.Equal(t, fmt.Sprintf("%s", fm.ID), fmt.Sprintf("%s", rm.ID))
  61. }
  62. lastPriority := pq.Pop().pri
  63. for i := 0; i < (c - 10 - 1); i++ {
  64. msg := pq.Pop()
  65. test.Equal(t, true, lastPriority <= msg.pri)
  66. lastPriority = msg.pri
  67. }
  68. }