123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package main
- import (
- "bufio"
- "flag"
- "fmt"
- "log"
- "net"
- "runtime"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nsqio/go-nsq"
- )
- var (
- runfor = flag.Duration("runfor", 10*time.Second, "duration of time to run")
- tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
- size = flag.Int("size", 200, "size of messages")
- topic = flag.String("topic", "sub_bench", "topic to receive messages on")
- channel = flag.String("channel", "ch", "channel to receive messages on")
- deadline = flag.String("deadline", "", "deadline to start the benchmark run")
- rdy = flag.Int("rdy", 2500, "RDY count to use")
- )
- var totalMsgCount int64
- func main() {
- flag.Parse()
- var wg sync.WaitGroup
- log.SetPrefix("[bench_reader] ")
- goChan := make(chan int)
- rdyChan := make(chan int)
- workers := runtime.GOMAXPROCS(0)
- for j := 0; j < workers; j++ {
- wg.Add(1)
- go func(id int) {
- subWorker(*runfor, workers, *tcpAddress, *topic, *channel, rdyChan, goChan, id)
- wg.Done()
- }(j)
- <-rdyChan
- }
- if *deadline != "" {
- t, err := time.Parse("2006-01-02 15:04:05", *deadline)
- if err != nil {
- log.Fatal(err)
- }
- d := t.Sub(time.Now())
- log.Printf("sleeping until %s (%s)", t, d)
- time.Sleep(d)
- }
- start := time.Now()
- close(goChan)
- wg.Wait()
- end := time.Now()
- duration := end.Sub(start)
- tmc := atomic.LoadInt64(&totalMsgCount)
- log.Printf("duration: %s - %.03fmb/s - %.03fops/s - %.03fus/op",
- duration,
- float64(tmc*int64(*size))/duration.Seconds()/1024/1024,
- float64(tmc)/duration.Seconds(),
- float64(duration/time.Microsecond)/float64(tmc))
- }
- func subWorker(td time.Duration, workers int, tcpAddr string, topic string, channel string, rdyChan chan int, goChan chan int, id int) {
- conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
- if err != nil {
- panic(err.Error())
- }
- conn.Write(nsq.MagicV2)
- rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
- ci := make(map[string]interface{})
- ci["client_id"] = "reader"
- ci["hostname"] = "reader"
- ci["user_agent"] = fmt.Sprintf("bench_reader/%s", nsq.VERSION)
- cmd, _ := nsq.Identify(ci)
- cmd.WriteTo(rw)
- nsq.Subscribe(topic, channel).WriteTo(rw)
- rdyChan <- 1
- <-goChan
- nsq.Ready(*rdy).WriteTo(rw)
- rw.Flush()
- nsq.ReadResponse(rw)
- nsq.ReadResponse(rw)
- var msgCount int64
- go func() {
- time.Sleep(td)
- conn.Close()
- }()
- for {
- resp, err := nsq.ReadResponse(rw)
- if err != nil {
- if strings.Contains(err.Error(), "use of closed network connection") {
- break
- }
- panic(err.Error())
- }
- frameType, data, err := nsq.UnpackResponse(resp)
- if err != nil {
- panic(err.Error())
- }
- if frameType == nsq.FrameTypeError {
- panic(string(data))
- } else if frameType == nsq.FrameTypeResponse {
- continue
- }
- msg, err := nsq.DecodeMessage(data)
- if err != nil {
- panic(err.Error())
- }
- nsq.Finish(msg.ID).WriteTo(rw)
- msgCount++
- if float64(msgCount%int64(*rdy)) > float64(*rdy)*0.75 {
- rw.Flush()
- }
- }
- atomic.AddInt64(&totalMsgCount, msgCount)
- }
|