123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- // This is a client that writes out to a file, and optionally rolls the file
- package main
- import (
- "flag"
- "fmt"
- "log"
- "os"
- "os/signal"
- "syscall"
- "time"
- "github.com/mreiferson/go-options"
- "github.com/nsqio/go-nsq"
- "github.com/nsqio/nsq/internal/app"
- "github.com/nsqio/nsq/internal/lg"
- "github.com/nsqio/nsq/internal/version"
- )
- func hasArg(s string) bool {
- argExist := false
- flag.Visit(func(f *flag.Flag) {
- if f.Name == s {
- argExist = true
- }
- })
- return argExist
- }
- func flagSet() *flag.FlagSet {
- fs := flag.NewFlagSet("nsqd", flag.ExitOnError)
- fs.Bool("version", false, "print version string")
- fs.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
- fs.String("log-prefix", "[nsq_to_file] ", "log message prefix")
- fs.String("channel", "nsq_to_file", "nsq channel")
- fs.Int("max-in-flight", 200, "max number of messages to allow in flight")
- fs.String("output-dir", "/tmp", "directory to write output files to")
- fs.String("work-dir", "", "directory for in-progress files before moving to output-dir")
- fs.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
- fs.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
- fs.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
- fs.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
- fs.Bool("gzip", false, "gzip output files.")
- fs.Bool("skip-empty-files", false, "skip writing empty files")
- fs.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
- fs.String("topic-pattern", "", "only log topics matching the following pattern")
- fs.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
- fs.Duration("rotate-interval", 0, "rotate the file every duration")
- fs.Duration("sync-interval", 30*time.Second, "sync file to disk every duration")
- fs.Duration("http-client-connect-timeout", 2*time.Second, "timeout for HTTP connect")
- fs.Duration("http-client-request-timeout", 5*time.Second, "timeout for HTTP request")
- nsqdTCPAddrs := app.StringArray{}
- lookupdHTTPAddrs := app.StringArray{}
- topics := app.StringArray{}
- consumerOpts := app.StringArray{}
- fs.Var(&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)")
- fs.Var(&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)")
- fs.Var(&topics, "topic", "nsq topic (may be given multiple times)")
- fs.Var(&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
- return fs
- }
- func main() {
- fs := flagSet()
- fs.Parse(os.Args[1:])
- if args := fs.Args(); len(args) > 0 {
- log.Fatalf("unknown arguments: %s", args)
- }
- opts := NewOptions()
- options.Resolve(opts, fs, nil)
- logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
- logLevel, err := lg.ParseLogLevel(opts.LogLevel)
- if err != nil {
- log.Fatal("--log-level is invalid")
- }
- logf := func(lvl lg.LogLevel, f string, args ...interface{}) {
- lg.Logf(logger, logLevel, lvl, f, args...)
- }
- if fs.Lookup("version").Value.(flag.Getter).Get().(bool) {
- fmt.Printf("nsq_to_file v%s\n", version.Binary)
- return
- }
- if opts.Channel == "" {
- log.Fatal("--channel is required")
- }
- if opts.HTTPClientConnectTimeout <= 0 {
- log.Fatal("--http-client-connect-timeout should be positive")
- }
- if opts.HTTPClientRequestTimeout <= 0 {
- log.Fatal("--http-client-request-timeout should be positive")
- }
- if len(opts.NSQDTCPAddrs) == 0 && len(opts.NSQLookupdHTTPAddrs) == 0 {
- log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
- }
- if len(opts.NSQDTCPAddrs) != 0 && len(opts.NSQLookupdHTTPAddrs) != 0 {
- log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
- }
- if opts.GZIPLevel < 1 || opts.GZIPLevel > 9 {
- log.Fatalf("invalid --gzip-level value (%d), should be 1-9", opts.GZIPLevel)
- }
- if len(opts.Topics) == 0 && len(opts.TopicPattern) == 0 {
- log.Fatal("--topic or --topic-pattern required")
- }
- if len(opts.Topics) == 0 && len(opts.NSQLookupdHTTPAddrs) == 0 {
- log.Fatal("--lookupd-http-address must be specified when no --topic specified")
- }
- if opts.WorkDir == "" {
- opts.WorkDir = opts.OutputDir
- }
- cfg := nsq.NewConfig()
- cfgFlag := nsq.ConfigFlag{cfg}
- for _, opt := range opts.ConsumerOpts {
- cfgFlag.Set(opt)
- }
- cfg.UserAgent = fmt.Sprintf("smq_to_file/%s go-smq/%s", version.Binary, nsq.VERSION)
- cfg.MaxInFlight = opts.MaxInFlight
- hupChan := make(chan os.Signal, 1)
- termChan := make(chan os.Signal, 1)
- signal.Notify(hupChan, syscall.SIGHUP)
- signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
- discoverer := newTopicDiscoverer(logf, opts, cfg, hupChan, termChan)
- discoverer.run()
- }
|