123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- package nsqlookupd
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
- type RegistrationDB struct {
- sync.RWMutex
- registrationMap map[Registration]ProducerMap
- }
// Registration identifies one entry in a RegistrationDB. It is a comparable
// value type and is used directly as a map key.
//
// The field order is significant: other code in this file builds values with
// positional composite literals (Registration{category, key, subkey}).
type Registration struct {
	Category string // always compared exactly by IsMatch
	Key      string // compared exactly unless the query key is "*"
	SubKey   string // compared exactly unless the query subkey is "*"
}
- type Registrations []Registration
// PeerInfo describes a peer that owns one or more producers. Only the
// exported fields carry JSON tags; lastUpdate and id are unexported and are
// therefore skipped by encoding/json.
type PeerInfo struct {
	// lastUpdate is read with atomic.LoadInt64 and interpreted as Unix
	// nanoseconds. Keep it as the first field: on 32-bit platforms sync/atomic
	// only guarantees 64-bit alignment for the first word of an allocated
	// struct.
	lastUpdate int64
	// id is the key under which this peer's producer is stored in a
	// ProducerMap.
	id string

	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`
}
- type Producer struct {
- peerInfo *PeerInfo
- tombstoned bool
- tombstonedAt time.Time
- }
- type Producers []*Producer
- type ProducerMap map[string]*Producer
- func (p *Producer) String() string {
- return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
- }
- func (p *Producer) Tombstone() {
- p.tombstoned = true
- p.tombstonedAt = time.Now()
- }
- func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
- return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
- }
- func NewRegistrationDB() *RegistrationDB {
- return &RegistrationDB{
- registrationMap: make(map[Registration]ProducerMap),
- }
- }
- // add a registration key
- func (r *RegistrationDB) AddRegistration(k Registration) {
- r.Lock()
- defer r.Unlock()
- _, ok := r.registrationMap[k]
- if !ok {
- r.registrationMap[k] = make(map[string]*Producer)
- }
- }
- // add a producer to a registration
- func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
- r.Lock()
- defer r.Unlock()
- _, ok := r.registrationMap[k]
- if !ok {
- r.registrationMap[k] = make(map[string]*Producer)
- }
- producers := r.registrationMap[k]
- _, found := producers[p.peerInfo.id]
- if found == false {
- producers[p.peerInfo.id] = p
- }
- return !found
- }
- // remove a producer from a registration
- func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
- r.Lock()
- defer r.Unlock()
- producers, ok := r.registrationMap[k]
- if !ok {
- return false, 0
- }
- removed := false
- if _, exists := producers[id]; exists {
- removed = true
- }
- // Note: this leaves keys in the DB even if they have empty lists
- delete(producers, id)
- return removed, len(producers)
- }
- // remove a Registration and all it's producers
- func (r *RegistrationDB) RemoveRegistration(k Registration) {
- r.Lock()
- defer r.Unlock()
- delete(r.registrationMap, k)
- }
- func (r *RegistrationDB) needFilter(key string, subkey string) bool {
- return key == "*" || subkey == "*"
- }
- func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
- r.RLock()
- defer r.RUnlock()
- if !r.needFilter(key, subkey) {
- k := Registration{category, key, subkey}
- if _, ok := r.registrationMap[k]; ok {
- return Registrations{k}
- }
- return Registrations{}
- }
- results := Registrations{}
- for k := range r.registrationMap {
- if !k.IsMatch(category, key, subkey) {
- continue
- }
- results = append(results, k)
- }
- return results
- }
- func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
- r.RLock()
- defer r.RUnlock()
- if !r.needFilter(key, subkey) {
- k := Registration{category, key, subkey}
- return ProducerMap2Slice(r.registrationMap[k])
- }
- results := make(map[string]struct{})
- var retProducers Producers
- for k, producers := range r.registrationMap {
- if !k.IsMatch(category, key, subkey) {
- continue
- }
- for _, producer := range producers {
- _, found := results[producer.peerInfo.id]
- if found == false {
- results[producer.peerInfo.id] = struct{}{}
- retProducers = append(retProducers, producer)
- }
- }
- }
- return retProducers
- }
- func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
- r.RLock()
- defer r.RUnlock()
- results := Registrations{}
- for k, producers := range r.registrationMap {
- if _, exists := producers[id]; exists {
- results = append(results, k)
- }
- }
- return results
- }
- func (k Registration) IsMatch(category string, key string, subkey string) bool {
- if category != k.Category {
- return false
- }
- if key != "*" && k.Key != key {
- return false
- }
- if subkey != "*" && k.SubKey != subkey {
- return false
- }
- return true
- }
- func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
- output := Registrations{}
- for _, k := range rr {
- if k.IsMatch(category, key, subkey) {
- output = append(output, k)
- }
- }
- return output
- }
- func (rr Registrations) Keys() []string {
- keys := make([]string, len(rr))
- for i, k := range rr {
- keys[i] = k.Key
- }
- return keys
- }
- func (rr Registrations) SubKeys() []string {
- subkeys := make([]string, len(rr))
- for i, k := range rr {
- subkeys[i] = k.SubKey
- }
- return subkeys
- }
- func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
- now := time.Now()
- results := Producers{}
- for _, p := range pp {
- cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
- if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
- continue
- }
- results = append(results, p)
- }
- return results
- }
- func (pp Producers) PeerInfo() []*PeerInfo {
- results := []*PeerInfo{}
- for _, p := range pp {
- results = append(results, p.peerInfo)
- }
- return results
- }
- func ProducerMap2Slice(pm ProducerMap) Producers {
- var producers Producers
- for _, producer := range pm {
- producers = append(producers, producer)
- }
- return producers
- }
|