nsq_to_file.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // This is a client that writes out to a file, and optionally rolls the file
  2. package main
  3. import (
  4. "flag"
  5. "fmt"
  6. "log"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "github.com/mreiferson/go-options"
  12. "github.com/nsqio/go-nsq"
  13. "github.com/nsqio/nsq/internal/app"
  14. "github.com/nsqio/nsq/internal/lg"
  15. "github.com/nsqio/nsq/internal/version"
  16. )
  // hasArg reports whether the flag named s was explicitly set on the
  // process-wide default flag set (flag.CommandLine), which is the only set
  // that flag.Visit walks. Flags that are merely defined but were never set
  // are not visited and therefore yield false.
  //
  // NOTE(review): main in this file parses its arguments through the separate
  // FlagSet returned by flagSet(), so flags set there are not visible to this
  // helper — confirm that callers expect this.
  func hasArg(s string) bool {
  	var found bool
  	flag.Visit(func(set *flag.Flag) {
  		found = found || set.Name == s
  	})
  	return found
  }
  26. func flagSet() *flag.FlagSet {
  27. fs := flag.NewFlagSet("nsqd", flag.ExitOnError)
  28. fs.Bool("version", false, "print version string")
  29. fs.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
  30. fs.String("log-prefix", "[nsq_to_file] ", "log message prefix")
  31. fs.String("channel", "nsq_to_file", "nsq channel")
  32. fs.Int("max-in-flight", 200, "max number of messages to allow in flight")
  33. fs.String("output-dir", "/tmp", "directory to write output files to")
  34. fs.String("work-dir", "", "directory for in-progress files before moving to output-dir")
  35. fs.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
  36. fs.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
  37. fs.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
  38. fs.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
  39. fs.Bool("gzip", false, "gzip output files.")
  40. fs.Bool("skip-empty-files", false, "skip writing empty files")
  41. fs.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
  42. fs.String("topic-pattern", "", "only log topics matching the following pattern")
  43. fs.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
  44. fs.Duration("rotate-interval", 0, "rotate the file every duration")
  45. fs.Duration("sync-interval", 30*time.Second, "sync file to disk every duration")
  46. fs.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
  47. fs.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
  48. nsqdTCPAddrs := app.StringArray{}
  49. lookupdHTTPAddrs := app.StringArray{}
  50. topics := app.StringArray{}
  51. consumerOpts := app.StringArray{}
  52. fs.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
  53. fs.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
  54. fs.Var(&topics, "topic", "nsq topic (may be given multiple times)")
  55. fs.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
  56. return fs
  57. }
  58. func main() {
  59. fs := flagSet()
  60. fs.Parse(os.Args[1:])
  61. if args := fs.Args(); len(args) > 0 {
  62. log.Fatalf("unknown arguments: %s", args)
  63. }
  64. opts := NewOptions()
  65. options.Resolve(opts, fs, nil)
  66. logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
  67. logLevel, err := lg.ParseLogLevel(opts.LogLevel)
  68. if err != nil {
  69. log.Fatal("--log-level is invalid")
  70. }
  71. logf := func(lvl lg.LogLevel, f string, args ...interface{}) {
  72. lg.Logf(logger, logLevel, lvl, f, args...)
  73. }
  74. if fs.Lookup("version").Value.(flag.Getter).Get().(bool) {
  75. fmt.Printf("nsq_to_file v%s\n", version.Binary)
  76. return
  77. }
  78. if opts.Channel == "" {
  79. log.Fatal("--channel is required")
  80. }
  81. if opts.HTTPClientConnectTimeout <= 0 {
  82. log.Fatal("--http-client-connect-timeout should be positive")
  83. }
  84. if opts.HTTPClientRequestTimeout <= 0 {
  85. log.Fatal("--http-client-request-timeout should be positive")
  86. }
  87. if len(opts.NSQDTCPAddrs) == 0 && len(opts.NSQLookupdHTTPAddrs) == 0 {
  88. log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
  89. }
  90. if len(opts.NSQDTCPAddrs) != 0 && len(opts.NSQLookupdHTTPAddrs) != 0 {
  91. log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
  92. }
  93. if opts.GZIPLevel < 1 || opts.GZIPLevel > 9 {
  94. log.Fatalf("invalid --gzip-level value (%d), should be 1-9", opts.GZIPLevel)
  95. }
  96. if len(opts.Topics) == 0 && len(opts.TopicPattern) == 0 {
  97. log.Fatal("--topic or --topic-pattern required")
  98. }
  99. if len(opts.Topics) == 0 && len(opts.NSQLookupdHTTPAddrs) == 0 {
  100. log.Fatal("--lookupd-http-address must be specified when no --topic specified")
  101. }
  102. if opts.WorkDir == "" {
  103. opts.WorkDir = opts.OutputDir
  104. }
  105. cfg := nsq.NewConfig()
  106. cfgFlag := nsq.ConfigFlag{cfg}
  107. for _, opt := range opts.ConsumerOpts {
  108. cfgFlag.Set(opt)
  109. }
  110. cfg.UserAgent = fmt.Sprintf("smq_to_file/%s go-smq/%s", version.Binary, nsq.VERSION)
  111. cfg.MaxInFlight = opts.MaxInFlight
  112. hupChan := make(chan os.Signal, 1)
  113. termChan := make(chan os.Signal, 1)
  114. signal.Notify(hupChan, syscall.SIGHUP)
  115. signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
  116. discoverer := newTopicDiscoverer(logf, opts, cfg, hupChan, termChan)
  117. discoverer.run()
  118. }