dchan.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package util
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // Deduplicated (debounced) channel. To close the channel, call `close(dchan.In())`.
  7. type DChan[T comparable] struct {
  8. inChan chan T
  9. outChan chan T
  10. existingElems map[T]struct{}
  11. mtx sync.Mutex
  12. }
  13. func NewDChan[T comparable](capacity int) *DChan[T] {
  14. dchan := &DChan[T]{
  15. inChan: make(chan T, capacity),
  16. outChan: make(chan T),
  17. existingElems: make(map[T]struct{}),
  18. }
  19. go func() {
  20. defer close(dchan.outChan)
  21. for elem := range dchan.inChan {
  22. dchan.mtx.Lock()
  23. if _, ok := dchan.existingElems[elem]; !ok {
  24. // Send the element to the output channel.
  25. dchan.existingElems[elem] = struct{}{}
  26. dchan.mtx.Unlock()
  27. dchan.outChan <- elem
  28. } else {
  29. // Deduplicate the element.
  30. dchan.mtx.Unlock()
  31. }
  32. }
  33. }()
  34. return dchan
  35. }
  36. func (dc *DChan[T]) In() chan<- T {
  37. return dc.inChan
  38. }
  39. func (dc *DChan[T]) Out() <-chan T {
  40. return dc.outChan
  41. }
  42. // Marks an element as read (stops deduplication), so that it can be sent and received again.
  43. func (dc *DChan[T]) MarkElemReadDone(elem T) {
  44. dc.mtx.Lock()
  45. delete(dc.existingElems, elem)
  46. dc.mtx.Unlock()
  47. }
  48. // Checks if an element is in the channel.
  49. func (dc *DChan[T]) Contains(elem T) bool {
  50. dc.mtx.Lock()
  51. _, ok := dc.existingElems[elem]
  52. dc.mtx.Unlock()
  53. return ok
  54. }
  55. // Writes an element to the input channel.
  56. func (dc *DChan[T]) Write(elem T) {
  57. dc.inChan <- elem
  58. }
  59. // Queues an element to be written to the input channel after a delay. If already in the channel, the delay is ignored.
  60. func (dc *DChan[T]) DelayedWrite(elem T, delay time.Duration) {
  61. if delay <= 0 {
  62. dc.inChan <- elem
  63. return
  64. }
  65. // Check if the element is already in the channel.
  66. dc.mtx.Lock()
  67. if _, ok := dc.existingElems[elem]; ok {
  68. // Existing element, abort.
  69. dc.mtx.Unlock()
  70. return
  71. } else {
  72. // New element, queue it.
  73. dc.existingElems[elem] = struct{}{}
  74. dc.mtx.Unlock()
  75. }
  76. // Write the element after the delay.
  77. go func() {
  78. time.Sleep(delay)
  79. // HACK!
  80. dc.MarkElemReadDone(elem)
  81. dc.inChan <- elem
  82. }()
  83. }