package util

import (
	"sync"
	"time"
)

// DChan is a deduplicated (debounced) channel over comparable elements.
//
// Elements sent to In() are forwarded to Out() unless an equal element is
// already marked as in flight; such duplicates are silently dropped. An
// element stays marked until MarkElemReadDone is called for it, after which
// it can be sent and received again.
//
// To close the channel, call `close(dchan.In())`. Out() is closed once the
// remaining buffered input has been drained.
type DChan[T comparable] struct {
	inChan        chan T
	outChan       chan T
	existingElems map[T]struct{} // elements currently marked as in flight
	mtx           sync.Mutex     // guards existingElems
}

// NewDChan creates a DChan and starts the goroutine that forwards
// deduplicated elements from the input channel to the output channel.
//
// capacity is the buffer size of the input channel; the output channel is
// unbuffered. The forwarding goroutine exits, closing Out(), after the input
// channel has been closed and drained.
func NewDChan[T comparable](capacity int) *DChan[T] {
	dc := &DChan[T]{
		inChan:        make(chan T, capacity),
		outChan:       make(chan T),
		existingElems: make(map[T]struct{}),
	}
	go dc.dispatch()
	return dc
}

// dispatch forwards every not-yet-marked element from inChan to outChan and
// drops duplicates. It closes outChan when inChan is closed and drained.
func (dc *DChan[T]) dispatch() {
	defer close(dc.outChan)
	for elem := range dc.inChan {
		// The lock is released before the (possibly blocking) send so that
		// Contains, MarkElemReadDone and DelayedWrite are never stalled by a
		// slow consumer.
		if dc.markIfAbsent(elem) {
			dc.outChan <- elem
		}
	}
}

// markIfAbsent marks elem as in flight and reports true if it was not marked
// before; it reports false and changes nothing if elem was already marked.
func (dc *DChan[T]) markIfAbsent(elem T) bool {
	dc.mtx.Lock()
	defer dc.mtx.Unlock()
	if _, ok := dc.existingElems[elem]; ok {
		return false
	}
	dc.existingElems[elem] = struct{}{}
	return true
}

// In returns the send-only input side of the channel.
func (dc *DChan[T]) In() chan<- T {
	return dc.inChan
}

// Out returns the receive-only, deduplicated output side of the channel.
func (dc *DChan[T]) Out() <-chan T {
	return dc.outChan
}

// MarkElemReadDone marks elem as read (stops deduplication for it), so that
// it can be sent and received again.
func (dc *DChan[T]) MarkElemReadDone(elem T) {
	dc.mtx.Lock()
	defer dc.mtx.Unlock()
	delete(dc.existingElems, elem)
}

// Contains reports whether elem is currently marked as in flight: either it
// was forwarded towards Out() and not yet released with MarkElemReadDone, or
// it is waiting on a pending DelayedWrite. Elements still sitting unread in
// the input buffer are not marked yet.
func (dc *DChan[T]) Contains(elem T) bool {
	dc.mtx.Lock()
	defer dc.mtx.Unlock()
	_, ok := dc.existingElems[elem]
	return ok
}

// Write sends elem to the input channel. It blocks while the input buffer is
// full.
func (dc *DChan[T]) Write(elem T) {
	dc.inChan <- elem
}

// DelayedWrite queues elem to be written to the input channel after delay.
//
// A non-positive delay writes immediately, exactly like Write. Otherwise, if
// elem is already marked as in flight the call is a no-op (the delay is
// ignored); if it is not, elem is marked right away so that further writes of
// the same element are deduplicated while the delay is pending.
//
// If the input channel is closed before the delay elapses, the pending write
// is dropped.
func (dc *DChan[T]) DelayedWrite(elem T, delay time.Duration) {
	if delay <= 0 {
		dc.inChan <- elem
		return
	}

	// Already in the channel (or already scheduled): nothing to do.
	if !dc.markIfAbsent(elem) {
		return
	}

	// Write the element after the delay, without parking a goroutine for the
	// whole duration.
	time.AfterFunc(delay, func() {
		// The caller closes the channel with close(dc.In()) and has no way to
		// cancel this timer. Sending on a closed channel panics, which would
		// take the whole process down from a goroutine nobody can guard, so a
		// late write is discarded instead.
		defer func() { _ = recover() }()

		// HACK: the mark set above exists only to deduplicate writes while
		// the delay is pending. It must be removed before the real send,
		// otherwise the dispatcher would treat elem as a duplicate and drop
		// it.
		dc.MarkElemReadDone(elem)
		dc.inChan <- elem
	})
}