quantile.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package quantile
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. "github.com/bmizerany/perks/quantile"
  7. "github.com/nsqio/nsq/internal/stringy"
  8. )
  9. type Result struct {
  10. Count int `json:"count"`
  11. Percentiles []map[string]float64 `json:"percentiles"`
  12. }
  13. func (r *Result) String() string {
  14. var s []string
  15. for _, item := range r.Percentiles {
  16. s = append(s, stringy.NanoSecondToHuman(item["value"]))
  17. }
  18. return strings.Join(s, ", ")
  19. }
  20. type Quantile struct {
  21. sync.Mutex
  22. streams [2]quantile.Stream
  23. currentIndex uint8
  24. lastMoveWindow time.Time
  25. currentStream *quantile.Stream
  26. Percentiles []float64
  27. MoveWindowTime time.Duration
  28. }
  29. func New(WindowTime time.Duration, Percentiles []float64) *Quantile {
  30. q := Quantile{
  31. currentIndex: 0,
  32. lastMoveWindow: time.Now(),
  33. MoveWindowTime: WindowTime / 2,
  34. Percentiles: Percentiles,
  35. }
  36. for i := 0; i < 2; i++ {
  37. q.streams[i] = *quantile.NewTargeted(Percentiles...)
  38. }
  39. q.currentStream = &q.streams[0]
  40. return &q
  41. }
  42. func (q *Quantile) Result() *Result {
  43. if q == nil {
  44. return &Result{}
  45. }
  46. queryHandler := q.QueryHandler()
  47. result := Result{
  48. Count: queryHandler.Count(),
  49. Percentiles: make([]map[string]float64, len(q.Percentiles)),
  50. }
  51. for i, p := range q.Percentiles {
  52. value := queryHandler.Query(p)
  53. result.Percentiles[i] = map[string]float64{"quantile": p, "value": value}
  54. }
  55. return &result
  56. }
  57. func (q *Quantile) Insert(msgStartTime int64) {
  58. q.Lock()
  59. now := time.Now()
  60. for q.IsDataStale(now) {
  61. q.moveWindow()
  62. }
  63. q.currentStream.Insert(float64(now.UnixNano() - msgStartTime))
  64. q.Unlock()
  65. }
  66. func (q *Quantile) QueryHandler() *quantile.Stream {
  67. q.Lock()
  68. now := time.Now()
  69. for q.IsDataStale(now) {
  70. q.moveWindow()
  71. }
  72. merged := quantile.NewTargeted(q.Percentiles...)
  73. merged.Merge(q.streams[0].Samples())
  74. merged.Merge(q.streams[1].Samples())
  75. q.Unlock()
  76. return merged
  77. }
  78. func (q *Quantile) IsDataStale(now time.Time) bool {
  79. return now.After(q.lastMoveWindow.Add(q.MoveWindowTime))
  80. }
  81. func (q *Quantile) Merge(them *Quantile) {
  82. q.Lock()
  83. them.Lock()
  84. iUs := q.currentIndex
  85. iThem := them.currentIndex
  86. q.streams[iUs].Merge(them.streams[iThem].Samples())
  87. iUs ^= 0x1
  88. iThem ^= 0x1
  89. q.streams[iUs].Merge(them.streams[iThem].Samples())
  90. if q.lastMoveWindow.Before(them.lastMoveWindow) {
  91. q.lastMoveWindow = them.lastMoveWindow
  92. }
  93. q.Unlock()
  94. them.Unlock()
  95. }
  96. func (q *Quantile) moveWindow() {
  97. q.currentIndex ^= 0x1
  98. q.currentStream = &q.streams[q.currentIndex]
  99. q.lastMoveWindow = q.lastMoveWindow.Add(q.MoveWindowTime)
  100. q.currentStream.Reset()
  101. }