options.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package main
  2. import (
  3. "crypto/tls"
  4. "flag"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "github.com/nsqio/nsq/internal/app"
  9. "github.com/nsqio/nsq/internal/lg"
  10. "github.com/nsqio/nsq/nsqd"
  11. )
  12. type tlsRequiredOption int
  13. func (t *tlsRequiredOption) Set(s string) error {
  14. s = strings.ToLower(s)
  15. if s == "tcp-https" {
  16. *t = nsqd.TLSRequiredExceptHTTP
  17. return nil
  18. }
  19. required, err := strconv.ParseBool(s)
  20. if required {
  21. *t = nsqd.TLSRequired
  22. } else {
  23. *t = nsqd.TLSNotRequired
  24. }
  25. return err
  26. }
  27. func (t *tlsRequiredOption) Get() interface{} { return int(*t) }
  28. func (t *tlsRequiredOption) String() string {
  29. return strconv.FormatInt(int64(*t), 10)
  30. }
  31. func (t *tlsRequiredOption) IsBoolFlag() bool { return true }
  32. type tlsMinVersionOption uint16
  33. var tlsVersionTable = []struct {
  34. val uint16
  35. str string
  36. }{
  37. {tls.VersionSSL30, "ssl3.0"},
  38. {tls.VersionTLS10, "tls1.0"},
  39. {tls.VersionTLS11, "tls1.1"},
  40. {tls.VersionTLS12, "tls1.2"},
  41. {tls.VersionTLS13, "tls1.3"},
  42. }
  43. func (t *tlsMinVersionOption) Set(s string) error {
  44. s = strings.ToLower(s)
  45. if s == "" {
  46. return nil
  47. }
  48. for _, v := range tlsVersionTable {
  49. if s == v.str {
  50. *t = tlsMinVersionOption(v.val)
  51. return nil
  52. }
  53. }
  54. return fmt.Errorf("unknown tlsVersionOption %q", s)
  55. }
  56. func (t *tlsMinVersionOption) Get() interface{} { return uint16(*t) }
  57. func (t *tlsMinVersionOption) String() string {
  58. for _, v := range tlsVersionTable {
  59. if uint16(*t) == v.val {
  60. return v.str
  61. }
  62. }
  63. return strconv.FormatInt(int64(*t), 10)
  64. }
  65. type config map[string]interface{}
  66. // Validate settings in the config file, and fatal on errors
  67. func (cfg config) Validate() {
  68. // special validation/translation
  69. if v, exists := cfg["tls_required"]; exists {
  70. var t tlsRequiredOption
  71. err := t.Set(fmt.Sprintf("%v", v))
  72. if err == nil {
  73. cfg["tls_required"] = t.String()
  74. } else {
  75. logFatal("failed parsing tls_required %+v", v)
  76. }
  77. }
  78. if v, exists := cfg["tls_min_version"]; exists {
  79. var t tlsMinVersionOption
  80. err := t.Set(fmt.Sprintf("%v", v))
  81. if err == nil {
  82. newVal := fmt.Sprintf("%v", t.Get())
  83. if newVal != "0" {
  84. cfg["tls_min_version"] = newVal
  85. } else {
  86. delete(cfg, "tls_min_version")
  87. }
  88. } else {
  89. logFatal("failed parsing tls_min_version %+v", v)
  90. }
  91. }
  92. if v, exists := cfg["log_level"]; exists {
  93. var t lg.LogLevel
  94. err := t.Set(fmt.Sprintf("%v", v))
  95. if err == nil {
  96. cfg["log_level"] = t
  97. } else {
  98. logFatal("failed parsing log_level %+v", v)
  99. }
  100. }
  101. }
  102. func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
  103. flagSet := flag.NewFlagSet("nsqd", flag.ExitOnError)
  104. // basic options
  105. flagSet.Bool("version", false, "print version string")
  106. flagSet.String("config", "", "path to config file")
  107. logLevel := opts.LogLevel
  108. flagSet.Var(&logLevel, "log-level", "set log verbosity: debug, info, warn, error, or fatal")
  109. // flagSet.String("log-prefix", "[nsqd] ", "log message prefix")
  110. flagSet.String("log-prefix", "[smqd] ", "log message prefix")
  111. flagSet.Bool("verbose", false, "[deprecated] has no effect, use --log-level")
  112. flagSet.Int64("node-id", opts.ID, "unique part for message IDs, (int) in range [0,1024) (default is hash of hostname)")
  113. flagSet.Bool("worker-id", false, "[deprecated] use --node-id")
  114. flagSet.String("https-address", opts.HTTPSAddress, "<addr>:<port> to listen on for HTTPS clients")
  115. flagSet.String("http-address", opts.HTTPAddress, "<addr>:<port> to listen on for HTTP clients")
  116. flagSet.String("tcp-address", opts.TCPAddress, "<addr>:<port> to listen on for TCP clients")
  117. authHTTPAddresses := app.StringArray{}
  118. flagSet.Var(&authHTTPAddresses, "auth-http-address", "<addr>:<port> or a full url to query auth server (may be given multiple times)")
  119. flagSet.String("broadcast-address", opts.BroadcastAddress, "address that will be registered with lookupd (defaults to the OS hostname)")
  120. flagSet.Int("broadcast-tcp-port", opts.BroadcastTCPPort, "TCP port that will be registered with lookupd (defaults to the TCP port that this nsqd is listening on)")
  121. flagSet.Int("broadcast-http-port", opts.BroadcastHTTPPort, "HTTP port that will be registered with lookupd (defaults to the HTTP port that this nsqd is listening on)")
  122. lookupdTCPAddrs := app.StringArray{}
  123. flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)")
  124. flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect")
  125. flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request")
  126. // diskqueue options
  127. flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages")
  128. flagSet.Int64("mem-queue-size", opts.MemQueueSize, "number of messages to keep in memory (per topic/channel)")
  129. flagSet.Int64("max-bytes-per-file", opts.MaxBytesPerFile, "number of bytes per diskqueue file before rolling")
  130. flagSet.Int64("sync-every", opts.SyncEvery, "number of messages per diskqueue fsync")
  131. flagSet.Duration("sync-timeout", opts.SyncTimeout, "duration of time per diskqueue fsync")
  132. flagSet.Int("queue-scan-worker-pool-max", opts.QueueScanWorkerPoolMax, "max concurrency for checking in-flight and deferred message timeouts")
  133. flagSet.Int("queue-scan-selection-count", opts.QueueScanSelectionCount, "number of channels to check per cycle (every 100ms) for in-flight and deferred timeouts")
  134. // msg and command options
  135. flagSet.Duration("msg-timeout", opts.MsgTimeout, "default duration to wait before auto-requeing a message")
  136. flagSet.Duration("max-msg-timeout", opts.MaxMsgTimeout, "maximum duration before a message will timeout")
  137. flagSet.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
  138. flagSet.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
  139. flagSet.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")
  140. // client overridable configuration options
  141. flagSet.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats")
  142. flagSet.Int64("max-rdy-count", opts.MaxRdyCount, "maximum RDY count for a client")
  143. flagSet.Int64("max-output-buffer-size", opts.MaxOutputBufferSize, "maximum client configurable size (in bytes) for a client output buffer")
  144. flagSet.Duration("max-output-buffer-timeout", opts.MaxOutputBufferTimeout, "maximum client configurable duration of time between flushing to a client")
  145. flagSet.Duration("min-output-buffer-timeout", opts.MinOutputBufferTimeout, "minimum client configurable duration of time between flushing to a client")
  146. flagSet.Duration("output-buffer-timeout", opts.OutputBufferTimeout, "default duration of time between flushing data to clients")
  147. flagSet.Int("max-channel-consumers", opts.MaxChannelConsumers, "maximum channel consumer connection count per nsqd instance (default 0, i.e., unlimited)")
  148. // statsd integration options
  149. flagSet.String("statsd-address", opts.StatsdAddress, "UDP <addr>:<port> of a statsd daemon for pushing stats")
  150. flagSet.Duration("statsd-interval", opts.StatsdInterval, "duration between pushing to statsd")
  151. flagSet.Bool("statsd-mem-stats", opts.StatsdMemStats, "toggle sending memory and GC stats to statsd")
  152. flagSet.String("statsd-prefix", opts.StatsdPrefix, "prefix used for keys sent to statsd (%s for host replacement)")
  153. flagSet.Int("statsd-udp-packet-size", opts.StatsdUDPPacketSize, "the size in bytes of statsd UDP packets")
  154. flagSet.Bool("statsd-exclude-ephemeral", opts.StatsdExcludeEphemeral, "Skip ephemeral topics and channels when sending stats to statsd")
  155. // End to end percentile flags
  156. e2eProcessingLatencyPercentiles := app.FloatArray{}
  157. flagSet.Var(&e2eProcessingLatencyPercentiles, "e2e-processing-latency-percentile", "message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none)")
  158. flagSet.Duration("e2e-processing-latency-window-time", opts.E2EProcessingLatencyWindowTime, "calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds)")
  159. // TLS config
  160. flagSet.String("tls-cert", opts.TLSCert, "path to certificate file")
  161. flagSet.String("tls-key", opts.TLSKey, "path to key file")
  162. flagSet.String("tls-client-auth-policy", opts.TLSClientAuthPolicy, "client certificate auth policy ('require' or 'require-verify')")
  163. flagSet.String("tls-root-ca-file", opts.TLSRootCAFile, "path to certificate authority file")
  164. tlsRequired := tlsRequiredOption(opts.TLSRequired)
  165. tlsMinVersion := tlsMinVersionOption(opts.TLSMinVersion)
  166. flagSet.Var(&tlsRequired, "tls-required", "require TLS for client connections (true, false, tcp-https)")
  167. flagSet.Var(&tlsMinVersion, "tls-min-version", "minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2' or 'tls1.3')")
  168. // compression
  169. flagSet.Bool("deflate", opts.DeflateEnabled, "enable deflate feature negotiation (client compression)")
  170. flagSet.Int("max-deflate-level", opts.MaxDeflateLevel, "max deflate compression level a client can negotiate (> values == > nsqd CPU usage)")
  171. flagSet.Bool("snappy", opts.SnappyEnabled, "enable snappy feature negotiation (client compression)")
  172. // qos realtime
  173. flagSet.Bool("qos", opts.QosEnabled, "enable qos")
  174. flagSet.Bool("rt", opts.RtEnabled, "enable rt")
  175. flagSet.Bool("fifo", opts.FifoEnabled, "enable fifo")
  176. return flagSet
  177. }