123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680 |
- package nsqd
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "net/http/pprof"
- "net/url"
- "os"
- "reflect"
- "runtime"
- "runtime/debug"
- "strconv"
- "strings"
- "time"
- "github.com/julienschmidt/httprouter"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/lg"
- "github.com/nsqio/nsq/internal/protocol"
- "github.com/nsqio/nsq/internal/version"
- )
- var boolParams = map[string]bool{
- "true": true,
- "1": true,
- "false": false,
- "0": false,
- }
- type httpServer struct {
- nsqd *NSQD
- tlsEnabled bool
- tlsRequired bool
- router http.Handler
- }
- func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
- log := http_api.Log(nsqd.logf)
- router := httprouter.New()
- router.HandleMethodNotAllowed = true
- router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
- router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
- router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
- s := &httpServer{
- nsqd: nsqd,
- tlsEnabled: tlsEnabled,
- tlsRequired: tlsRequired,
- router: router,
- }
- router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
- router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
- // v1 negotiate
- router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
- router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
- router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))
- // only v1
- router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
- router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))
- router.Handle("POST", "/topic/empty", http_api.Decorate(s.doEmptyTopic, log, http_api.V1))
- router.Handle("POST", "/topic/pause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
- router.Handle("POST", "/topic/unpause", http_api.Decorate(s.doPauseTopic, log, http_api.V1))
- router.Handle("POST", "/channel/create", http_api.Decorate(s.doCreateChannel, log, http_api.V1))
- router.Handle("POST", "/channel/delete", http_api.Decorate(s.doDeleteChannel, log, http_api.V1))
- router.Handle("POST", "/channel/empty", http_api.Decorate(s.doEmptyChannel, log, http_api.V1))
- router.Handle("POST", "/channel/pause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
- router.Handle("POST", "/channel/unpause", http_api.Decorate(s.doPauseChannel, log, http_api.V1))
- router.Handle("GET", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
- router.Handle("PUT", "/config/:opt", http_api.Decorate(s.doConfig, log, http_api.V1))
- // debug
- router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
- router.HandlerFunc("GET", "/debug/pprof/cmdline", pprof.Cmdline)
- router.HandlerFunc("GET", "/debug/pprof/symbol", pprof.Symbol)
- router.HandlerFunc("POST", "/debug/pprof/symbol", pprof.Symbol)
- router.HandlerFunc("GET", "/debug/pprof/profile", pprof.Profile)
- router.Handler("GET", "/debug/pprof/heap", pprof.Handler("heap"))
- router.Handler("GET", "/debug/pprof/goroutine", pprof.Handler("goroutine"))
- router.Handler("GET", "/debug/pprof/block", pprof.Handler("block"))
- router.Handle("PUT", "/debug/setblockrate", http_api.Decorate(setBlockRateHandler, log, http_api.PlainText))
- router.Handle("POST", "/debug/freememory", http_api.Decorate(freeMemory, log, http_api.PlainText))
- router.Handler("GET", "/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
- return s
- }
- func setBlockRateHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- rate, err := strconv.Atoi(req.FormValue("rate"))
- if err != nil {
- return nil, http_api.Err{http.StatusBadRequest, fmt.Sprintf("invalid block rate : %s", err.Error())}
- }
- runtime.SetBlockProfileRate(rate)
- return nil, nil
- }
- func freeMemory(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- debug.FreeOSMemory()
- return nil, nil
- }
- func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- if !s.tlsEnabled && s.tlsRequired {
- resp := fmt.Sprintf(`{"message": "TLS_REQUIRED", "https_port": %d}`,
- s.nsqd.RealHTTPSAddr().Port)
- w.Header().Set("X-SMQ-Content-Type", "smq; version=1.0")
- w.Header().Set("Content-Type", "application/json; charset=utf-8")
- w.WriteHeader(403)
- io.WriteString(w, resp)
- return
- }
- s.router.ServeHTTP(w, req)
- }
- func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- health := s.nsqd.GetHealth()
- if !s.nsqd.IsHealthy() {
- return nil, http_api.Err{500, health}
- }
- return health, nil
- }
- func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- hostname, err := os.Hostname()
- if err != nil {
- return nil, http_api.Err{500, err.Error()}
- }
- return struct {
- Version string `json:"version"`
- BroadcastAddress string `json:"broadcast_address"`
- Hostname string `json:"hostname"`
- HTTPPort int `json:"http_port"`
- TCPPort int `json:"tcp_port"`
- StartTime int64 `json:"start_time"`
- MaxHeartBeatInterval time.Duration `json:"max_heartbeat_interval"`
- MaxOutBufferSize int64 `json:"max_output_buffer_size"`
- MaxOutBufferTimeout time.Duration `json:"max_output_buffer_timeout"`
- MaxDeflateLevel int `json:"max_deflate_level"`
- }{
- Version: version.Binary,
- BroadcastAddress: s.nsqd.getOpts().BroadcastAddress,
- Hostname: hostname,
- TCPPort: s.nsqd.RealTCPAddr().Port,
- HTTPPort: s.nsqd.RealHTTPAddr().Port,
- StartTime: s.nsqd.GetStartTime().Unix(),
- MaxHeartBeatInterval: s.nsqd.getOpts().MaxHeartbeatInterval,
- MaxOutBufferSize: s.nsqd.getOpts().MaxOutputBufferSize,
- MaxOutBufferTimeout: s.nsqd.getOpts().MaxOutputBufferTimeout,
- MaxDeflateLevel: s.nsqd.getOpts().MaxDeflateLevel,
- }, nil
- }
- func (s *httpServer) getExistingTopicFromQuery(req *http.Request) (*http_api.ReqParams, *Topic, string, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, nil, "", http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, channelName, err := http_api.GetTopicChannelArgs(reqParams)
- if err != nil {
- return nil, nil, "", http_api.Err{400, err.Error()}
- }
- topic, err := s.nsqd.GetExistingTopic(topicName)
- if err != nil {
- return nil, nil, "", http_api.Err{404, "TOPIC_NOT_FOUND"}
- }
- return reqParams, topic, channelName, err
- }
- func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
- reqParams, err := url.ParseQuery(req.URL.RawQuery)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicNames, ok := reqParams["topic"]
- if !ok {
- return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- topicName := topicNames[0]
- if !protocol.IsValidTopicName(topicName) {
- return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
- }
- return reqParams, s.nsqd.GetTopic(topicName), nil
- }
- func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- // TODO: one day I'd really like to just error on chunked requests
- // to be able to fail "too big" requests before we even read
- if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
- return nil, http_api.Err{413, "MSG_TOO_BIG"}
- }
- // add 1 so that it's greater than our max when we test for it
- // (LimitReader returns a "fake" EOF)
- readMax := s.nsqd.getOpts().MaxMsgSize + 1
- body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
- if err != nil {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- if int64(len(body)) == readMax {
- return nil, http_api.Err{413, "MSG_TOO_BIG"}
- }
- if len(body) == 0 {
- return nil, http_api.Err{400, "MSG_EMPTY"}
- }
- reqParams, topic, err := s.getTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- var deferred time.Duration
- if ds, ok := reqParams["defer"]; ok {
- var di int64
- di, err = strconv.ParseInt(ds[0], 10, 64)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_DEFER"}
- }
- deferred = time.Duration(di) * time.Millisecond
- if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
- return nil, http_api.Err{400, "INVALID_DEFER"}
- }
- }
- msg := NewMessage(topic.GenerateID(), body)
- s.nsqd.logf(LOG_INFO, "receive pub message - %s", msg)
- msg.deferred = deferred
- err = topic.PutMessage(msg)
- if err != nil {
- return nil, http_api.Err{503, "EXITING"}
- }
- return "OK", nil
- }
- func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var msgs []*Message
- var exit bool
- // TODO: one day I'd really like to just error on chunked requests
- // to be able to fail "too big" requests before we even read
- if req.ContentLength > s.nsqd.getOpts().MaxBodySize {
- return nil, http_api.Err{413, "BODY_TOO_BIG"}
- }
- reqParams, topic, err := s.getTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- // text mode is default, but unrecognized binary opt considered true
- binaryMode := false
- if vals, ok := reqParams["binary"]; ok {
- if binaryMode, ok = boolParams[vals[0]]; !ok {
- binaryMode = true
- s.nsqd.logf(LOG_WARN, "deprecated value '%s' used for /mpub binary param", vals[0])
- }
- }
- if binaryMode {
- tmp := make([]byte, 4)
- msgs, err = readMPUB(req.Body, tmp, topic,
- s.nsqd.getOpts().MaxMsgSize, s.nsqd.getOpts().MaxBodySize)
- if err != nil {
- return nil, http_api.Err{413, err.(*protocol.FatalClientErr).Code[2:]}
- }
- } else {
- // add 1 so that it's greater than our max when we test for it
- // (LimitReader returns a "fake" EOF)
- readMax := s.nsqd.getOpts().MaxBodySize + 1
- rdr := bufio.NewReader(io.LimitReader(req.Body, readMax))
- total := 0
- for !exit {
- var block []byte
- block, err = rdr.ReadBytes('|')
- s.nsqd.logf(LOG_INFO, "receive block - %s", string(block))
- if err != nil {
- if err != io.EOF {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- exit = true
- }
- total += len(block)
- if int64(total) == readMax {
- return nil, http_api.Err{413, "BODY_TOO_BIG"}
- }
- if len(block) > 0 && block[len(block)-1] == '|' {
- block = block[:len(block)-1]
- }
- // silently discard 0 length messages
- // this maintains the behavior pre 0.2.22
- if len(block) == 0 {
- continue
- }
- if int64(len(block)) > s.nsqd.getOpts().MaxMsgSize {
- return nil, http_api.Err{413, "MSG_TOO_BIG"}
- }
- msg := NewMessage(topic.GenerateID(), block)
- msgs = append(msgs, msg)
- }
- }
- err = topic.PutMessages(msgs)
- if err != nil {
- return nil, http_api.Err{503, "EXITING"}
- }
- return "OK", nil
- }
- func (s *httpServer) doCreateTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- _, _, err := s.getTopicFromQuery(req)
- return nil, err
- }
- func (s *httpServer) doEmptyTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- if !protocol.IsValidTopicName(topicName) {
- return nil, http_api.Err{400, "INVALID_TOPIC"}
- }
- topic, err := s.nsqd.GetExistingTopic(topicName)
- if err != nil {
- return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
- }
- err = topic.Empty()
- if err != nil {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- return nil, nil
- }
- func (s *httpServer) doDeleteTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- err = s.nsqd.DeleteExistingTopic(topicName)
- if err != nil {
- return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
- }
- return nil, nil
- }
- func (s *httpServer) doPauseTopic(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- topicName, err := reqParams.Get("topic")
- if err != nil {
- return nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
- }
- topic, err := s.nsqd.GetExistingTopic(topicName)
- if err != nil {
- return nil, http_api.Err{404, "TOPIC_NOT_FOUND"}
- }
- if strings.Contains(req.URL.Path, "unpause") {
- err = topic.UnPause()
- } else {
- err = topic.Pause()
- }
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- // pro-actively persist metadata so in case of process failure
- // nsqd won't suddenly (un)pause a topic
- s.nsqd.Lock()
- s.nsqd.PersistMetadata()
- s.nsqd.Unlock()
- return nil, nil
- }
- func (s *httpServer) doCreateChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- _, topic, channelName, err := s.getExistingTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- topic.GetChannel(channelName)
- return nil, nil
- }
- func (s *httpServer) doEmptyChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- _, topic, channelName, err := s.getExistingTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- channel, err := topic.GetExistingChannel(channelName)
- if err != nil {
- return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
- }
- err = channel.Empty()
- if err != nil {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- return nil, nil
- }
- func (s *httpServer) doDeleteChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- _, topic, channelName, err := s.getExistingTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- err = topic.DeleteExistingChannel(channelName)
- if err != nil {
- return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
- }
- return nil, nil
- }
- func (s *httpServer) doPauseChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- _, topic, channelName, err := s.getExistingTopicFromQuery(req)
- if err != nil {
- return nil, err
- }
- channel, err := topic.GetExistingChannel(channelName)
- if err != nil {
- return nil, http_api.Err{404, "CHANNEL_NOT_FOUND"}
- }
- if strings.Contains(req.URL.Path, "unpause") {
- err = channel.UnPause()
- } else {
- err = channel.Pause()
- }
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failure in %s - %s", req.URL.Path, err)
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- // pro-actively persist metadata so in case of process failure
- // nsqd won't suddenly (un)pause a channel
- s.nsqd.Lock()
- s.nsqd.PersistMetadata()
- s.nsqd.Unlock()
- return nil, nil
- }
- func (s *httpServer) doStats(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- s.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- formatString, _ := reqParams.Get("format")
- topicName, _ := reqParams.Get("topic")
- channelName, _ := reqParams.Get("channel")
- includeClientsParam, _ := reqParams.Get("include_clients")
- includeMemParam, _ := reqParams.Get("include_mem")
- jsonFormat := formatString == "json"
- includeClients, ok := boolParams[includeClientsParam]
- if !ok {
- includeClients = true
- }
- includeMem, ok := boolParams[includeMemParam]
- if !ok {
- includeMem = true
- }
- stats := s.nsqd.GetStats(topicName, channelName, includeClients)
- health := s.nsqd.GetHealth()
- startTime := s.nsqd.GetStartTime()
- uptime := time.Since(startTime)
- var ms *memStats
- if includeMem {
- m := getMemStats()
- ms = &m
- }
- if !jsonFormat {
- return s.printStats(stats, ms, health, startTime, uptime), nil
- }
- // TODO: should producer stats be hung off topics?
- return struct {
- Version string `json:"version"`
- Health string `json:"health"`
- StartTime int64 `json:"start_time"`
- Topics []TopicStats `json:"topics"`
- Memory *memStats `json:"memory,omitempty"`
- Producers []ClientStats `json:"producers"`
- }{version.Binary, health, startTime.Unix(), stats.Topics, ms, stats.Producers}, nil
- }
- func (s *httpServer) printStats(stats Stats, ms *memStats, health string, startTime time.Time, uptime time.Duration) []byte {
- var buf bytes.Buffer
- w := &buf
- fmt.Fprintf(w, "%s\n", version.String("smqd"))
- fmt.Fprintf(w, "start_time %v\n", startTime.Format(time.RFC3339))
- fmt.Fprintf(w, "uptime %s\n", uptime)
- fmt.Fprintf(w, "\nHealth: %s\n", health)
- if ms != nil {
- fmt.Fprintf(w, "\nMemory:\n")
- fmt.Fprintf(w, " %-25s\t%d\n", "heap_objects", ms.HeapObjects)
- fmt.Fprintf(w, " %-25s\t%d\n", "heap_idle_bytes", ms.HeapIdleBytes)
- fmt.Fprintf(w, " %-25s\t%d\n", "heap_in_use_bytes", ms.HeapInUseBytes)
- fmt.Fprintf(w, " %-25s\t%d\n", "heap_released_bytes", ms.HeapReleasedBytes)
- fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_100", ms.GCPauseUsec100)
- fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_99", ms.GCPauseUsec99)
- fmt.Fprintf(w, " %-25s\t%d\n", "gc_pause_usec_95", ms.GCPauseUsec95)
- fmt.Fprintf(w, " %-25s\t%d\n", "next_gc_bytes", ms.NextGCBytes)
- fmt.Fprintf(w, " %-25s\t%d\n", "gc_total_runs", ms.GCTotalRuns)
- }
- if len(stats.Topics) == 0 {
- fmt.Fprintf(w, "\nTopics: None\n")
- } else {
- fmt.Fprintf(w, "\nTopics:")
- }
- for _, t := range stats.Topics {
- var pausedPrefix string
- if t.Paused {
- pausedPrefix = "*P "
- } else {
- pausedPrefix = " "
- }
- fmt.Fprintf(w, "\n%s[%-15s] depth: %-5d be-depth: %-5d msgs: %-8d e2e%%: %s\n",
- pausedPrefix,
- t.TopicName,
- t.Depth,
- t.BackendDepth,
- t.MessageCount,
- t.E2eProcessingLatency,
- )
- for _, c := range t.Channels {
- if c.Paused {
- pausedPrefix = " *P "
- } else {
- pausedPrefix = " "
- }
- fmt.Fprintf(w, "%s[%-25s] depth: %-5d be-depth: %-5d inflt: %-4d def: %-4d re-q: %-5d timeout: %-5d msgs: %-8d e2e%%: %s\n",
- pausedPrefix,
- c.ChannelName,
- c.Depth,
- c.BackendDepth,
- c.InFlightCount,
- c.DeferredCount,
- c.RequeueCount,
- c.TimeoutCount,
- c.MessageCount,
- c.E2eProcessingLatency,
- )
- for _, client := range c.Clients {
- fmt.Fprintf(w, " %s\n", client)
- }
- }
- }
- if len(stats.Producers) == 0 {
- fmt.Fprintf(w, "\nProducers: None\n")
- } else {
- fmt.Fprintf(w, "\nProducers:\n")
- for _, client := range stats.Producers {
- fmt.Fprintf(w, " %s\n", client)
- }
- }
- return buf.Bytes()
- }
- func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- opt := ps.ByName("opt")
- if req.Method == "PUT" {
- // add 1 so that it's greater than our max when we test for it
- // (LimitReader returns a "fake" EOF)
- readMax := s.nsqd.getOpts().MaxMsgSize + 1
- body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
- if err != nil {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- if int64(len(body)) == readMax || len(body) == 0 {
- return nil, http_api.Err{413, "INVALID_VALUE"}
- }
- opts := *s.nsqd.getOpts()
- switch opt {
- case "nsqlookupd_tcp_addresses":
- err := json.Unmarshal(body, &opts.NSQLookupdTCPAddresses)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_VALUE"}
- }
- case "log_level":
- logLevelStr := string(body)
- logLevel, err := lg.ParseLogLevel(logLevelStr)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_VALUE"}
- }
- opts.LogLevel = logLevel
- default:
- return nil, http_api.Err{400, "INVALID_OPTION"}
- }
- s.nsqd.swapOpts(&opts)
- s.nsqd.triggerOptsNotification()
- }
- v, ok := getOptByCfgName(s.nsqd.getOpts(), opt)
- if !ok {
- return nil, http_api.Err{400, "INVALID_OPTION"}
- }
- return v, nil
- }
- func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
- val := reflect.ValueOf(opts).Elem()
- typ := val.Type()
- for i := 0; i < typ.NumField(); i++ {
- field := typ.Field(i)
- flagName := field.Tag.Get("flag")
- cfgName := field.Tag.Get("cfg")
- if flagName == "" {
- continue
- }
- if cfgName == "" {
- cfgName = strings.Replace(flagName, "-", "_", -1)
- }
- if name != cfgName {
- continue
- }
- return val.FieldByName(field.Name).Interface(), true
- }
- return nil, false
- }
|