bench_channels.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package main
  2. import (
  3. "bufio"
  4. "flag"
  5. "fmt"
  6. "net"
  7. "sync"
  8. "time"
  9. "github.com/nsqio/go-nsq"
  10. )
  11. var (
  12. num = flag.Int("num", 10000, "num channels")
  13. tcpAddress = flag.String("nsqd-tcp-address", "127.0.0.1:4150", "<addr>:<port> to connect to nsqd")
  14. )
  15. func main() {
  16. flag.Parse()
  17. var wg sync.WaitGroup
  18. goChan := make(chan int)
  19. rdyChan := make(chan int)
  20. for j := 0; j < *num; j++ {
  21. wg.Add(1)
  22. go func(id int) {
  23. subWorker(*num, *tcpAddress, fmt.Sprintf("t%d", j), "ch", rdyChan, goChan, id)
  24. wg.Done()
  25. }(j)
  26. <-rdyChan
  27. time.Sleep(5 * time.Millisecond)
  28. }
  29. close(goChan)
  30. wg.Wait()
  31. }
  32. func subWorker(n int, tcpAddr string,
  33. topic string, channel string,
  34. rdyChan chan int, goChan chan int, id int) {
  35. conn, err := net.DialTimeout("tcp", tcpAddr, time.Second)
  36. if err != nil {
  37. panic(err.Error())
  38. }
  39. conn.Write(nsq.MagicV2)
  40. rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
  41. ci := make(map[string]interface{})
  42. ci["client_id"] = "test"
  43. cmd, _ := nsq.Identify(ci)
  44. cmd.WriteTo(rw)
  45. nsq.Subscribe(topic, channel).WriteTo(rw)
  46. rdyCount := 1
  47. rdy := rdyCount
  48. rdyChan <- 1
  49. <-goChan
  50. nsq.Ready(rdyCount).WriteTo(rw)
  51. rw.Flush()
  52. nsq.ReadResponse(rw)
  53. nsq.ReadResponse(rw)
  54. for {
  55. resp, err := nsq.ReadResponse(rw)
  56. if err != nil {
  57. panic(err.Error())
  58. }
  59. frameType, data, err := nsq.UnpackResponse(resp)
  60. if err != nil {
  61. panic(err.Error())
  62. }
  63. if frameType == nsq.FrameTypeError {
  64. panic(string(data))
  65. } else if frameType == nsq.FrameTypeResponse {
  66. nsq.Nop().WriteTo(rw)
  67. rw.Flush()
  68. continue
  69. }
  70. msg, err := nsq.DecodeMessage(data)
  71. if err != nil {
  72. panic(err.Error())
  73. }
  74. nsq.Finish(msg.ID).WriteTo(rw)
  75. rdy--
  76. if rdy == 0 {
  77. nsq.Ready(rdyCount).WriteTo(rw)
  78. rdy = rdyCount
  79. rw.Flush()
  80. }
  81. }
  82. }