123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- package nsqlookupd
- import (
- "fmt"
- "net/http"
- "net/http/pprof"
- "sync/atomic"
- "github.com/julienschmidt/httprouter"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/protocol"
- "github.com/nsqio/nsq/internal/version"
- )
- type httpServer struct {
- nsqlookupd *NSQLookupd
- router http.Handler
- }
- func newHTTPServer(l *NSQLookupd) *httpServer {
- log := http_api.Log(l.logf)
- router := httprouter.New()
- router.HandleMethodNotAllowed = true
- router.PanicHandler = http_api.LogPanicHandler(l.logf)
- router.NotFound = http_api.LogNotFoundHandler(l.logf)
- router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(l.logf)
- s := &httpServer{
- nsqlookupd: l,
- router: router,
- }
- router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
- router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
- // v1 negotiate
- router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
- router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
- router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
- router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
- router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))
- // only v1
- router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
- router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
- router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
- router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
- router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1))
- // debug
- router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
- router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
- router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
- router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
- router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
- router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
- router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
- router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
- router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
- return s
- }
- func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- s.router.ServeHTTP(w, req)
- }
- func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- return "OK", nil
- }
- func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- return struct {
- Version string `json:"version"`
- }{
- Version: version.Binary,
- }, nil
- }
- func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
- return map[string]interface{}{
- "topics": topics,
- }, nil
- }
- func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
- return map[string]interface{}{
- "channels": channels,
- }, nil
- }
- func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- registration := s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
- if len(registration) == 0 {
- return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
- }
- channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
- producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
- producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout,
- s.nsqlookupd.opts.TombstoneLifetime)
- return map[string]interface{}{
- "channels": channels,
- "producers": producers.PeerInfo(),
- }, nil
- }
- func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- if !protocol.IsValidTopicName(topicName) {
- return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
- }
- s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
- key := Registration{"topic", topicName, ""}
- s.nsqlookupd.DB.AddRegistration(key)
- return nil, nil
- }
- func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
- for _, registration := range registrations {
- s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName)
- s.nsqlookupd.DB.RemoveRegistration(registration)
- }
- registrations = s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
- for _, registration := range registrations {
- s.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName)
- s.nsqlookupd.DB.RemoveRegistration(registration)
- }
- return nil, nil
- }
- func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- node, err := reqParams.Get("node")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_NODE"}
- }
- s.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
- producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
- for _, p := range producers {
- thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
- if thisNode == node {
- p.Tombstone()
- }
- }
- return nil, nil
- }
- func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
- if err != nil {
- return nil, http_api.Err{400, err.Error()}
- }
- s.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName)
- key := Registration{"channel", topicName, channelName}
- s.nsqlookupd.DB.AddRegistration(key)
- s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
- key = Registration{"topic", topicName, ""}
- s.nsqlookupd.DB.AddRegistration(key)
- return nil, nil
- }
- func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
- if err != nil {
- return nil, http_api.Err{400, err.Error()}
- }
- registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
- if len(registrations) == 0 {
- return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
- }
- s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channelName, topicName)
- for _, registration := range registrations {
- s.nsqlookupd.DB.RemoveRegistration(registration)
- }
- return nil, nil
- }
// node is the JSON shape of one producer in the doNodes response.
type node struct {
	// Peer identity and addressing, copied from the producer's peer info.
	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
	HTTPPort         int    `json:"http_port"`
	Version          string `json:"version"`

	// Tombstones is index-aligned with Topics: Tombstones[i] is the
	// tombstone state of this node for Topics[i].
	Tombstones []bool `json:"tombstones"`
	// Topics lists the topic keys registered for this node.
	Topics []string `json:"topics"`
}
- func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- // dont filter out tombstoned nodes
- producers := s.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
- s.nsqlookupd.opts.InactiveProducerTimeout, 0)
- nodes := make([]*node, len(producers))
- topicProducersMap := make(map[string]Producers)
- for i, p := range producers {
- topics := s.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()
- // for each topic find the producer that matches this peer
- // to add tombstone information
- tombstones := make([]bool, len(topics))
- for j, t := range topics {
- if _, exists := topicProducersMap[t]; !exists {
- topicProducersMap[t] = s.nsqlookupd.DB.FindProducers("topic", t, "")
- }
- topicProducers := topicProducersMap[t]
- for _, tp := range topicProducers {
- if tp.peerInfo == p.peerInfo {
- tombstones[j] = tp.IsTombstoned(s.nsqlookupd.opts.TombstoneLifetime)
- break
- }
- }
- }
- nodes[i] = &node{
- RemoteAddress: p.peerInfo.RemoteAddress,
- Hostname: p.peerInfo.Hostname,
- BroadcastAddress: p.peerInfo.BroadcastAddress,
- TCPPort: p.peerInfo.TCPPort,
- HTTPPort: p.peerInfo.HTTPPort,
- Version: p.peerInfo.Version,
- Tombstones: tombstones,
- Topics: topics,
- }
- }
- return map[string]interface{}{
- "producers": nodes,
- }, nil
- }
- func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- s.nsqlookupd.DB.RLock()
- defer s.nsqlookupd.DB.RUnlock()
- data := make(map[string][]map[string]interface{})
- for r, producers := range s.nsqlookupd.DB.registrationMap {
- key := r.Category + ":" + r.Key + ":" + r.SubKey
- for _, p := range producers {
- m := map[string]interface{}{
- "id": p.peerInfo.id,
- "hostname": p.peerInfo.Hostname,
- "broadcast_address": p.peerInfo.BroadcastAddress,
- "tcp_port": p.peerInfo.TCPPort,
- "http_port": p.peerInfo.HTTPPort,
- "version": p.peerInfo.Version,
- "last_update": atomic.LoadInt64(&p.peerInfo.lastUpdate),
- "tombstoned": p.tombstoned,
- "tombstoned_at": p.tombstonedAt.UnixNano(),
- }
- data[key] = append(data[key], m)
- }
- }
- return data, nil
- }
|