http.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. package nsqlookupd
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/http/pprof"
  6. "sync/atomic"
  7. "github.com/julienschmidt/httprouter"
  8. "github.com/nsqio/nsq/internal/http_api"
  9. "github.com/nsqio/nsq/internal/protocol"
  10. "github.com/nsqio/nsq/internal/version"
  11. )
  12. type httpServer struct {
  13. nsqlookupd *NSQLookupd
  14. router http.Handler
  15. }
  16. func newHTTPServer(l *NSQLookupd) *httpServer {
  17. log := http_api.Log(l.logf)
  18. router := httprouter.New()
  19. router.HandleMethodNotAllowed = true
  20. router.PanicHandler = http_api.LogPanicHandler(l.logf)
  21. router.NotFound = http_api.LogNotFoundHandler(l.logf)
  22. router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(l.logf)
  23. s := &httpServer{
  24. nsqlookupd: l,
  25. router: router,
  26. }
  27. router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  28. router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
  29. // v1 negotiate
  30. router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1))
  31. router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1))
  32. router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1))
  33. router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))
  34. router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))
  35. // only v1
  36. router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
  37. router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
  38. router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
  39. router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
  40. router.Handle("POST", "/topic/tombstone", http_api.Decorate(s.doTombstoneTopicProducer, log, http_api.V1))
  41. // debug
  42. router.HandlerFunc("GET", "/debug/pprof", pprof.Index)
  43. router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
  44. router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
  45. router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
  46. router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
  47. router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
  48. router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
  49. router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
  50. router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
  51. return s
  52. }
  53. func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  54. s.router.ServeHTTP(w, req)
  55. }
  56. func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  57. return "OK", nil
  58. }
  59. func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  60. return struct {
  61. Version string `json:"version"`
  62. }{
  63. Version: version.Binary,
  64. }, nil
  65. }
  66. func (s *httpServer) doTopics(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  67. topics := s.nsqlookupd.DB.FindRegistrations("topic", "*", "").Keys()
  68. return map[string]interface{}{
  69. "topics": topics,
  70. }, nil
  71. }
  72. func (s *httpServer) doChannels(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  73. reqParams, err := http_api.NewReqParams(req)
  74. if err != nil {
  75. return nil, http_api.Err{400, "INVALID_REQUEST"}
  76. }
  77. topicName, err := reqParams.Get("topic")
  78. if err != nil {
  79. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  80. }
  81. channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
  82. return map[string]interface{}{
  83. "channels": channels,
  84. }, nil
  85. }
  86. func (s *httpServer) doLookup(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  87. reqParams, err := http_api.NewReqParams(req)
  88. if err != nil {
  89. return nil, http_api.Err{400, "INVALID_REQUEST"}
  90. }
  91. topicName, err := reqParams.Get("topic")
  92. if err != nil {
  93. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  94. }
  95. registration := s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
  96. if len(registration) == 0 {
  97. return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
  98. }
  99. channels := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*").SubKeys()
  100. producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
  101. producers = producers.FilterByActive(s.nsqlookupd.opts.InactiveProducerTimeout,
  102. s.nsqlookupd.opts.TombstoneLifetime)
  103. return map[string]interface{}{
  104. "channels": channels,
  105. "producers": producers.PeerInfo(),
  106. }, nil
  107. }
  108. func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  109. reqParams, err := http_api.NewReqParams(req)
  110. if err != nil {
  111. return nil, http_api.Err{400, "INVALID_REQUEST"}
  112. }
  113. topicName, err := reqParams.Get("topic")
  114. if err != nil {
  115. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  116. }
  117. if !protocol.IsValidTopicName(topicName) {
  118. return nil, http_api.Err{400, "INVALID_ARG_TOPIC"}
  119. }
  120. s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
  121. key := Registration{"topic", topicName, ""}
  122. s.nsqlookupd.DB.AddRegistration(key)
  123. return nil, nil
  124. }
  125. func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  126. reqParams, err := http_api.NewReqParams(req)
  127. if err != nil {
  128. return nil, http_api.Err{400, "INVALID_REQUEST"}
  129. }
  130. topicName, err := reqParams.Get("topic")
  131. if err != nil {
  132. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  133. }
  134. registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, "*")
  135. for _, registration := range registrations {
  136. s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", registration.SubKey, topicName)
  137. s.nsqlookupd.DB.RemoveRegistration(registration)
  138. }
  139. registrations = s.nsqlookupd.DB.FindRegistrations("topic", topicName, "")
  140. for _, registration := range registrations {
  141. s.nsqlookupd.logf(LOG_INFO, "DB: removing topic(%s)", topicName)
  142. s.nsqlookupd.DB.RemoveRegistration(registration)
  143. }
  144. return nil, nil
  145. }
  146. func (s *httpServer) doTombstoneTopicProducer(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  147. reqParams, err := http_api.NewReqParams(req)
  148. if err != nil {
  149. return nil, http_api.Err{400, "INVALID_REQUEST"}
  150. }
  151. topicName, err := reqParams.Get("topic")
  152. if err != nil {
  153. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  154. }
  155. node, err := reqParams.Get("node")
  156. if err != nil {
  157. return nil, http_api.Err{400, "MISSING_ARG_NODE"}
  158. }
  159. s.nsqlookupd.logf(LOG_INFO, "DB: setting tombstone for producer@%s of topic(%s)", node, topicName)
  160. producers := s.nsqlookupd.DB.FindProducers("topic", topicName, "")
  161. for _, p := range producers {
  162. thisNode := fmt.Sprintf("%s:%d", p.peerInfo.BroadcastAddress, p.peerInfo.HTTPPort)
  163. if thisNode == node {
  164. p.Tombstone()
  165. }
  166. }
  167. return nil, nil
  168. }
  169. func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  170. reqParams, err := http_api.NewReqParams(req)
  171. if err != nil {
  172. return nil, http_api.Err{400, "INVALID_REQUEST"}
  173. }
  174. topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
  175. if err != nil {
  176. return nil, http_api.Err{400, err.Error()}
  177. }
  178. s.nsqlookupd.logf(LOG_INFO, "DB: adding channel(%s) in topic(%s)", channelName, topicName)
  179. key := Registration{"channel", topicName, channelName}
  180. s.nsqlookupd.DB.AddRegistration(key)
  181. s.nsqlookupd.logf(LOG_INFO, "DB: adding topic(%s)", topicName)
  182. key = Registration{"topic", topicName, ""}
  183. s.nsqlookupd.DB.AddRegistration(key)
  184. return nil, nil
  185. }
  186. func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  187. reqParams, err := http_api.NewReqParams(req)
  188. if err != nil {
  189. return nil, http_api.Err{400, "INVALID_REQUEST"}
  190. }
  191. topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
  192. if err != nil {
  193. return nil, http_api.Err{400, err.Error()}
  194. }
  195. registrations := s.nsqlookupd.DB.FindRegistrations("channel", topicName, channelName)
  196. if len(registrations) == 0 {
  197. return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
  198. }
  199. s.nsqlookupd.logf(LOG_INFO, "DB: removing channel(%s) from topic(%s)", channelName, topicName)
  200. for _, registration := range registrations {
  201. s.nsqlookupd.DB.RemoveRegistration(registration)
  202. }
  203. return nil, nil
  204. }
  205. type node struct {
  206. RemoteAddress string `json:"remote_address"`
  207. Hostname string `json:"hostname"`
  208. BroadcastAddress string `json:"broadcast_address"`
  209. TCPPort int `json:"tcp_port"`
  210. HTTPPort int `json:"http_port"`
  211. Version string `json:"version"`
  212. Tombstones []bool `json:"tombstones"`
  213. Topics []string `json:"topics"`
  214. }
  215. func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  216. // dont filter out tombstoned nodes
  217. producers := s.nsqlookupd.DB.FindProducers("client", "", "").FilterByActive(
  218. s.nsqlookupd.opts.InactiveProducerTimeout, 0)
  219. nodes := make([]*node, len(producers))
  220. topicProducersMap := make(map[string]Producers)
  221. for i, p := range producers {
  222. topics := s.nsqlookupd.DB.LookupRegistrations(p.peerInfo.id).Filter("topic", "*", "").Keys()
  223. // for each topic find the producer that matches this peer
  224. // to add tombstone information
  225. tombstones := make([]bool, len(topics))
  226. for j, t := range topics {
  227. if _, exists := topicProducersMap[t]; !exists {
  228. topicProducersMap[t] = s.nsqlookupd.DB.FindProducers("topic", t, "")
  229. }
  230. topicProducers := topicProducersMap[t]
  231. for _, tp := range topicProducers {
  232. if tp.peerInfo == p.peerInfo {
  233. tombstones[j] = tp.IsTombstoned(s.nsqlookupd.opts.TombstoneLifetime)
  234. break
  235. }
  236. }
  237. }
  238. nodes[i] = &node{
  239. RemoteAddress: p.peerInfo.RemoteAddress,
  240. Hostname: p.peerInfo.Hostname,
  241. BroadcastAddress: p.peerInfo.BroadcastAddress,
  242. TCPPort: p.peerInfo.TCPPort,
  243. HTTPPort: p.peerInfo.HTTPPort,
  244. Version: p.peerInfo.Version,
  245. Tombstones: tombstones,
  246. Topics: topics,
  247. }
  248. }
  249. return map[string]interface{}{
  250. "producers": nodes,
  251. }, nil
  252. }
  253. func (s *httpServer) doDebug(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  254. s.nsqlookupd.DB.RLock()
  255. defer s.nsqlookupd.DB.RUnlock()
  256. data := make(map[string][]map[string]interface{})
  257. for r, producers := range s.nsqlookupd.DB.registrationMap {
  258. key := r.Category + ":" + r.Key + ":" + r.SubKey
  259. for _, p := range producers {
  260. m := map[string]interface{}{
  261. "id": p.peerInfo.id,
  262. "hostname": p.peerInfo.Hostname,
  263. "broadcast_address": p.peerInfo.BroadcastAddress,
  264. "tcp_port": p.peerInfo.TCPPort,
  265. "http_port": p.peerInfo.HTTPPort,
  266. "version": p.peerInfo.Version,
  267. "last_update": atomic.LoadInt64(&p.peerInfo.lastUpdate),
  268. "tombstoned": p.tombstoned,
  269. "tombstoned_at": p.tombstonedAt.UnixNano(),
  270. }
  271. data[key] = append(data[key], m)
  272. }
  273. }
  274. return data, nil
  275. }