registration_db.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package nsqlookupd
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type RegistrationDB struct {
  9. sync.RWMutex
  10. registrationMap map[Registration]ProducerMap
  11. }
  12. type Registration struct {
  13. Category string
  14. Key string
  15. SubKey string
  16. }
  17. type Registrations []Registration
  18. type PeerInfo struct {
  19. lastUpdate int64
  20. id string
  21. RemoteAddress string `json:"remote_address"`
  22. Hostname string `json:"hostname"`
  23. BroadcastAddress string `json:"broadcast_address"`
  24. TCPPort int `json:"tcp_port"`
  25. HTTPPort int `json:"http_port"`
  26. Version string `json:"version"`
  27. }
  28. type Producer struct {
  29. peerInfo *PeerInfo
  30. tombstoned bool
  31. tombstonedAt time.Time
  32. }
  33. type Producers []*Producer
  34. type ProducerMap map[string]*Producer
  35. func (p *Producer) String() string {
  36. return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
  37. }
  38. func (p *Producer) Tombstone() {
  39. p.tombstoned = true
  40. p.tombstonedAt = time.Now()
  41. }
  42. func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
  43. return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
  44. }
  45. func NewRegistrationDB() *RegistrationDB {
  46. return &RegistrationDB{
  47. registrationMap: make(map[Registration]ProducerMap),
  48. }
  49. }
  50. // add a registration key
  51. func (r *RegistrationDB) AddRegistration(k Registration) {
  52. r.Lock()
  53. defer r.Unlock()
  54. _, ok := r.registrationMap[k]
  55. if !ok {
  56. r.registrationMap[k] = make(map[string]*Producer)
  57. }
  58. }
  59. // add a producer to a registration
  60. func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
  61. r.Lock()
  62. defer r.Unlock()
  63. _, ok := r.registrationMap[k]
  64. if !ok {
  65. r.registrationMap[k] = make(map[string]*Producer)
  66. }
  67. producers := r.registrationMap[k]
  68. _, found := producers[p.peerInfo.id]
  69. if found == false {
  70. producers[p.peerInfo.id] = p
  71. }
  72. return !found
  73. }
  74. // remove a producer from a registration
  75. func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
  76. r.Lock()
  77. defer r.Unlock()
  78. producers, ok := r.registrationMap[k]
  79. if !ok {
  80. return false, 0
  81. }
  82. removed := false
  83. if _, exists := producers[id]; exists {
  84. removed = true
  85. }
  86. // Note: this leaves keys in the DB even if they have empty lists
  87. delete(producers, id)
  88. return removed, len(producers)
  89. }
  90. // remove a Registration and all it's producers
  91. func (r *RegistrationDB) RemoveRegistration(k Registration) {
  92. r.Lock()
  93. defer r.Unlock()
  94. delete(r.registrationMap, k)
  95. }
  96. func (r *RegistrationDB) needFilter(key string, subkey string) bool {
  97. return key == "*" || subkey == "*"
  98. }
  99. func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
  100. r.RLock()
  101. defer r.RUnlock()
  102. if !r.needFilter(key, subkey) {
  103. k := Registration{category, key, subkey}
  104. if _, ok := r.registrationMap[k]; ok {
  105. return Registrations{k}
  106. }
  107. return Registrations{}
  108. }
  109. results := Registrations{}
  110. for k := range r.registrationMap {
  111. if !k.IsMatch(category, key, subkey) {
  112. continue
  113. }
  114. results = append(results, k)
  115. }
  116. return results
  117. }
  118. func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
  119. r.RLock()
  120. defer r.RUnlock()
  121. if !r.needFilter(key, subkey) {
  122. k := Registration{category, key, subkey}
  123. return ProducerMap2Slice(r.registrationMap[k])
  124. }
  125. results := make(map[string]struct{})
  126. var retProducers Producers
  127. for k, producers := range r.registrationMap {
  128. if !k.IsMatch(category, key, subkey) {
  129. continue
  130. }
  131. for _, producer := range producers {
  132. _, found := results[producer.peerInfo.id]
  133. if found == false {
  134. results[producer.peerInfo.id] = struct{}{}
  135. retProducers = append(retProducers, producer)
  136. }
  137. }
  138. }
  139. return retProducers
  140. }
  141. func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
  142. r.RLock()
  143. defer r.RUnlock()
  144. results := Registrations{}
  145. for k, producers := range r.registrationMap {
  146. if _, exists := producers[id]; exists {
  147. results = append(results, k)
  148. }
  149. }
  150. return results
  151. }
  152. func (k Registration) IsMatch(category string, key string, subkey string) bool {
  153. if category != k.Category {
  154. return false
  155. }
  156. if key != "*" && k.Key != key {
  157. return false
  158. }
  159. if subkey != "*" && k.SubKey != subkey {
  160. return false
  161. }
  162. return true
  163. }
  164. func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
  165. output := Registrations{}
  166. for _, k := range rr {
  167. if k.IsMatch(category, key, subkey) {
  168. output = append(output, k)
  169. }
  170. }
  171. return output
  172. }
  173. func (rr Registrations) Keys() []string {
  174. keys := make([]string, len(rr))
  175. for i, k := range rr {
  176. keys[i] = k.Key
  177. }
  178. return keys
  179. }
  180. func (rr Registrations) SubKeys() []string {
  181. subkeys := make([]string, len(rr))
  182. for i, k := range rr {
  183. subkeys[i] = k.SubKey
  184. }
  185. return subkeys
  186. }
  187. func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
  188. now := time.Now()
  189. results := Producers{}
  190. for _, p := range pp {
  191. cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
  192. if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
  193. continue
  194. }
  195. results = append(results, p)
  196. }
  197. return results
  198. }
  199. func (pp Producers) PeerInfo() []*PeerInfo {
  200. results := []*PeerInfo{}
  201. for _, p := range pp {
  202. results = append(results, p.peerInfo)
  203. }
  204. return results
  205. }
  206. func ProducerMap2Slice(pm ProducerMap) Producers {
  207. var producers Producers
  208. for _, producer := range pm {
  209. producers = append(producers, producer)
  210. }
  211. return producers
  212. }