http.go 26 KB


  1. package nsqadmin
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "html/template"
  6. "io"
  7. "io/ioutil"
  8. "mime"
  9. "net"
  10. "net/http"
  11. "net/http/httputil"
  12. "net/url"
  13. "path"
  14. "reflect"
  15. "strings"
  16. "time"
  17. "github.com/julienschmidt/httprouter"
  18. "github.com/nsqio/nsq/internal/clusterinfo"
  19. "github.com/nsqio/nsq/internal/http_api"
  20. "github.com/nsqio/nsq/internal/lg"
  21. "github.com/nsqio/nsq/internal/protocol"
  22. "github.com/nsqio/nsq/internal/version"
  23. )
  // maybeWarnMsg folds the collected warning strings into one message of the
  // form "WARNING: a; b". It returns the empty string when msgs is empty.
  func maybeWarnMsg(msgs []string) string {
  	if len(msgs) == 0 {
  		return ""
  	}
  	joined := strings.Join(msgs, "; ")
  	return "WARNING: " + joined
  }
  30. // this is similar to httputil.NewSingleHostReverseProxy except it passes along basic auth
  31. func NewSingleHostReverseProxy(target *url.URL, connectTimeout time.Duration, requestTimeout time.Duration) *httputil.ReverseProxy {
  32. director := func(req *http.Request) {
  33. req.URL.Scheme = target.Scheme
  34. req.URL.Host = target.Host
  35. if target.User != nil {
  36. passwd, _ := target.User.Password()
  37. req.SetBasicAuth(target.User.Username(), passwd)
  38. }
  39. }
  40. return &httputil.ReverseProxy{
  41. Director: director,
  42. Transport: http_api.NewDeadlineTransport(connectTimeout, requestTimeout),
  43. }
  44. }
  // httpServer holds the state shared by all nsqadmin HTTP handlers.
  type httpServer struct {
  	// nsqadmin is the owning instance; handlers read options and log through it.
  	nsqadmin *NSQAdmin
  	// router dispatches incoming requests (see ServeHTTP).
  	router http.Handler
  	// client is the HTTP client used for upstream requests (e.g. graphite).
  	client *http_api.Client
  	// ci performs the cluster queries and mutations issued by the API handlers.
  	ci *clusterinfo.ClusterInfo
  	// basePath is the URL prefix joined onto every registered route.
  	basePath string
  	// devStaticDir, when non-empty, makes static assets load from this directory.
  	devStaticDir string
  }
  53. func NewHTTPServer(nsqadmin *NSQAdmin) *httpServer {
  54. log := http_api.Log(nsqadmin.logf)
  55. client := http_api.NewClient(nsqadmin.httpClientTLSConfig, nsqadmin.getOpts().HTTPClientConnectTimeout,
  56. nsqadmin.getOpts().HTTPClientRequestTimeout)
  57. router := httprouter.New()
  58. router.HandleMethodNotAllowed = true
  59. router.PanicHandler = http_api.LogPanicHandler(nsqadmin.logf)
  60. router.NotFound = http_api.LogNotFoundHandler(nsqadmin.logf)
  61. router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqadmin.logf)
  62. s := &httpServer{
  63. nsqadmin: nsqadmin,
  64. router: router,
  65. client: client,
  66. ci: clusterinfo.New(nsqadmin.logf, client),
  67. basePath: nsqadmin.getOpts().BasePath,
  68. devStaticDir: nsqadmin.getOpts().DevStaticDir,
  69. }
  70. bp := func(p string) string {
  71. return path.Join(s.basePath, p)
  72. }
  73. router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log))
  74. router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  75. router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log))
  76. router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandler, log))
  77. router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.indexHandler, log))
  78. router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log))
  79. router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler, log))
  80. router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, log))
  81. router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log))
  82. router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
  83. router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
  84. if s.nsqadmin.getOpts().ProxyGraphite {
  85. proxy := NewSingleHostReverseProxy(nsqadmin.graphiteURL, nsqadmin.getOpts().HTTPClientConnectTimeout,
  86. nsqadmin.getOpts().HTTPClientRequestTimeout)
  87. router.Handler("GET", bp("/render"), proxy)
  88. }
  89. // v1 endpoints
  90. router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler, log, http_api.V1))
  91. router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicHandler, log, http_api.V1))
  92. router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.channelHandler, log, http_api.V1))
  93. router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1))
  94. router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHandler, log, http_api.V1))
  95. router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicChannelHandler, log, http_api.V1))
  96. router.Handle("POST", bp("/api/topics/:topic"), http_api.Decorate(s.topicActionHandler, log, http_api.V1))
  97. router.Handle("POST", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.channelActionHandler, log, http_api.V1))
  98. router.Handle("DELETE", bp("/api/nodes/:node"), http_api.Decorate(s.tombstoneNodeForTopicHandler, log, http_api.V1))
  99. router.Handle("DELETE", bp("/api/topics/:topic"), http_api.Decorate(s.deleteTopicHandler, log, http_api.V1))
  100. router.Handle("DELETE", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.deleteChannelHandler, log, http_api.V1))
  101. router.Handle("GET", bp("/api/counter"), http_api.Decorate(s.counterHandler, log, http_api.V1))
  102. router.Handle("GET", bp("/api/graphite"), http_api.Decorate(s.graphiteHandler, log, http_api.V1))
  103. router.Handle("GET", bp("/config/:opt"), http_api.Decorate(s.doConfig, log, http_api.V1))
  104. router.Handle("PUT", bp("/config/:opt"), http_api.Decorate(s.doConfig, log, http_api.V1))
  105. return s
  106. }
  107. func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  108. s.router.ServeHTTP(w, req)
  109. }
  110. func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  111. return "OK", nil
  112. }
  113. func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  114. asset, _ := Asset("index.html")
  115. t, _ := template.New("index").Funcs(template.FuncMap{
  116. "basePath": func(p string) string {
  117. return path.Join(s.basePath, p)
  118. },
  119. }).Parse(string(asset))
  120. w.Header().Set("Content-Type", "text/html")
  121. t.Execute(w, struct {
  122. Version string
  123. ProxyGraphite bool
  124. GraphEnabled bool
  125. GraphiteURL string
  126. StatsdInterval int
  127. StatsdCounterFormat string
  128. StatsdGaugeFormat string
  129. StatsdPrefix string
  130. NSQLookupd []string
  131. IsAdmin bool
  132. }{
  133. Version: version.Binary,
  134. ProxyGraphite: s.nsqadmin.getOpts().ProxyGraphite,
  135. GraphEnabled: s.nsqadmin.getOpts().GraphiteURL != "",
  136. GraphiteURL: s.nsqadmin.getOpts().GraphiteURL,
  137. StatsdInterval: int(s.nsqadmin.getOpts().StatsdInterval / time.Second),
  138. StatsdCounterFormat: s.nsqadmin.getOpts().StatsdCounterFormat,
  139. StatsdGaugeFormat: s.nsqadmin.getOpts().StatsdGaugeFormat,
  140. StatsdPrefix: s.nsqadmin.getOpts().StatsdPrefix,
  141. NSQLookupd: s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  142. IsAdmin: s.isAuthorizedAdminRequest(req),
  143. })
  144. return nil, nil
  145. }
  146. func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  147. assetName := ps.ByName("asset")
  148. var (
  149. asset []byte
  150. err error
  151. )
  152. if s.devStaticDir != "" {
  153. s.nsqadmin.logf(LOG_DEBUG, "using dev dir %q for static asset %q", s.devStaticDir, assetName)
  154. fsPath := path.Join(s.devStaticDir, assetName)
  155. asset, err = ioutil.ReadFile(fsPath)
  156. } else {
  157. asset, err = staticAsset(assetName)
  158. }
  159. if err != nil {
  160. return nil, http_api.Err{404, "NOT_FOUND"}
  161. }
  162. ext := path.Ext(assetName)
  163. ct := mime.TypeByExtension(ext)
  164. if ct == "" {
  165. switch ext {
  166. case ".map":
  167. ct = "application/json"
  168. case ".svg":
  169. ct = "image/svg+xml"
  170. case ".woff":
  171. ct = "application/font-woff"
  172. case ".ttf":
  173. ct = "application/font-sfnt"
  174. case ".eot":
  175. ct = "application/vnd.ms-fontobject"
  176. case ".woff2":
  177. ct = "application/font-woff2"
  178. }
  179. }
  180. if ct != "" {
  181. w.Header().Set("Content-Type", ct)
  182. }
  183. return string(asset), nil
  184. }
  185. func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  186. var messages []string
  187. reqParams, err := http_api.NewReqParams(req)
  188. if err != nil {
  189. return nil, http_api.Err{400, err.Error()}
  190. }
  191. var topics []string
  192. if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 {
  193. topics, err = s.ci.GetLookupdTopics(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
  194. } else {
  195. topics, err = s.ci.GetNSQDTopics(s.nsqadmin.getOpts().NSQDHTTPAddresses)
  196. }
  197. if err != nil {
  198. pe, ok := err.(clusterinfo.PartialErr)
  199. if !ok {
  200. s.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", err)
  201. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  202. }
  203. s.nsqadmin.logf(LOG_WARN, "%s", err)
  204. messages = append(messages, pe.Error())
  205. }
  206. inactive, _ := reqParams.Get("inactive")
  207. if inactive == "true" {
  208. topicChannelMap := make(map[string][]string)
  209. if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 {
  210. goto respond
  211. }
  212. for _, topicName := range topics {
  213. producers, _ := s.ci.GetLookupdTopicProducers(
  214. topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
  215. if len(producers) == 0 {
  216. topicChannels, _ := s.ci.GetLookupdTopicChannels(
  217. topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
  218. topicChannelMap[topicName] = topicChannels
  219. }
  220. }
  221. respond:
  222. return struct {
  223. Topics map[string][]string `json:"topics"`
  224. Message string `json:"message"`
  225. }{topicChannelMap, maybeWarnMsg(messages)}, nil
  226. }
  227. return struct {
  228. Topics []string `json:"topics"`
  229. Message string `json:"message"`
  230. }{topics, maybeWarnMsg(messages)}, nil
  231. }
  232. func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  233. var messages []string
  234. topicName := ps.ByName("topic")
  235. producers, err := s.ci.GetTopicProducers(topicName,
  236. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  237. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  238. if err != nil {
  239. pe, ok := err.(clusterinfo.PartialErr)
  240. if !ok {
  241. s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
  242. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  243. }
  244. s.nsqadmin.logf(LOG_WARN, "%s", err)
  245. messages = append(messages, pe.Error())
  246. }
  247. topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false)
  248. if err != nil {
  249. pe, ok := err.(clusterinfo.PartialErr)
  250. if !ok {
  251. s.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err)
  252. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  253. }
  254. s.nsqadmin.logf(LOG_WARN, "%s", err)
  255. messages = append(messages, pe.Error())
  256. }
  257. allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName}
  258. for _, t := range topicStats {
  259. allNodesTopicStats.Add(t)
  260. }
  261. return struct {
  262. *clusterinfo.TopicStats
  263. Message string `json:"message"`
  264. }{allNodesTopicStats, maybeWarnMsg(messages)}, nil
  265. }
  266. func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  267. var messages []string
  268. topicName := ps.ByName("topic")
  269. channelName := ps.ByName("channel")
  270. producers, err := s.ci.GetTopicProducers(topicName,
  271. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  272. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  273. if err != nil {
  274. pe, ok := err.(clusterinfo.PartialErr)
  275. if !ok {
  276. s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
  277. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  278. }
  279. s.nsqadmin.logf(LOG_WARN, "%s", err)
  280. messages = append(messages, pe.Error())
  281. }
  282. _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName, true)
  283. if err != nil {
  284. pe, ok := err.(clusterinfo.PartialErr)
  285. if !ok {
  286. s.nsqadmin.logf(LOG_ERROR, "failed to get channel metadata - %s", err)
  287. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  288. }
  289. s.nsqadmin.logf(LOG_WARN, "%s", err)
  290. messages = append(messages, pe.Error())
  291. }
  292. return struct {
  293. *clusterinfo.ChannelStats
  294. Message string `json:"message"`
  295. }{channelStats[channelName], maybeWarnMsg(messages)}, nil
  296. }
  297. func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  298. var messages []string
  299. producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
  300. if err != nil {
  301. pe, ok := err.(clusterinfo.PartialErr)
  302. if !ok {
  303. s.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", err)
  304. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  305. }
  306. s.nsqadmin.logf(LOG_WARN, "%s", err)
  307. messages = append(messages, pe.Error())
  308. }
  309. return struct {
  310. Nodes clusterinfo.Producers `json:"nodes"`
  311. Message string `json:"message"`
  312. }{producers, maybeWarnMsg(messages)}, nil
  313. }
  314. func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  315. var messages []string
  316. node := ps.ByName("node")
  317. producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
  318. if err != nil {
  319. pe, ok := err.(clusterinfo.PartialErr)
  320. if !ok {
  321. s.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err)
  322. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  323. }
  324. s.nsqadmin.logf(LOG_WARN, "%s", err)
  325. messages = append(messages, pe.Error())
  326. }
  327. producer := producers.Search(node)
  328. if producer == nil {
  329. return nil, http_api.Err{404, "NODE_NOT_FOUND"}
  330. }
  331. topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true)
  332. if err != nil {
  333. s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
  334. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  335. }
  336. var totalClients int64
  337. var totalMessages int64
  338. for _, ts := range topicStats {
  339. for _, cs := range ts.Channels {
  340. totalClients += int64(len(cs.Clients))
  341. }
  342. totalMessages += ts.MessageCount
  343. }
  344. return struct {
  345. Node string `json:"node"`
  346. TopicStats []*clusterinfo.TopicStats `json:"topics"`
  347. TotalMessages int64 `json:"total_messages"`
  348. TotalClients int64 `json:"total_clients"`
  349. Message string `json:"message"`
  350. }{
  351. Node: node,
  352. TopicStats: topicStats,
  353. TotalMessages: totalMessages,
  354. TotalClients: totalClients,
  355. Message: maybeWarnMsg(messages),
  356. }, nil
  357. }
  358. func (s *httpServer) tombstoneNodeForTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  359. var messages []string
  360. node := ps.ByName("node")
  361. var body struct {
  362. Topic string `json:"topic"`
  363. }
  364. err := json.NewDecoder(req.Body).Decode(&body)
  365. if err != nil {
  366. return nil, http_api.Err{400, "INVALID_BODY"}
  367. }
  368. if !protocol.IsValidTopicName(body.Topic) {
  369. return nil, http_api.Err{400, "INVALID_TOPIC"}
  370. }
  371. err = s.ci.TombstoneNodeForTopic(body.Topic, node,
  372. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
  373. if err != nil {
  374. pe, ok := err.(clusterinfo.PartialErr)
  375. if !ok {
  376. s.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err)
  377. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  378. }
  379. s.nsqadmin.logf(LOG_WARN, "%s", err)
  380. messages = append(messages, pe.Error())
  381. }
  382. s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req)
  383. return struct {
  384. Message string `json:"message"`
  385. }{maybeWarnMsg(messages)}, nil
  386. }
  387. func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  388. var messages []string
  389. var body struct {
  390. Topic string `json:"topic"`
  391. Channel string `json:"channel"`
  392. }
  393. if !s.isAuthorizedAdminRequest(req) {
  394. return nil, http_api.Err{403, "FORBIDDEN"}
  395. }
  396. err := json.NewDecoder(req.Body).Decode(&body)
  397. if err != nil {
  398. return nil, http_api.Err{400, err.Error()}
  399. }
  400. if !protocol.IsValidTopicName(body.Topic) {
  401. s.nsqadmin.logf(LOG_ERROR, "invalid topic - %s", body.Topic)
  402. return nil, http_api.Err{400, "INVALID_TOPIC"}
  403. }
  404. if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) {
  405. s.nsqadmin.logf(LOG_ERROR, "invalid channel - %s", body.Channel)
  406. return nil, http_api.Err{400, "INVALID_CHANNEL"}
  407. }
  408. err = s.ci.CreateTopicChannel(body.Topic, body.Channel,
  409. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
  410. if err != nil {
  411. pe, ok := err.(clusterinfo.PartialErr)
  412. if !ok {
  413. s.nsqadmin.logf(LOG_ERROR, "failed to create topic/channel - %s", err)
  414. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  415. }
  416. s.nsqadmin.logf(LOG_WARN, "%s", err)
  417. messages = append(messages, pe.Error())
  418. }
  419. s.notifyAdminAction("create_topic", body.Topic, "", "", req)
  420. if len(body.Channel) > 0 {
  421. s.notifyAdminAction("create_channel", body.Topic, body.Channel, "", req)
  422. }
  423. return struct {
  424. Message string `json:"message"`
  425. }{maybeWarnMsg(messages)}, nil
  426. }
  427. func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  428. var messages []string
  429. if !s.isAuthorizedAdminRequest(req) {
  430. return nil, http_api.Err{403, "FORBIDDEN"}
  431. }
  432. topicName := ps.ByName("topic")
  433. err := s.ci.DeleteTopic(topicName,
  434. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  435. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  436. if err != nil {
  437. pe, ok := err.(clusterinfo.PartialErr)
  438. if !ok {
  439. s.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err)
  440. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  441. }
  442. s.nsqadmin.logf(LOG_WARN, "%s", err)
  443. messages = append(messages, pe.Error())
  444. }
  445. s.notifyAdminAction("delete_topic", topicName, "", "", req)
  446. return struct {
  447. Message string `json:"message"`
  448. }{maybeWarnMsg(messages)}, nil
  449. }
  450. func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  451. var messages []string
  452. if !s.isAuthorizedAdminRequest(req) {
  453. return nil, http_api.Err{403, "FORBIDDEN"}
  454. }
  455. topicName := ps.ByName("topic")
  456. channelName := ps.ByName("channel")
  457. err := s.ci.DeleteChannel(topicName, channelName,
  458. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  459. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  460. if err != nil {
  461. pe, ok := err.(clusterinfo.PartialErr)
  462. if !ok {
  463. s.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err)
  464. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  465. }
  466. s.nsqadmin.logf(LOG_WARN, "%s", err)
  467. messages = append(messages, pe.Error())
  468. }
  469. s.notifyAdminAction("delete_channel", topicName, channelName, "", req)
  470. return struct {
  471. Message string `json:"message"`
  472. }{maybeWarnMsg(messages)}, nil
  473. }
  474. func (s *httpServer) topicActionHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  475. topicName := ps.ByName("topic")
  476. return s.topicChannelAction(req, topicName, "")
  477. }
  478. func (s *httpServer) channelActionHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  479. topicName := ps.ByName("topic")
  480. channelName := ps.ByName("channel")
  481. return s.topicChannelAction(req, topicName, channelName)
  482. }
  483. func (s *httpServer) topicChannelAction(req *http.Request, topicName string, channelName string) (interface{}, error) {
  484. var messages []string
  485. var body struct {
  486. Action string `json:"action"`
  487. }
  488. if !s.isAuthorizedAdminRequest(req) {
  489. return nil, http_api.Err{403, "FORBIDDEN"}
  490. }
  491. err := json.NewDecoder(req.Body).Decode(&body)
  492. if err != nil {
  493. return nil, http_api.Err{400, err.Error()}
  494. }
  495. switch body.Action {
  496. case "pause":
  497. if channelName != "" {
  498. err = s.ci.PauseChannel(topicName, channelName,
  499. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  500. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  501. s.notifyAdminAction("pause_channel", topicName, channelName, "", req)
  502. } else {
  503. err = s.ci.PauseTopic(topicName,
  504. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  505. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  506. s.notifyAdminAction("pause_topic", topicName, "", "", req)
  507. }
  508. case "unpause":
  509. if channelName != "" {
  510. err = s.ci.UnPauseChannel(topicName, channelName,
  511. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  512. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  513. s.notifyAdminAction("unpause_channel", topicName, channelName, "", req)
  514. } else {
  515. err = s.ci.UnPauseTopic(topicName,
  516. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  517. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  518. s.notifyAdminAction("unpause_topic", topicName, "", "", req)
  519. }
  520. case "empty":
  521. if channelName != "" {
  522. err = s.ci.EmptyChannel(topicName, channelName,
  523. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  524. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  525. s.notifyAdminAction("empty_channel", topicName, channelName, "", req)
  526. } else {
  527. err = s.ci.EmptyTopic(topicName,
  528. s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
  529. s.nsqadmin.getOpts().NSQDHTTPAddresses)
  530. s.notifyAdminAction("empty_topic", topicName, "", "", req)
  531. }
  532. default:
  533. return nil, http_api.Err{400, "INVALID_ACTION"}
  534. }
  535. if err != nil {
  536. pe, ok := err.(clusterinfo.PartialErr)
  537. if !ok {
  538. s.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err)
  539. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  540. }
  541. s.nsqadmin.logf(LOG_WARN, "%s", err)
  542. messages = append(messages, pe.Error())
  543. }
  544. return struct {
  545. Message string `json:"message"`
  546. }{maybeWarnMsg(messages)}, nil
  547. }
  // counterStats is one entry of the counter endpoint's response: the message
  // count accumulated for a single topic/channel pair on a single node.
  type counterStats struct {
  	// Node is the node the count was reported by.
  	Node string `json:"node"`
  	// TopicName is the topic the channel belongs to.
  	TopicName string `json:"topic_name"`
  	// ChannelName is the channel the count refers to.
  	ChannelName string `json:"channel_name"`
  	// MessageCount is the sum of the message counts seen for this entry.
  	MessageCount int64 `json:"message_count"`
  }
  554. func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  555. var messages []string
  556. stats := make(map[string]*counterStats)
  557. producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
  558. if err != nil {
  559. pe, ok := err.(clusterinfo.PartialErr)
  560. if !ok {
  561. s.nsqadmin.logf(LOG_ERROR, "failed to get counter producer list - %s", err)
  562. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  563. }
  564. s.nsqadmin.logf(LOG_WARN, "%s", err)
  565. messages = append(messages, pe.Error())
  566. }
  567. _, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false)
  568. if err != nil {
  569. pe, ok := err.(clusterinfo.PartialErr)
  570. if !ok {
  571. s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
  572. return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
  573. }
  574. s.nsqadmin.logf(LOG_WARN, "%s", err)
  575. messages = append(messages, pe.Error())
  576. }
  577. for _, channelStats := range channelStats {
  578. for _, hostChannelStats := range channelStats.NodeStats {
  579. key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, channelStats.ChannelName, hostChannelStats.Node)
  580. s, ok := stats[key]
  581. if !ok {
  582. s = &counterStats{
  583. Node: hostChannelStats.Node,
  584. TopicName: channelStats.TopicName,
  585. ChannelName: channelStats.ChannelName,
  586. }
  587. stats[key] = s
  588. }
  589. s.MessageCount += hostChannelStats.MessageCount
  590. }
  591. }
  592. return struct {
  593. Stats map[string]*counterStats `json:"stats"`
  594. Message string `json:"message"`
  595. }{stats, maybeWarnMsg(messages)}, nil
  596. }
  597. func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  598. reqParams, err := http_api.NewReqParams(req)
  599. if err != nil {
  600. return nil, http_api.Err{400, "INVALID_REQUEST"}
  601. }
  602. metric, err := reqParams.Get("metric")
  603. if err != nil || metric != "rate" {
  604. return nil, http_api.Err{400, "INVALID_ARG_METRIC"}
  605. }
  606. target, err := reqParams.Get("target")
  607. if err != nil {
  608. return nil, http_api.Err{400, "INVALID_ARG_TARGET"}
  609. }
  610. params := url.Values{}
  611. params.Set("from", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval*2/time.Second))
  612. params.Set("until", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval/time.Second))
  613. params.Set("format", "json")
  614. params.Set("target", target)
  615. query := fmt.Sprintf("/render?%s", params.Encode())
  616. url := s.nsqadmin.getOpts().GraphiteURL + query
  617. s.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url)
  618. var response []struct {
  619. Target string `json:"target"`
  620. DataPoints [][]*float64 `json:"datapoints"`
  621. }
  622. err = s.client.GETV1(url, &response)
  623. if err != nil {
  624. s.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err)
  625. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  626. }
  627. var rateStr string
  628. rate := *response[0].DataPoints[0][0]
  629. if rate < 0 {
  630. rateStr = "N/A"
  631. } else {
  632. rateDivisor := s.nsqadmin.getOpts().StatsdInterval / time.Second
  633. rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor))
  634. }
  635. return struct {
  636. Rate string `json:"rate"`
  637. }{rateStr}, nil
  638. }
  639. func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  640. opt := ps.ByName("opt")
  641. allowConfigFromCIDR := s.nsqadmin.getOpts().AllowConfigFromCIDR
  642. if allowConfigFromCIDR != "" {
  643. _, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR)
  644. addr, _, err := net.SplitHostPort(req.RemoteAddr)
  645. if err != nil {
  646. s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr)
  647. return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
  648. }
  649. ip := net.ParseIP(addr)
  650. if ip == nil {
  651. s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr)
  652. return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
  653. }
  654. if !ipnet.Contains(ip) {
  655. return nil, http_api.Err{403, "FORBIDDEN"}
  656. }
  657. }
  658. if req.Method == "PUT" {
  659. // add 1 so that it's greater than our max when we test for it
  660. // (LimitReader returns a "fake" EOF)
  661. readMax := int64(1024*1024 + 1)
  662. body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
  663. if err != nil {
  664. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  665. }
  666. if int64(len(body)) == readMax || len(body) == 0 {
  667. return nil, http_api.Err{413, "INVALID_VALUE"}
  668. }
  669. opts := *s.nsqadmin.getOpts()
  670. switch opt {
  671. case "nsqlookupd_http_addresses":
  672. err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses)
  673. if err != nil {
  674. return nil, http_api.Err{400, "INVALID_VALUE"}
  675. }
  676. case "log_level":
  677. logLevelStr := string(body)
  678. logLevel, err := lg.ParseLogLevel(logLevelStr)
  679. if err != nil {
  680. return nil, http_api.Err{400, "INVALID_VALUE"}
  681. }
  682. opts.LogLevel = logLevel
  683. default:
  684. return nil, http_api.Err{400, "INVALID_OPTION"}
  685. }
  686. s.nsqadmin.swapOpts(&opts)
  687. }
  688. v, ok := getOptByCfgName(s.nsqadmin.getOpts(), opt)
  689. if !ok {
  690. return nil, http_api.Err{400, "INVALID_OPTION"}
  691. }
  692. return v, nil
  693. }
  694. func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool {
  695. adminUsers := s.nsqadmin.getOpts().AdminUsers
  696. if len(adminUsers) == 0 {
  697. return true
  698. }
  699. aclHttpHeader := s.nsqadmin.getOpts().AclHttpHeader
  700. user := req.Header.Get(aclHttpHeader)
  701. for _, v := range adminUsers {
  702. if v == user {
  703. return true
  704. }
  705. }
  706. return false
  707. }
  // getOptByCfgName looks up a field of the struct pointed to by opts by its
  // config name and returns the field's value.
  //
  // Only fields carrying a `flag` struct tag are considered. A field's config
  // name is its `cfg` tag when present, otherwise its `flag` tag with every
  // "-" replaced by "_". The second result is false when no field matches.
  // opts must be a pointer to a struct.
  func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
  	structVal := reflect.ValueOf(opts).Elem()
  	structType := structVal.Type()

  	for idx, total := 0, structType.NumField(); idx < total; idx++ {
  		sf := structType.Field(idx)

  		flagTag := sf.Tag.Get("flag")
  		if flagTag == "" {
  			// untagged fields are not exposed as options
  			continue
  		}

  		candidate := sf.Tag.Get("cfg")
  		if candidate == "" {
  			candidate = strings.Replace(flagTag, "-", "_", -1)
  		}

  		if candidate == name {
  			return structVal.FieldByName(sf.Name).Interface(), true
  		}
  	}

  	return nil, false
  }