12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- package util
- import (
- "sync"
- "time"
- )
- // Deduplicated (debounced) channel. To close the channel, call `close(dchan.In())`.
- type DChan[T comparable] struct {
- inChan chan T
- outChan chan T
- existingElems map[T]struct{}
- mtx sync.Mutex
- }
- func NewDChan[T comparable](capacity int) *DChan[T] {
- dchan := &DChan[T]{
- inChan: make(chan T, capacity),
- outChan: make(chan T),
- existingElems: make(map[T]struct{}),
- }
- go func() {
- defer close(dchan.outChan)
- for elem := range dchan.inChan {
- dchan.mtx.Lock()
- if _, ok := dchan.existingElems[elem]; !ok {
- // Send the element to the output channel.
- dchan.existingElems[elem] = struct{}{}
- dchan.mtx.Unlock()
- dchan.outChan <- elem
- } else {
- // Deduplicate the element.
- dchan.mtx.Unlock()
- }
- }
- }()
- return dchan
- }
- func (dc *DChan[T]) In() chan<- T {
- return dc.inChan
- }
- func (dc *DChan[T]) Out() <-chan T {
- return dc.outChan
- }
- // Marks an element as read (stops deduplication), so that it can be sent and received again.
- func (dc *DChan[T]) MarkElemReadDone(elem T) {
- dc.mtx.Lock()
- delete(dc.existingElems, elem)
- dc.mtx.Unlock()
- }
- // Checks if an element is in the channel.
- func (dc *DChan[T]) Contains(elem T) bool {
- dc.mtx.Lock()
- _, ok := dc.existingElems[elem]
- dc.mtx.Unlock()
- return ok
- }
- // Writes an element to the input channel.
- func (dc *DChan[T]) Write(elem T) {
- dc.inChan <- elem
- }
- // Queues an element to be written to the input channel after a delay. If already in the channel, the delay is ignored.
- func (dc *DChan[T]) DelayedWrite(elem T, delay time.Duration) {
- if delay <= 0 {
- dc.inChan <- elem
- return
- }
- // Check if the element is already in the channel.
- dc.mtx.Lock()
- if _, ok := dc.existingElems[elem]; ok {
- // Existing element, abort.
- dc.mtx.Unlock()
- return
- } else {
- // New element, queue it.
- dc.existingElems[elem] = struct{}{}
- dc.mtx.Unlock()
- }
- // Write the element after the delay.
- go func() {
- time.Sleep(delay)
- // HACK!
- dc.MarkElemReadDone(elem)
- dc.inChan <- elem
- }()
- }
|