Skip to content

Commit 858719e

Browse files
authored
Change events/batcher to use events/queue as backend. (dapr#82)
* events/batcher: use events/queue as queue backend Signed-off-by: joshvanl <[email protected]> * Make events/queue/queue key type comparable Signed-off-by: joshvanl <[email protected]> * Explicitly define NewProcessor generic type in test Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent c24d1d2 commit 858719e

File tree

8 files changed

+95
-113
lines changed

8 files changed

+95
-113
lines changed

events/batcher/batcher.go

Lines changed: 44 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,42 @@ import (
1919
"time"
2020

2121
"k8s.io/utils/clock"
22-
)
2322

24-
// key is the type of the comparable key used to batch events.
25-
type key interface {
26-
comparable
27-
}
23+
"github.com/dapr/kit/events/queue"
24+
)
2825

2926
// Batcher is a one to many event batcher. It batches events and sends them to
3027
// the added event channel subscribers. Events are sent to the channels after
3128
// the interval has elapsed. If events with the same key are received within
3229
// the interval, the timer is reset.
33-
type Batcher[T key] struct {
30+
type Batcher[T comparable] struct {
3431
interval time.Duration
35-
actives map[T]clock.Timer
3632
eventChs []chan<- struct{}
33+
queue *queue.Processor[T, *item[T]]
3734

38-
clock clock.WithDelayedExecution
35+
clock clock.Clock
3936
lock sync.Mutex
4037
wg sync.WaitGroup
4138
closeCh chan struct{}
4239
closed atomic.Bool
4340
}
4441

4542
// New creates a new Batcher with the given interval and key type.
46-
func New[T key](interval time.Duration) *Batcher[T] {
47-
return &Batcher[T]{
43+
func New[T comparable](interval time.Duration) *Batcher[T] {
44+
b := &Batcher[T]{
4845
interval: interval,
49-
actives: make(map[T]clock.Timer),
5046
clock: clock.RealClock{},
5147
closeCh: make(chan struct{}),
5248
}
49+
50+
b.queue = queue.NewProcessor[T, *item[T]](b.execute)
51+
52+
return b
5353
}
5454

5555
// WithClock sets the clock used by the batcher. Used for testing.
56-
func (b *Batcher[T]) WithClock(clock clock.WithDelayedExecution) {
56+
func (b *Batcher[T]) WithClock(clock clock.Clock) {
57+
b.queue.WithClock(clock)
5758
b.clock = clock
5859
}
5960

@@ -68,63 +69,56 @@ func (b *Batcher[T]) Subscribe(eventCh ...chan<- struct{}) {
6869
b.eventChs = append(b.eventChs, eventCh...)
6970
}
7071

71-
// Batch adds the given key to the batcher. If an event for this key is already
72-
// active, the timer is reset. If the batcher is closed, the key is silently
73-
// dropped.
74-
func (b *Batcher[T]) Batch(key T) {
72+
func (b *Batcher[T]) execute(_ *item[T]) {
7573
b.lock.Lock()
7674
defer b.lock.Unlock()
77-
7875
if b.closed.Load() {
7976
return
8077
}
81-
82-
if active, ok := b.actives[key]; ok {
83-
if !active.Stop() {
84-
<-active.C()
85-
}
86-
active.Reset(b.interval)
87-
return
78+
b.wg.Add(len(b.eventChs))
79+
for _, eventCh := range b.eventChs {
80+
go func(eventCh chan<- struct{}) {
81+
defer b.wg.Done()
82+
select {
83+
case eventCh <- struct{}{}:
84+
case <-b.closeCh:
85+
}
86+
}(eventCh)
8887
}
88+
}
8989

90-
b.actives[key] = b.clock.AfterFunc(b.interval, func() {
91-
b.lock.Lock()
92-
defer b.lock.Unlock()
93-
94-
b.wg.Add(len(b.eventChs))
95-
delete(b.actives, key)
96-
for _, eventCh := range b.eventChs {
97-
go func(eventCh chan<- struct{}) {
98-
defer b.wg.Done()
99-
select {
100-
case eventCh <- struct{}{}:
101-
case <-b.closeCh:
102-
}
103-
}(eventCh)
104-
}
90+
// Batch adds the given key to the batcher. If an event for this key is already
91+
// active, the timer is reset. If the batcher is closed, the key is silently
92+
// dropped.
93+
func (b *Batcher[T]) Batch(key T) {
94+
b.queue.Enqueue(&item[T]{
95+
key: key,
96+
ttl: b.clock.Now().Add(b.interval),
10597
})
10698
}
10799

108100
// Close closes the batcher. It blocks until all events have been sent to the
109101
// subscribers. The batcher will be a no-op after this call.
110102
func (b *Batcher[T]) Close() {
111103
defer b.wg.Wait()
112-
113-
// Lock to ensure that no new timers are created.
114104
b.lock.Lock()
115105
if b.closed.CompareAndSwap(false, true) {
116106
close(b.closeCh)
117107
}
118-
actives := b.actives
119108
b.lock.Unlock()
109+
b.queue.Close()
110+
}
120111

121-
for _, active := range actives {
122-
if !active.Stop() {
123-
<-active.C()
124-
}
125-
}
112+
// item implements queue.queueable.
113+
type item[T comparable] struct {
114+
key T
115+
ttl time.Time
116+
}
126117

127-
b.lock.Lock()
128-
b.actives = nil
129-
b.lock.Unlock()
118+
func (b *item[T]) Key() T {
119+
return b.key
120+
}
121+
122+
func (b *item[T]) ScheduledTime() time.Time {
123+
return b.ttl
130124
}

events/batcher/batcher_test.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestBatch(t *testing.T) {
5151

5252
fakeClock := testingclock.NewFakeClock(time.Now())
5353
b := New[string](time.Millisecond * 10)
54-
b.clock = fakeClock
54+
b.WithClock(fakeClock)
5555
ch1 := make(chan struct{})
5656
ch2 := make(chan struct{})
5757
ch3 := make(chan struct{})
@@ -67,8 +67,6 @@ func TestBatch(t *testing.T) {
6767
b.Batch("key3")
6868
b.Batch("key3")
6969

70-
assert.Len(t, b.actives, 3)
71-
7270
assert.Eventually(t, func() bool {
7371
return fakeClock.HasWaiters()
7472
}, time.Second*5, time.Millisecond*100)
@@ -112,19 +110,8 @@ func TestClose(t *testing.T) {
112110
b.Subscribe(ch)
113111
assert.Len(t, b.eventChs, 1)
114112
b.Batch("key1")
115-
assert.Len(t, b.actives, 1)
116113
b.Close()
117114
assert.True(t, b.closed.Load())
118-
assert.Empty(t, b.actives)
119-
}
120-
121-
func TestBatchAfterClose(t *testing.T) {
122-
t.Parallel()
123-
124-
b := New[string](time.Millisecond * 10)
125-
b.Close()
126-
b.Batch("key1")
127-
assert.Empty(t, b.actives)
128115
}
129116

130117
func TestSubscribeAfterClose(t *testing.T) {

events/queue/eventqueue_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func ExampleProcessor() {
4343
}
4444

4545
// Create the processor
46-
processor := NewProcessor[*queueableItem](executeFn)
46+
processor := NewProcessor[string, *queueableItem](executeFn)
4747

4848
// Add items to the processor, in any order, using Enqueue
4949
processor.Enqueue(&queueableItem{Name: "item1", ExecutionTime: time.Now().Add(500 * time.Millisecond)})

events/queue/processor.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ import (
2626
var ErrProcessorStopped = errors.New("processor is stopped")
2727

2828
// Processor manages the queue of items and processes them at the correct time.
29-
type Processor[T queueable] struct {
29+
type Processor[K comparable, T queueable[K]] struct {
3030
executeFn func(r T)
31-
queue queue[T]
31+
queue queue[K, T]
3232
clock kclock.Clock
3333
lock sync.Mutex
3434
wg sync.WaitGroup
@@ -40,10 +40,10 @@ type Processor[T queueable] struct {
4040

4141
// NewProcessor returns a new Processor object.
4242
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
43-
func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
44-
return &Processor[T]{
43+
func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T] {
44+
return &Processor[K, T]{
4545
executeFn: executeFn,
46-
queue: newQueue[T](),
46+
queue: newQueue[K, T](),
4747
processorRunningCh: make(chan struct{}, 1),
4848
stopCh: make(chan struct{}),
4949
resetCh: make(chan struct{}, 1),
@@ -52,14 +52,14 @@ func NewProcessor[T queueable](executeFn func(r T)) *Processor[T] {
5252
}
5353

5454
// WithClock sets the clock used by the processor. Used for testing.
55-
func (p *Processor[T]) WithClock(clock kclock.Clock) *Processor[T] {
55+
func (p *Processor[K, T]) WithClock(clock kclock.Clock) *Processor[K, T] {
5656
p.clock = clock
5757
return p
5858
}
5959

6060
// Enqueue adds a new item to the queue.
6161
// If a item with the same ID already exists, it'll be replaced.
62-
func (p *Processor[T]) Enqueue(r T) error {
62+
func (p *Processor[K, T]) Enqueue(r T) error {
6363
if p.stopped.Load() {
6464
return ErrProcessorStopped
6565
}
@@ -79,7 +79,7 @@ func (p *Processor[T]) Enqueue(r T) error {
7979
}
8080

8181
// Dequeue removes a item from the queue.
82-
func (p *Processor[T]) Dequeue(key string) error {
82+
func (p *Processor[K, T]) Dequeue(key K) error {
8383
if p.stopped.Load() {
8484
return ErrProcessorStopped
8585
}
@@ -99,7 +99,7 @@ func (p *Processor[T]) Dequeue(key string) error {
9999

100100
// Close stops the processor.
101101
// This method blocks until the processor loop returns.
102-
func (p *Processor[T]) Close() error {
102+
func (p *Processor[K, T]) Close() error {
103103
defer p.wg.Wait()
104104
if p.stopped.CompareAndSwap(false, true) {
105105
// Send a signal to stop
@@ -114,7 +114,7 @@ func (p *Processor[T]) Close() error {
114114

115115
// Start the processing loop if it's not already running.
116116
// This must be invoked while the caller has a lock.
117-
func (p *Processor[T]) process(isNext bool) {
117+
func (p *Processor[K, T]) process(isNext bool) {
118118
// Do not start a loop if it's already running
119119
select {
120120
case p.processorRunningCh <- struct{}{}:
@@ -140,7 +140,7 @@ func (p *Processor[T]) process(isNext bool) {
140140
}
141141

142142
// Processing loop.
143-
func (p *Processor[T]) processLoop() {
143+
func (p *Processor[K, T]) processLoop() {
144144
defer func() {
145145
// Release the channel when exiting
146146
<-p.processorRunningCh
@@ -209,7 +209,7 @@ func (p *Processor[T]) processLoop() {
209209
}
210210

211211
// Executes a item when it's time.
212-
func (p *Processor[T]) execute(r T) {
212+
func (p *Processor[K, T]) execute(r T) {
213213
// Pop the item now that we're ready to process it
214214
// There's a small chance this is a different item than the one we peeked before
215215
p.lock.Lock()

events/queue/processor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func TestProcessor(t *testing.T) {
3131
// Create the processor
3232
clock := clocktesting.NewFakeClock(time.Now())
3333
executeCh := make(chan *queueableItem)
34-
processor := NewProcessor(func(r *queueableItem) {
34+
processor := NewProcessor[string](func(r *queueableItem) {
3535
executeCh <- r
3636
})
3737
processor.clock = clock
@@ -364,7 +364,7 @@ func TestClose(t *testing.T) {
364364
// Create the processor
365365
clock := clocktesting.NewFakeClock(time.Now())
366366
executeCh := make(chan *queueableItem)
367-
processor := NewProcessor(func(r *queueableItem) {
367+
processor := NewProcessor[string](func(r *queueableItem) {
368368
executeCh <- r
369369
})
370370
processor.clock = clock

0 commit comments

Comments
 (0)