123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package quantile
- import (
- "strings"
- "sync"
- "time"
- "github.com/bmizerany/perks/quantile"
- "github.com/nsqio/nsq/internal/stringy"
- )
- type Result struct {
- Count int `json:"count"`
- Percentiles []map[string]float64 `json:"percentiles"`
- }
- func (r *Result) String() string {
- var s []string
- for _, item := range r.Percentiles {
- s = append(s, stringy.NanoSecondToHuman(item["value"]))
- }
- return strings.Join(s, ", ")
- }
- type Quantile struct {
- sync.Mutex
- streams [2]quantile.Stream
- currentIndex uint8
- lastMoveWindow time.Time
- currentStream *quantile.Stream
- Percentiles []float64
- MoveWindowTime time.Duration
- }
- func New(WindowTime time.Duration, Percentiles []float64) *Quantile {
- q := Quantile{
- currentIndex: 0,
- lastMoveWindow: time.Now(),
- MoveWindowTime: WindowTime / 2,
- Percentiles: Percentiles,
- }
- for i := 0; i < 2; i++ {
- q.streams[i] = *quantile.NewTargeted(Percentiles...)
- }
- q.currentStream = &q.streams[0]
- return &q
- }
- func (q *Quantile) Result() *Result {
- if q == nil {
- return &Result{}
- }
- queryHandler := q.QueryHandler()
- result := Result{
- Count: queryHandler.Count(),
- Percentiles: make([]map[string]float64, len(q.Percentiles)),
- }
- for i, p := range q.Percentiles {
- value := queryHandler.Query(p)
- result.Percentiles[i] = map[string]float64{"quantile": p, "value": value}
- }
- return &result
- }
- func (q *Quantile) Insert(msgStartTime int64) {
- q.Lock()
- now := time.Now()
- for q.IsDataStale(now) {
- q.moveWindow()
- }
- q.currentStream.Insert(float64(now.UnixNano() - msgStartTime))
- q.Unlock()
- }
- func (q *Quantile) QueryHandler() *quantile.Stream {
- q.Lock()
- now := time.Now()
- for q.IsDataStale(now) {
- q.moveWindow()
- }
- merged := quantile.NewTargeted(q.Percentiles...)
- merged.Merge(q.streams[0].Samples())
- merged.Merge(q.streams[1].Samples())
- q.Unlock()
- return merged
- }
- func (q *Quantile) IsDataStale(now time.Time) bool {
- return now.After(q.lastMoveWindow.Add(q.MoveWindowTime))
- }
- func (q *Quantile) Merge(them *Quantile) {
- q.Lock()
- them.Lock()
- iUs := q.currentIndex
- iThem := them.currentIndex
- q.streams[iUs].Merge(them.streams[iThem].Samples())
- iUs ^= 0x1
- iThem ^= 0x1
- q.streams[iUs].Merge(them.streams[iThem].Samples())
- if q.lastMoveWindow.Before(them.lastMoveWindow) {
- q.lastMoveWindow = them.lastMoveWindow
- }
- q.Unlock()
- them.Unlock()
- }
- func (q *Quantile) moveWindow() {
- q.currentIndex ^= 0x1
- q.currentStream = &q.streams[q.currentIndex]
- q.lastMoveWindow = q.lastMoveWindow.Add(q.MoveWindowTime)
- q.currentStream.Reset()
- }
|