registration_db_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package nsqlookupd
  2. import (
  3. "math/rand"
  4. "strconv"
  5. "testing"
  6. "time"
  7. "github.com/nsqio/nsq/internal/test"
  8. )
  9. func TestRegistrationDB(t *testing.T) {
  10. sec30 := 30 * time.Second
  11. beginningOfTime := time.Unix(1348797047, 0)
  12. pi1 := &PeerInfo{beginningOfTime.UnixNano(), "1", "remote_addr:1", "host", "b_addr", 1, 2, "v1"}
  13. pi2 := &PeerInfo{beginningOfTime.UnixNano(), "2", "remote_addr:2", "host", "b_addr", 2, 3, "v1"}
  14. pi3 := &PeerInfo{beginningOfTime.UnixNano(), "3", "remote_addr:3", "host", "b_addr", 3, 4, "v1"}
  15. p1 := &Producer{pi1, false, beginningOfTime}
  16. p2 := &Producer{pi2, false, beginningOfTime}
  17. p3 := &Producer{pi3, false, beginningOfTime}
  18. p4 := &Producer{pi1, false, beginningOfTime}
  19. db := NewRegistrationDB()
  20. // add producers
  21. db.AddProducer(Registration{"c", "a", ""}, p1)
  22. db.AddProducer(Registration{"c", "a", ""}, p2)
  23. db.AddProducer(Registration{"c", "a", "b"}, p2)
  24. db.AddProducer(Registration{"d", "a", ""}, p3)
  25. db.AddProducer(Registration{"t", "a", ""}, p4)
  26. // find producers
  27. r := db.FindRegistrations("c", "*", "").Keys()
  28. test.Equal(t, 1, len(r))
  29. test.Equal(t, "a", r[0])
  30. p := db.FindProducers("t", "*", "")
  31. t.Logf("%s", p)
  32. test.Equal(t, 1, len(p))
  33. p = db.FindProducers("c", "*", "")
  34. t.Logf("%s", p)
  35. test.Equal(t, 2, len(p))
  36. p = db.FindProducers("c", "a", "")
  37. t.Logf("%s", p)
  38. test.Equal(t, 2, len(p))
  39. p = db.FindProducers("c", "*", "b")
  40. t.Logf("%s", p)
  41. test.Equal(t, 1, len(p))
  42. test.Equal(t, p2.peerInfo.id, p[0].peerInfo.id)
  43. // filter by active
  44. test.Equal(t, 0, len(p.FilterByActive(sec30, sec30)))
  45. p2.peerInfo.lastUpdate = time.Now().UnixNano()
  46. test.Equal(t, 1, len(p.FilterByActive(sec30, sec30)))
  47. p = db.FindProducers("c", "*", "")
  48. t.Logf("%s", p)
  49. test.Equal(t, 1, len(p.FilterByActive(sec30, sec30)))
  50. // tombstoning
  51. fewSecAgo := time.Now().Add(-5 * time.Second).UnixNano()
  52. p1.peerInfo.lastUpdate = fewSecAgo
  53. p2.peerInfo.lastUpdate = fewSecAgo
  54. test.Equal(t, 2, len(p.FilterByActive(sec30, sec30)))
  55. p1.Tombstone()
  56. test.Equal(t, 1, len(p.FilterByActive(sec30, sec30)))
  57. time.Sleep(10 * time.Millisecond)
  58. test.Equal(t, 2, len(p.FilterByActive(sec30, 5*time.Millisecond)))
  59. // make sure we can still retrieve p1 from another registration see #148
  60. test.Equal(t, 1, len(db.FindProducers("t", "*", "").FilterByActive(sec30, sec30)))
  61. // keys and subkeys
  62. k := db.FindRegistrations("c", "b", "").Keys()
  63. test.Equal(t, 0, len(k))
  64. k = db.FindRegistrations("c", "a", "").Keys()
  65. test.Equal(t, 1, len(k))
  66. test.Equal(t, "a", k[0])
  67. k = db.FindRegistrations("c", "*", "b").SubKeys()
  68. test.Equal(t, 1, len(k))
  69. test.Equal(t, "b", k[0])
  70. // removing producers
  71. db.RemoveProducer(Registration{"c", "a", ""}, p1.peerInfo.id)
  72. p = db.FindProducers("c", "*", "*")
  73. t.Logf("%s", p)
  74. test.Equal(t, 1, len(p))
  75. db.RemoveProducer(Registration{"c", "a", ""}, p2.peerInfo.id)
  76. db.RemoveProducer(Registration{"c", "a", "b"}, p2.peerInfo.id)
  77. p = db.FindProducers("c", "*", "*")
  78. t.Logf("%s", p)
  79. test.Equal(t, 0, len(p))
  80. // do some key removals
  81. k = db.FindRegistrations("c", "*", "*").Keys()
  82. test.Equal(t, 2, len(k))
  83. db.RemoveRegistration(Registration{"c", "a", ""})
  84. db.RemoveRegistration(Registration{"c", "a", "b"})
  85. k = db.FindRegistrations("c", "*", "*").Keys()
  86. test.Equal(t, 0, len(k))
  87. }
  88. func fillRegDB(registrations int, producers int) *RegistrationDB {
  89. regDB := NewRegistrationDB()
  90. for i := 0; i < registrations; i++ {
  91. regT := Registration{"topic", "t" + strconv.Itoa(i), ""}
  92. regCa := Registration{"channel", "t" + strconv.Itoa(i), "ca" + strconv.Itoa(i)}
  93. regCb := Registration{"channel", "t" + strconv.Itoa(i), "cb" + strconv.Itoa(i)}
  94. for j := 0; j < producers; j++ {
  95. p := Producer{
  96. peerInfo: &PeerInfo{
  97. id: "p" + strconv.Itoa(j),
  98. },
  99. }
  100. regDB.AddProducer(regT, &p)
  101. regDB.AddProducer(regCa, &p)
  102. regDB.AddProducer(regCb, &p)
  103. }
  104. }
  105. return regDB
  106. }
  107. func benchmarkLookupRegistrations(b *testing.B, registrations int, producers int) {
  108. regDB := fillRegDB(registrations, producers)
  109. b.ResetTimer()
  110. for i := 0; i < b.N; i++ {
  111. j := strconv.Itoa(rand.Intn(producers))
  112. _ = regDB.LookupRegistrations("p" + j)
  113. }
  114. }
  115. func benchmarkRegister(b *testing.B, registrations int, producers int) {
  116. for i := 0; i < b.N; i++ {
  117. _ = fillRegDB(registrations, producers)
  118. }
  119. }
  120. func benchmarkDoLookup(b *testing.B, registrations int, producers int) {
  121. regDB := fillRegDB(registrations, producers)
  122. b.ResetTimer()
  123. for i := 0; i < b.N; i++ {
  124. topic := "t" + strconv.Itoa(rand.Intn(registrations))
  125. _ = regDB.FindRegistrations("topic", topic, "")
  126. _ = regDB.FindRegistrations("channel", topic, "*").SubKeys()
  127. _ = regDB.FindProducers("topic", topic, "")
  128. }
  129. }
  130. func BenchmarkLookupRegistrations8x8(b *testing.B) {
  131. benchmarkLookupRegistrations(b, 8, 8)
  132. }
  133. func BenchmarkLookupRegistrations8x64(b *testing.B) {
  134. benchmarkLookupRegistrations(b, 8, 64)
  135. }
  136. func BenchmarkLookupRegistrations64x64(b *testing.B) {
  137. benchmarkLookupRegistrations(b, 64, 64)
  138. }
  139. func BenchmarkLookupRegistrations64x512(b *testing.B) {
  140. benchmarkLookupRegistrations(b, 64, 512)
  141. }
  142. func BenchmarkLookupRegistrations512x512(b *testing.B) {
  143. benchmarkLookupRegistrations(b, 512, 512)
  144. }
  145. func BenchmarkLookupRegistrations512x2048(b *testing.B) {
  146. benchmarkLookupRegistrations(b, 512, 2048)
  147. }
  148. func BenchmarkRegister8x8(b *testing.B) {
  149. benchmarkRegister(b, 8, 8)
  150. }
  151. func BenchmarkRegister8x64(b *testing.B) {
  152. benchmarkRegister(b, 8, 64)
  153. }
  154. func BenchmarkRegister64x64(b *testing.B) {
  155. benchmarkRegister(b, 64, 64)
  156. }
  157. func BenchmarkRegister64x512(b *testing.B) {
  158. benchmarkRegister(b, 64, 512)
  159. }
  160. func BenchmarkRegister512x512(b *testing.B) {
  161. benchmarkRegister(b, 512, 512)
  162. }
  163. func BenchmarkRegister512x2048(b *testing.B) {
  164. benchmarkRegister(b, 512, 2048)
  165. }
  166. func BenchmarkDoLookup8x8(b *testing.B) {
  167. benchmarkDoLookup(b, 8, 8)
  168. }
  169. func BenchmarkDoLookup8x64(b *testing.B) {
  170. benchmarkDoLookup(b, 8, 64)
  171. }
  172. func BenchmarkDoLookup64x64(b *testing.B) {
  173. benchmarkDoLookup(b, 64, 64)
  174. }
  175. func BenchmarkDoLookup64x512(b *testing.B) {
  176. benchmarkDoLookup(b, 64, 512)
  177. }
  178. func BenchmarkDoLookup512x512(b *testing.B) {
  179. benchmarkDoLookup(b, 512, 512)
  180. }
  181. func BenchmarkDoLookup512x2048(b *testing.B) {
  182. benchmarkDoLookup(b, 512, 2048)
  183. }