123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839 |
- package nsqadmin
- import (
- "encoding/json"
- "fmt"
- "html/template"
- "io"
- "io/ioutil"
- "mime"
- "net"
- "net/http"
- "net/http/httputil"
- "net/url"
- "path"
- "reflect"
- "strings"
- "time"
- "github.com/julienschmidt/httprouter"
- "github.com/nsqio/nsq/internal/clusterinfo"
- "github.com/nsqio/nsq/internal/http_api"
- "github.com/nsqio/nsq/internal/lg"
- "github.com/nsqio/nsq/internal/protocol"
- "github.com/nsqio/nsq/internal/version"
- )
- func maybeWarnMsg(msgs []string) string {
- if len(msgs) > 0 {
- return "WARNING: " + strings.Join(msgs, "; ")
- }
- return ""
- }
- // this is similar to httputil.NewSingleHostReverseProxy except it passes along basic auth
- func NewSingleHostReverseProxy(target *url.URL, connectTimeout time.Duration, requestTimeout time.Duration) *httputil.ReverseProxy {
- director := func(req *http.Request) {
- req.URL.Scheme = target.Scheme
- req.URL.Host = target.Host
- if target.User != nil {
- passwd, _ := target.User.Password()
- req.SetBasicAuth(target.User.Username(), passwd)
- }
- }
- return &httputil.ReverseProxy{
- Director: director,
- Transport: http_api.NewDeadlineTransport(connectTimeout, requestTimeout),
- }
- }
- type httpServer struct {
- nsqadmin *NSQAdmin
- router http.Handler
- client *http_api.Client
- ci *clusterinfo.ClusterInfo
- basePath string
- devStaticDir string
- }
- func NewHTTPServer(nsqadmin *NSQAdmin) *httpServer {
- log := http_api.Log(nsqadmin.logf)
- client := http_api.NewClient(nsqadmin.httpClientTLSConfig, nsqadmin.getOpts().HTTPClientConnectTimeout,
- nsqadmin.getOpts().HTTPClientRequestTimeout)
- router := httprouter.New()
- router.HandleMethodNotAllowed = true
- router.PanicHandler = http_api.LogPanicHandler(nsqadmin.logf)
- router.NotFound = http_api.LogNotFoundHandler(nsqadmin.logf)
- router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqadmin.logf)
- s := &httpServer{
- nsqadmin: nsqadmin,
- router: router,
- client: client,
- ci: clusterinfo.New(nsqadmin.logf, client),
- basePath: nsqadmin.getOpts().BasePath,
- devStaticDir: nsqadmin.getOpts().DevStaticDir,
- }
- bp := func(p string) string {
- return path.Join(s.basePath, p)
- }
- router.Handle("GET", bp("/"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/ping"), http_api.Decorate(s.pingHandler, log, http_api.PlainText))
- router.Handle("GET", bp("/topics"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/topics/:topic"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/topics/:topic/:channel"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/nodes"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/nodes/:node"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/counter"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/lookup"), http_api.Decorate(s.indexHandler, log))
- router.Handle("GET", bp("/static/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
- router.Handle("GET", bp("/fonts/:asset"), http_api.Decorate(s.staticAssetHandler, log, http_api.PlainText))
- if s.nsqadmin.getOpts().ProxyGraphite {
- proxy := NewSingleHostReverseProxy(nsqadmin.graphiteURL, nsqadmin.getOpts().HTTPClientConnectTimeout,
- nsqadmin.getOpts().HTTPClientRequestTimeout)
- router.Handler("GET", bp("/render"), proxy)
- }
- // v1 endpoints
- router.Handle("GET", bp("/api/topics"), http_api.Decorate(s.topicsHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/topics/:topic"), http_api.Decorate(s.topicHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.channelHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/nodes"), http_api.Decorate(s.nodesHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/nodes/:node"), http_api.Decorate(s.nodeHandler, log, http_api.V1))
- router.Handle("POST", bp("/api/topics"), http_api.Decorate(s.createTopicChannelHandler, log, http_api.V1))
- router.Handle("POST", bp("/api/topics/:topic"), http_api.Decorate(s.topicActionHandler, log, http_api.V1))
- router.Handle("POST", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.channelActionHandler, log, http_api.V1))
- router.Handle("DELETE", bp("/api/nodes/:node"), http_api.Decorate(s.tombstoneNodeForTopicHandler, log, http_api.V1))
- router.Handle("DELETE", bp("/api/topics/:topic"), http_api.Decorate(s.deleteTopicHandler, log, http_api.V1))
- router.Handle("DELETE", bp("/api/topics/:topic/:channel"), http_api.Decorate(s.deleteChannelHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/counter"), http_api.Decorate(s.counterHandler, log, http_api.V1))
- router.Handle("GET", bp("/api/graphite"), http_api.Decorate(s.graphiteHandler, log, http_api.V1))
- router.Handle("GET", bp("/config/:opt"), http_api.Decorate(s.doConfig, log, http_api.V1))
- router.Handle("PUT", bp("/config/:opt"), http_api.Decorate(s.doConfig, log, http_api.V1))
- return s
- }
- func (s *httpServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- s.router.ServeHTTP(w, req)
- }
- func (s *httpServer) pingHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- return "OK", nil
- }
- func (s *httpServer) indexHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- asset, _ := Asset("index.html")
- t, _ := template.New("index").Funcs(template.FuncMap{
- "basePath": func(p string) string {
- return path.Join(s.basePath, p)
- },
- }).Parse(string(asset))
- w.Header().Set("Content-Type", "text/html")
- t.Execute(w, struct {
- Version string
- ProxyGraphite bool
- GraphEnabled bool
- GraphiteURL string
- StatsdInterval int
- StatsdCounterFormat string
- StatsdGaugeFormat string
- StatsdPrefix string
- NSQLookupd []string
- IsAdmin bool
- }{
- Version: version.Binary,
- ProxyGraphite: s.nsqadmin.getOpts().ProxyGraphite,
- GraphEnabled: s.nsqadmin.getOpts().GraphiteURL != "",
- GraphiteURL: s.nsqadmin.getOpts().GraphiteURL,
- StatsdInterval: int(s.nsqadmin.getOpts().StatsdInterval / time.Second),
- StatsdCounterFormat: s.nsqadmin.getOpts().StatsdCounterFormat,
- StatsdGaugeFormat: s.nsqadmin.getOpts().StatsdGaugeFormat,
- StatsdPrefix: s.nsqadmin.getOpts().StatsdPrefix,
- NSQLookupd: s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- IsAdmin: s.isAuthorizedAdminRequest(req),
- })
- return nil, nil
- }
- func (s *httpServer) staticAssetHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- assetName := ps.ByName("asset")
- var (
- asset []byte
- err error
- )
- if s.devStaticDir != "" {
- s.nsqadmin.logf(LOG_DEBUG, "using dev dir %q for static asset %q", s.devStaticDir, assetName)
- fsPath := path.Join(s.devStaticDir, assetName)
- asset, err = ioutil.ReadFile(fsPath)
- } else {
- asset, err = staticAsset(assetName)
- }
- if err != nil {
- return nil, http_api.Err{404, "NOT_FOUND"}
- }
- ext := path.Ext(assetName)
- ct := mime.TypeByExtension(ext)
- if ct == "" {
- switch ext {
- case ".map":
- ct = "application/json"
- case ".svg":
- ct = "image/svg+xml"
- case ".woff":
- ct = "application/font-woff"
- case ".ttf":
- ct = "application/font-sfnt"
- case ".eot":
- ct = "application/vnd.ms-fontobject"
- case ".woff2":
- ct = "application/font-woff2"
- }
- }
- if ct != "" {
- w.Header().Set("Content-Type", ct)
- }
- return string(asset), nil
- }
- func (s *httpServer) topicsHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, err.Error()}
- }
- var topics []string
- if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) != 0 {
- topics, err = s.ci.GetLookupdTopics(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
- } else {
- topics, err = s.ci.GetNSQDTopics(s.nsqadmin.getOpts().NSQDHTTPAddresses)
- }
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get topics - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- inactive, _ := reqParams.Get("inactive")
- if inactive == "true" {
- topicChannelMap := make(map[string][]string)
- if len(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses) == 0 {
- goto respond
- }
- for _, topicName := range topics {
- producers, _ := s.ci.GetLookupdTopicProducers(
- topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
- if len(producers) == 0 {
- topicChannels, _ := s.ci.GetLookupdTopicChannels(
- topicName, s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
- topicChannelMap[topicName] = topicChannels
- }
- }
- respond:
- return struct {
- Topics map[string][]string `json:"topics"`
- Message string `json:"message"`
- }{topicChannelMap, maybeWarnMsg(messages)}, nil
- }
- return struct {
- Topics []string `json:"topics"`
- Message string `json:"message"`
- }{topics, maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) topicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- topicName := ps.ByName("topic")
- producers, err := s.ci.GetTopicProducers(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- topicStats, _, err := s.ci.GetNSQDStats(producers, topicName, "", false)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get topic metadata - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- allNodesTopicStats := &clusterinfo.TopicStats{TopicName: topicName}
- for _, t := range topicStats {
- allNodesTopicStats.Add(t)
- }
- return struct {
- *clusterinfo.TopicStats
- Message string `json:"message"`
- }{allNodesTopicStats, maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) channelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- topicName := ps.ByName("topic")
- channelName := ps.ByName("channel")
- producers, err := s.ci.GetTopicProducers(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get topic producers - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- _, channelStats, err := s.ci.GetNSQDStats(producers, topicName, channelName, true)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get channel metadata - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- return struct {
- *clusterinfo.ChannelStats
- Message string `json:"message"`
- }{channelStats[channelName], maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) nodesHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get nodes - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- return struct {
- Nodes clusterinfo.Producers `json:"nodes"`
- Message string `json:"message"`
- }{producers, maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) nodeHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- node := ps.ByName("node")
- producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get producers - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- producer := producers.Search(node)
- if producer == nil {
- return nil, http_api.Err{404, "NODE_NOT_FOUND"}
- }
- topicStats, _, err := s.ci.GetNSQDStats(clusterinfo.Producers{producer}, "", "", true)
- if err != nil {
- s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- var totalClients int64
- var totalMessages int64
- for _, ts := range topicStats {
- for _, cs := range ts.Channels {
- totalClients += int64(len(cs.Clients))
- }
- totalMessages += ts.MessageCount
- }
- return struct {
- Node string `json:"node"`
- TopicStats []*clusterinfo.TopicStats `json:"topics"`
- TotalMessages int64 `json:"total_messages"`
- TotalClients int64 `json:"total_clients"`
- Message string `json:"message"`
- }{
- Node: node,
- TopicStats: topicStats,
- TotalMessages: totalMessages,
- TotalClients: totalClients,
- Message: maybeWarnMsg(messages),
- }, nil
- }
- func (s *httpServer) tombstoneNodeForTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- node := ps.ByName("node")
- var body struct {
- Topic string `json:"topic"`
- }
- err := json.NewDecoder(req.Body).Decode(&body)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_BODY"}
- }
- if !protocol.IsValidTopicName(body.Topic) {
- return nil, http_api.Err{400, "INVALID_TOPIC"}
- }
- err = s.ci.TombstoneNodeForTopic(body.Topic, node,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to tombstone node for topic - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- s.notifyAdminAction("tombstone_topic_producer", body.Topic, "", node, req)
- return struct {
- Message string `json:"message"`
- }{maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) createTopicChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- var body struct {
- Topic string `json:"topic"`
- Channel string `json:"channel"`
- }
- if !s.isAuthorizedAdminRequest(req) {
- return nil, http_api.Err{403, "FORBIDDEN"}
- }
- err := json.NewDecoder(req.Body).Decode(&body)
- if err != nil {
- return nil, http_api.Err{400, err.Error()}
- }
- if !protocol.IsValidTopicName(body.Topic) {
- s.nsqadmin.logf(LOG_ERROR, "invalid topic - %s", body.Topic)
- return nil, http_api.Err{400, "INVALID_TOPIC"}
- }
- if len(body.Channel) > 0 && !protocol.IsValidChannelName(body.Channel) {
- s.nsqadmin.logf(LOG_ERROR, "invalid channel - %s", body.Channel)
- return nil, http_api.Err{400, "INVALID_CHANNEL"}
- }
- err = s.ci.CreateTopicChannel(body.Topic, body.Channel,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to create topic/channel - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- s.notifyAdminAction("create_topic", body.Topic, "", "", req)
- if len(body.Channel) > 0 {
- s.notifyAdminAction("create_channel", body.Topic, body.Channel, "", req)
- }
- return struct {
- Message string `json:"message"`
- }{maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) deleteTopicHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- if !s.isAuthorizedAdminRequest(req) {
- return nil, http_api.Err{403, "FORBIDDEN"}
- }
- topicName := ps.ByName("topic")
- err := s.ci.DeleteTopic(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to delete topic - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- s.notifyAdminAction("delete_topic", topicName, "", "", req)
- return struct {
- Message string `json:"message"`
- }{maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) deleteChannelHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- if !s.isAuthorizedAdminRequest(req) {
- return nil, http_api.Err{403, "FORBIDDEN"}
- }
- topicName := ps.ByName("topic")
- channelName := ps.ByName("channel")
- err := s.ci.DeleteChannel(topicName, channelName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to delete channel - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- s.notifyAdminAction("delete_channel", topicName, channelName, "", req)
- return struct {
- Message string `json:"message"`
- }{maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) topicActionHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- topicName := ps.ByName("topic")
- return s.topicChannelAction(req, topicName, "")
- }
- func (s *httpServer) channelActionHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- topicName := ps.ByName("topic")
- channelName := ps.ByName("channel")
- return s.topicChannelAction(req, topicName, channelName)
- }
- func (s *httpServer) topicChannelAction(req *http.Request, topicName string, channelName string) (interface{}, error) {
- var messages []string
- var body struct {
- Action string `json:"action"`
- }
- if !s.isAuthorizedAdminRequest(req) {
- return nil, http_api.Err{403, "FORBIDDEN"}
- }
- err := json.NewDecoder(req.Body).Decode(&body)
- if err != nil {
- return nil, http_api.Err{400, err.Error()}
- }
- switch body.Action {
- case "pause":
- if channelName != "" {
- err = s.ci.PauseChannel(topicName, channelName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("pause_channel", topicName, channelName, "", req)
- } else {
- err = s.ci.PauseTopic(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("pause_topic", topicName, "", "", req)
- }
- case "unpause":
- if channelName != "" {
- err = s.ci.UnPauseChannel(topicName, channelName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("unpause_channel", topicName, channelName, "", req)
- } else {
- err = s.ci.UnPauseTopic(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("unpause_topic", topicName, "", "", req)
- }
- case "empty":
- if channelName != "" {
- err = s.ci.EmptyChannel(topicName, channelName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("empty_channel", topicName, channelName, "", req)
- } else {
- err = s.ci.EmptyTopic(topicName,
- s.nsqadmin.getOpts().NSQLookupdHTTPAddresses,
- s.nsqadmin.getOpts().NSQDHTTPAddresses)
- s.notifyAdminAction("empty_topic", topicName, "", "", req)
- }
- default:
- return nil, http_api.Err{400, "INVALID_ACTION"}
- }
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to %s topic/channel - %s", body.Action, err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- return struct {
- Message string `json:"message"`
- }{maybeWarnMsg(messages)}, nil
- }
- type counterStats struct {
- Node string `json:"node"`
- TopicName string `json:"topic_name"`
- ChannelName string `json:"channel_name"`
- MessageCount int64 `json:"message_count"`
- }
- func (s *httpServer) counterHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- var messages []string
- stats := make(map[string]*counterStats)
- producers, err := s.ci.GetProducers(s.nsqadmin.getOpts().NSQLookupdHTTPAddresses, s.nsqadmin.getOpts().NSQDHTTPAddresses)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get counter producer list - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- _, channelStats, err := s.ci.GetNSQDStats(producers, "", "", false)
- if err != nil {
- pe, ok := err.(clusterinfo.PartialErr)
- if !ok {
- s.nsqadmin.logf(LOG_ERROR, "failed to get nsqd stats - %s", err)
- return nil, http_api.Err{502, fmt.Sprintf("UPSTREAM_ERROR: %s", err)}
- }
- s.nsqadmin.logf(LOG_WARN, "%s", err)
- messages = append(messages, pe.Error())
- }
- for _, channelStats := range channelStats {
- for _, hostChannelStats := range channelStats.NodeStats {
- key := fmt.Sprintf("%s:%s:%s", channelStats.TopicName, channelStats.ChannelName, hostChannelStats.Node)
- s, ok := stats[key]
- if !ok {
- s = &counterStats{
- Node: hostChannelStats.Node,
- TopicName: channelStats.TopicName,
- ChannelName: channelStats.ChannelName,
- }
- stats[key] = s
- }
- s.MessageCount += hostChannelStats.MessageCount
- }
- }
- return struct {
- Stats map[string]*counterStats `json:"stats"`
- Message string `json:"message"`
- }{stats, maybeWarnMsg(messages)}, nil
- }
- func (s *httpServer) graphiteHandler(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- reqParams, err := http_api.NewReqParams(req)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_REQUEST"}
- }
- metric, err := reqParams.Get("metric")
- if err != nil || metric != "rate" {
- return nil, http_api.Err{400, "INVALID_ARG_METRIC"}
- }
- target, err := reqParams.Get("target")
- if err != nil {
- return nil, http_api.Err{400, "INVALID_ARG_TARGET"}
- }
- params := url.Values{}
- params.Set("from", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval*2/time.Second))
- params.Set("until", fmt.Sprintf("-%dsec", s.nsqadmin.getOpts().StatsdInterval/time.Second))
- params.Set("format", "json")
- params.Set("target", target)
- query := fmt.Sprintf("/render?%s", params.Encode())
- url := s.nsqadmin.getOpts().GraphiteURL + query
- s.nsqadmin.logf(LOG_INFO, "GRAPHITE: %s", url)
- var response []struct {
- Target string `json:"target"`
- DataPoints [][]*float64 `json:"datapoints"`
- }
- err = s.client.GETV1(url, &response)
- if err != nil {
- s.nsqadmin.logf(LOG_ERROR, "graphite request failed - %s", err)
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- var rateStr string
- rate := *response[0].DataPoints[0][0]
- if rate < 0 {
- rateStr = "N/A"
- } else {
- rateDivisor := s.nsqadmin.getOpts().StatsdInterval / time.Second
- rateStr = fmt.Sprintf("%.2f", rate/float64(rateDivisor))
- }
- return struct {
- Rate string `json:"rate"`
- }{rateStr}, nil
- }
- func (s *httpServer) doConfig(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
- opt := ps.ByName("opt")
- allowConfigFromCIDR := s.nsqadmin.getOpts().AllowConfigFromCIDR
- if allowConfigFromCIDR != "" {
- _, ipnet, _ := net.ParseCIDR(allowConfigFromCIDR)
- addr, _, err := net.SplitHostPort(req.RemoteAddr)
- if err != nil {
- s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr)
- return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
- }
- ip := net.ParseIP(addr)
- if ip == nil {
- s.nsqadmin.logf(LOG_ERROR, "failed to parse RemoteAddr %s", req.RemoteAddr)
- return nil, http_api.Err{400, "INVALID_REMOTE_ADDR"}
- }
- if !ipnet.Contains(ip) {
- return nil, http_api.Err{403, "FORBIDDEN"}
- }
- }
- if req.Method == "PUT" {
- // add 1 so that it's greater than our max when we test for it
- // (LimitReader returns a "fake" EOF)
- readMax := int64(1024*1024 + 1)
- body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
- if err != nil {
- return nil, http_api.Err{500, "INTERNAL_ERROR"}
- }
- if int64(len(body)) == readMax || len(body) == 0 {
- return nil, http_api.Err{413, "INVALID_VALUE"}
- }
- opts := *s.nsqadmin.getOpts()
- switch opt {
- case "nsqlookupd_http_addresses":
- err := json.Unmarshal(body, &opts.NSQLookupdHTTPAddresses)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_VALUE"}
- }
- case "log_level":
- logLevelStr := string(body)
- logLevel, err := lg.ParseLogLevel(logLevelStr)
- if err != nil {
- return nil, http_api.Err{400, "INVALID_VALUE"}
- }
- opts.LogLevel = logLevel
- default:
- return nil, http_api.Err{400, "INVALID_OPTION"}
- }
- s.nsqadmin.swapOpts(&opts)
- }
- v, ok := getOptByCfgName(s.nsqadmin.getOpts(), opt)
- if !ok {
- return nil, http_api.Err{400, "INVALID_OPTION"}
- }
- return v, nil
- }
- func (s *httpServer) isAuthorizedAdminRequest(req *http.Request) bool {
- adminUsers := s.nsqadmin.getOpts().AdminUsers
- if len(adminUsers) == 0 {
- return true
- }
- aclHttpHeader := s.nsqadmin.getOpts().AclHttpHeader
- user := req.Header.Get(aclHttpHeader)
- for _, v := range adminUsers {
- if v == user {
- return true
- }
- }
- return false
- }
- func getOptByCfgName(opts interface{}, name string) (interface{}, bool) {
- val := reflect.ValueOf(opts).Elem()
- typ := val.Type()
- for i := 0; i < typ.NumField(); i++ {
- field := typ.Field(i)
- flagName := field.Tag.Get("flag")
- cfgName := field.Tag.Get("cfg")
- if flagName == "" {
- continue
- }
- if cfgName == "" {
- cfgName = strings.Replace(flagName, "-", "_", -1)
- }
- if name != cfgName {
- continue
- }
- return val.FieldByName(field.Name).Interface(), true
- }
- return nil, false
- }
|