http.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. package nsqd
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "net/http"
  10. "net/http/pprof"
  11. "net/url"
  12. "os"
  13. "reflect"
  14. "runtime"
  15. "runtime/debug"
  16. "strconv"
  17. "strings"
  18. "time"
  19. "github.com/julienschmidt/httprouter"
  20. "github.com/nsqio/nsq/internal/http_api"
  21. "github.com/nsqio/nsq/internal/lg"
  22. "github.com/nsqio/nsq/internal/protocol"
  23. "github.com/nsqio/nsq/internal/version"
  24. )
  25. var boolParams = map[string]bool{
  26. "true": true,
  27. "1": true,
  28. "false": false,
  29. "0": false,
  30. }
  31. type httpServer struct {
  32. nsqd *NSQD
  33. tlsEnabled bool
  34. tlsRequired bool
  35. router http.Handler
  36. }
  37. func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
  38. log := http_api.Log(nsqd.logf)
  39. router := httprouter.New()
  40. router.HandleMethodNotAllowed = true
  41. router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
  42. router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
  43. router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
  44. s := &httpServer{
  45. nsqd: nsqd,
  46. tlsEnabled: tlsEnabled,
  47. tlsRequired: tlsRequired,
  48. router: router,
  49. }
  50. router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
  51. router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
  52. // v1 negotiate
  53. router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
  54. router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
  55. router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
  56. // only v1
  57. router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
  58. router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
  59. router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
  60. router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
  61. router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
  62. router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
  63. router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
  64. router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
  65. router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
  66. router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
  67. router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
  68. router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
  69. // debug
  70. router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
  71. router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
  72. router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
  73. router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
  74. router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
  75. router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
  76. router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
  77. router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
  78. router.Handle("PUT", "/debug/setblockrate", http_api.Decorate(setBlockRateHandler, log, http_api.PlainText))
  79. router.Handle("POST", "/debug/freememory", http_api.Decorate(freeMemory, log, http_api.PlainText))
  80. router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
  81. return s
  82. }
  83. func setBlockRateHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  84. rate, err := strconv.Atoi(req.FormValue("rate"))
  85. if err != nil {
  86. return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("invalid block rate : %s", err.Error())}
  87. }
  88. runtime.SetBlockProfileRate(rate)
  89. return nil, nil
  90. }
  91. func freeMemory(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  92. debug.FreeOSMemory()
  93. return nil, nil
  94. }
  95. func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  96. if !s.tlsEnabled && s.tlsRequired {
  97. resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d}`,
  98. s.nsqd.RealHTTPSAddr().Port)
  99. w.Header().Set("X-SMQ-Content-Type", "smq; version=1.0")
  100. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  101. w.WriteHeader(403)
  102. io.WriteString(w, resp)
  103. return
  104. }
  105. s.router.ServeHTTP(w, req)
  106. }
  107. func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  108. health := s.nsqd.GetHealth()
  109. if !s.nsqd.IsHealthy() {
  110. return nil, http_api.Err{500, health}
  111. }
  112. return health, nil
  113. }
  114. func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  115. hostname, err := os.Hostname()
  116. if err != nil {
  117. return nil, http_api.Err{500, err.Error()}
  118. }
  119. return struct {
  120. Version string `json:"version"`
  121. BroadcastAddress string `json:"broadcast_address"`
  122. Hostname string `json:"hostname"`
  123. HTTPPort int `json:"http_port"`
  124. TCPPort int `json:"tcp_port"`
  125. StartTime int64 `json:"start_time"`
  126. MaxHeartBeatInterval time.Duration `json:"max_heartbeat_interval"`
  127. MaxOutBufferSize int64 `json:"max_output_buffer_size"`
  128. MaxOutBufferTimeout time.Duration `json:"max_output_buffer_timeout"`
  129. MaxDeflateLevel int `json:"max_deflate_level"`
  130. }{
  131. Version: version.Binary,
  132. BroadcastAddress: s.nsqd.getOpts().BroadcastAddress,
  133. Hostname: hostname,
  134. TCPPort: s.nsqd.RealTCPAddr().Port,
  135. HTTPPort: s.nsqd.RealHTTPAddr().Port,
  136. StartTime: s.nsqd.GetStartTime().Unix(),
  137. MaxHeartBeatInterval: s.nsqd.getOpts().MaxHeartbeatInterval,
  138. MaxOutBufferSize: s.nsqd.getOpts().MaxOutputBufferSize,
  139. MaxOutBufferTimeout: s.nsqd.getOpts().MaxOutputBufferTimeout,
  140. MaxDeflateLevel: s.nsqd.getOpts().MaxDeflateLevel,
  141. }, nil
  142. }
  143. func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.ReqParams, *Topic, string, error) {
  144. reqParams, err := http_api.NewReqParams(req)
  145. if err != nil {
  146. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  147. return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"}
  148. }
  149. topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
  150. if err != nil {
  151. return nil, nil, "", http_api.Err{400, err.Error()}
  152. }
  153. topic, err := s.nsqd.GetExistingTopic(topicName)
  154. if err != nil {
  155. return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"}
  156. }
  157. return reqParams, topic, channelName, err
  158. }
  159. func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
  160. reqParams, err := url.ParseQuery(req.URL.RawQuery)
  161. if err != nil {
  162. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  163. return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
  164. }
  165. topicNames, ok := reqParams["topic"]
  166. if !ok {
  167. return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  168. }
  169. topicName := topicNames[0]
  170. if !protocol.IsValidTopicName(topicName) {
  171. return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
  172. }
  173. return reqParams, s.nsqd.GetTopic(topicName), nil
  174. }
  175. func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  176. // TODO: one day I'd really like to just error on chunked requests
  177. // to be able to fail "too big" requests before we even read
  178. if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
  179. return nil, http_api.Err{413, "MSG_TOO_BIG"}
  180. }
  181. // add 1 so that it's greater than our max when we test for it
  182. // (LimitReader returns a "fake" EOF)
  183. readMax := s.nsqd.getOpts().MaxMsgSize + 1
  184. body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
  185. if err != nil {
  186. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  187. }
  188. if int64(len(body)) == readMax {
  189. return nil, http_api.Err{413, "MSG_TOO_BIG"}
  190. }
  191. if len(body) == 0 {
  192. return nil, http_api.Err{400, "MSG_EMPTY"}
  193. }
  194. reqParams, topic, err := s.getTopicFromQuery(req)
  195. if err != nil {
  196. return nil, err
  197. }
  198. var deferred time.Duration
  199. if ds, ok := reqParams["defer"]; ok {
  200. var di int64
  201. di, err = strconv.ParseInt(ds[0], 10, 64)
  202. if err != nil {
  203. return nil, http_api.Err{400, "INVALID_DEFER"}
  204. }
  205. deferred = time.Duration(di) * time.Millisecond
  206. if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
  207. return nil, http_api.Err{400, "INVALID_DEFER"}
  208. }
  209. }
  210. msg := NewMessage(topic.GenerateID(), body)
  211. s.nsqd.logf(LOG_INFO, "receive pub message - %s", msg)
  212. msg.deferred = deferred
  213. err = topic.PutMessage(msg)
  214. if err != nil {
  215. return nil, http_api.Err{503, "EXITING"}
  216. }
  217. return "OK", nil
  218. }
  219. func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  220. var msgs []*Message
  221. var exit bool
  222. // TODO: one day I'd really like to just error on chunked requests
  223. // to be able to fail "too big" requests before we even read
  224. if req.ContentLength > s.nsqd.getOpts().MaxBodySize {
  225. return nil, http_api.Err{413, "BODY_TOO_BIG"}
  226. }
  227. reqParams, topic, err := s.getTopicFromQuery(req)
  228. if err != nil {
  229. return nil, err
  230. }
  231. // text mode is default, but unrecognized binary opt considered true
  232. binaryMode := false
  233. if vals, ok := reqParams["binary"]; ok {
  234. if binaryMode, ok = boolParams[vals[0]]; !ok {
  235. binaryMode = true
  236. s.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0])
  237. }
  238. }
  239. if binaryMode {
  240. tmp := make([]byte, 4)
  241. msgs, err = readMPUB(req.Body, tmp, topic,
  242. s.nsqd.getOpts().MaxMsgSize, s.nsqd.getOpts().MaxBodySize)
  243. if err != nil {
  244. return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
  245. }
  246. } else {
  247. // add 1 so that it's greater than our max when we test for it
  248. // (LimitReader returns a "fake" EOF)
  249. readMax := s.nsqd.getOpts().MaxBodySize + 1
  250. rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
  251. total := 0
  252. for !exit {
  253. var block []byte
  254. block, err = rdr.ReadBytes('|')
  255. s.nsqd.logf(LOG_INFO, "receive block - %s", string(block))
  256. if err != nil {
  257. if err != io.EOF {
  258. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  259. }
  260. exit = true
  261. }
  262. total += len(block)
  263. if int64(total) == readMax {
  264. return nil, http_api.Err{413, "BODY_TOO_BIG"}
  265. }
  266. if len(block) > 0 && block[len(block)-1] == '|' {
  267. block = block[:len(block)-1]
  268. }
  269. // silently discard 0 length messages
  270. // this maintains the behavior pre 0.2.22
  271. if len(block) == 0 {
  272. continue
  273. }
  274. if int64(len(block)) > s.nsqd.getOpts().MaxMsgSize {
  275. return nil, http_api.Err{413, "MSG_TOO_BIG"}
  276. }
  277. msg := NewMessage(topic.GenerateID(), block)
  278. msgs = append(msgs, msg)
  279. }
  280. }
  281. err = topic.PutMessages(msgs)
  282. if err != nil {
  283. return nil, http_api.Err{503, "EXITING"}
  284. }
  285. return "OK", nil
  286. }
  287. func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  288. _, _, err := s.getTopicFromQuery(req)
  289. return nil, err
  290. }
  291. func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  292. reqParams, err := http_api.NewReqParams(req)
  293. if err != nil {
  294. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  295. return nil, http_api.Err{400, "INVALID_REQUEST"}
  296. }
  297. topicName, err := reqParams.Get("topic")
  298. if err != nil {
  299. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  300. }
  301. if !protocol.IsValidTopicName(topicName) {
  302. return nil, http_api.Err{400, "INVALID_TOPIC"}
  303. }
  304. topic, err := s.nsqd.GetExistingTopic(topicName)
  305. if err != nil {
  306. return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
  307. }
  308. err = topic.Empty()
  309. if err != nil {
  310. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  311. }
  312. return nil, nil
  313. }
  314. func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  315. reqParams, err := http_api.NewReqParams(req)
  316. if err != nil {
  317. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  318. return nil, http_api.Err{400, "INVALID_REQUEST"}
  319. }
  320. topicName, err := reqParams.Get("topic")
  321. if err != nil {
  322. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  323. }
  324. err = s.nsqd.DeleteExistingTopic(topicName)
  325. if err != nil {
  326. return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
  327. }
  328. return nil, nil
  329. }
  330. func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  331. reqParams, err := http_api.NewReqParams(req)
  332. if err != nil {
  333. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  334. return nil, http_api.Err{400, "INVALID_REQUEST"}
  335. }
  336. topicName, err := reqParams.Get("topic")
  337. if err != nil {
  338. return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
  339. }
  340. topic, err := s.nsqd.GetExistingTopic(topicName)
  341. if err != nil {
  342. return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
  343. }
  344. if strings.Contains(req.URL.Path, "unpause") {
  345. err = topic.UnPause()
  346. } else {
  347. err = topic.Pause()
  348. }
  349. if err != nil {
  350. s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
  351. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  352. }
  353. // pro-actively persist metadata so in case of process failure
  354. // nsqd won't suddenly (un)pause a topic
  355. s.nsqd.Lock()
  356. s.nsqd.PersistMetadata()
  357. s.nsqd.Unlock()
  358. return nil, nil
  359. }
  360. func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  361. _, topic, channelName, err := s.getExistingTopicFromQuery(req)
  362. if err != nil {
  363. return nil, err
  364. }
  365. topic.GetChannel(channelName)
  366. return nil, nil
  367. }
  368. func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  369. _, topic, channelName, err := s.getExistingTopicFromQuery(req)
  370. if err != nil {
  371. return nil, err
  372. }
  373. channel, err := topic.GetExistingChannel(channelName)
  374. if err != nil {
  375. return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
  376. }
  377. err = channel.Empty()
  378. if err != nil {
  379. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  380. }
  381. return nil, nil
  382. }
  383. func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  384. _, topic, channelName, err := s.getExistingTopicFromQuery(req)
  385. if err != nil {
  386. return nil, err
  387. }
  388. err = topic.DeleteExistingChannel(channelName)
  389. if err != nil {
  390. return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
  391. }
  392. return nil, nil
  393. }
  394. func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  395. _, topic, channelName, err := s.getExistingTopicFromQuery(req)
  396. if err != nil {
  397. return nil, err
  398. }
  399. channel, err := topic.GetExistingChannel(channelName)
  400. if err != nil {
  401. return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
  402. }
  403. if strings.Contains(req.URL.Path, "unpause") {
  404. err = channel.UnPause()
  405. } else {
  406. err = channel.Pause()
  407. }
  408. if err != nil {
  409. s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
  410. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  411. }
  412. // pro-actively persist metadata so in case of process failure
  413. // nsqd won't suddenly (un)pause a channel
  414. s.nsqd.Lock()
  415. s.nsqd.PersistMetadata()
  416. s.nsqd.Unlock()
  417. return nil, nil
  418. }
  419. func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  420. reqParams, err := http_api.NewReqParams(req)
  421. if err != nil {
  422. s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
  423. return nil, http_api.Err{400, "INVALID_REQUEST"}
  424. }
  425. formatString, _ := reqParams.Get("format")
  426. topicName, _ := reqParams.Get("topic")
  427. channelName, _ := reqParams.Get("channel")
  428. includeClientsParam, _ := reqParams.Get("include_clients")
  429. includeMemParam, _ := reqParams.Get("include_mem")
  430. jsonFormat := formatString == "json"
  431. includeClients, ok := boolParams[includeClientsParam]
  432. if !ok {
  433. includeClients = true
  434. }
  435. includeMem, ok := boolParams[includeMemParam]
  436. if !ok {
  437. includeMem = true
  438. }
  439. stats := s.nsqd.GetStats(topicName, channelName, includeClients)
  440. health := s.nsqd.GetHealth()
  441. startTime := s.nsqd.GetStartTime()
  442. uptime := time.Since(startTime)
  443. var ms *memStats
  444. if includeMem {
  445. m := getMemStats()
  446. ms = &m
  447. }
  448. if !jsonFormat {
  449. return s.printStats(stats, ms, health, startTime, uptime), nil
  450. }
  451. // TODO: should producer stats be hung off topics?
  452. return struct {
  453. Version string `json:"version"`
  454. Health string `json:"health"`
  455. StartTime int64 `json:"start_time"`
  456. Topics []TopicStats `json:"topics"`
  457. Memory *memStats `json:"memory,omitempty"`
  458. Producers []ClientStats `json:"producers"`
  459. }{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Producers}, nil
  460. }
  461. func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
  462. var buf bytes.Buffer
  463. w := &buf
  464. fmt.Fprintf(w, "%s\n", version.String("smqd"))
  465. fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
  466. fmt.Fprintf(w, "uptime %s\n", uptime)
  467. fmt.Fprintf(w, "\nHealth: %s\n", health)
  468. if ms != nil {
  469. fmt.Fprintf(w, "\nMemory:\n")
  470. fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects)
  471. fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes)
  472. fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes)
  473. fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes)
  474. fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100)
  475. fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99)
  476. fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95)
  477. fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes)
  478. fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)
  479. }
  480. if len(stats.Topics) == 0 {
  481. fmt.Fprintf(w, "\nTopics: None\n")
  482. } else {
  483. fmt.Fprintf(w, "\nTopics:")
  484. }
  485. for _, t := range stats.Topics {
  486. var pausedPrefix string
  487. if t.Paused {
  488. pausedPrefix = "*P "
  489. } else {
  490. pausedPrefix = " "
  491. }
  492. fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
  493. pausedPrefix,
  494. t.TopicName,
  495. t.Depth,
  496. t.BackendDepth,
  497. t.MessageCount,
  498. t.E2eProcessingLatency,
  499. )
  500. for _, c := range t.Channels {
  501. if c.Paused {
  502. pausedPrefix = " *P "
  503. } else {
  504. pausedPrefix = " "
  505. }
  506. fmt.Fprintf(w, "%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
  507. pausedPrefix,
  508. c.ChannelName,
  509. c.Depth,
  510. c.BackendDepth,
  511. c.InFlightCount,
  512. c.DeferredCount,
  513. c.RequeueCount,
  514. c.TimeoutCount,
  515. c.MessageCount,
  516. c.E2eProcessingLatency,
  517. )
  518. for _, client := range c.Clients {
  519. fmt.Fprintf(w, " %s\n", client)
  520. }
  521. }
  522. }
  523. if len(stats.Producers) == 0 {
  524. fmt.Fprintf(w, "\nProducers: None\n")
  525. } else {
  526. fmt.Fprintf(w, "\nProducers:\n")
  527. for _, client := range stats.Producers {
  528. fmt.Fprintf(w, " %s\n", client)
  529. }
  530. }
  531. return buf.Bytes()
  532. }
  533. func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
  534. opt := ps.ByName("opt")
  535. if req.Method == "PUT" {
  536. // add 1 so that it's greater than our max when we test for it
  537. // (LimitReader returns a "fake" EOF)
  538. readMax := s.nsqd.getOpts().MaxMsgSize + 1
  539. body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
  540. if err != nil {
  541. return nil, http_api.Err{500, "INTERNAL_ERROR"}
  542. }
  543. if int64(len(body)) == readMax || len(body) == 0 {
  544. return nil, http_api.Err{413, "INVALID_VALUE"}
  545. }
  546. opts := *s.nsqd.getOpts()
  547. switch opt {
  548. case "nsqlookupd_tcp_addresses":
  549. err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses)
  550. if err != nil {
  551. return nil, http_api.Err{400, "INVALID_VALUE"}
  552. }
  553. case "log_level":
  554. logLevelStr := string(body)
  555. logLevel, err := lg.ParseLogLevel(logLevelStr)
  556. if err != nil {
  557. return nil, http_api.Err{400, "INVALID_VALUE"}
  558. }
  559. opts.LogLevel = logLevel
  560. default:
  561. return nil, http_api.Err{400, "INVALID_OPTION"}
  562. }
  563. s.nsqd.swapOpts(&opts)
  564. s.nsqd.triggerOptsNotification()
  565. }
  566. v, ok := getOptByCfgName(s.nsqd.getOpts(), opt)
  567. if !ok {
  568. return nil, http_api.Err{400, "INVALID_OPTION"}
  569. }
  570. return v, nil
  571. }
  572. func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
  573. val := reflect.ValueOf(opts).Elem()
  574. typ := val.Type()
  575. for i := 0; i < typ.NumField(); i++ {
  576. field := typ.Field(i)
  577. flagName := field.Tag.Get("flag")
  578. cfgName := field.Tag.Get("cfg")
  579. if flagName == "" {
  580. continue
  581. }
  582. if cfgName == "" {
  583. cfgName = strings.Replace(flagName, "-", "_", -1)
  584. }
  585. if name != cfgName {
  586. continue
  587. }
  588. return val.FieldByName(field.Name).Interface(), true
  589. }
  590. return nil, false
  591. }