Skip to content

Commit 4f434eb

Browse files
Adds events/ratelimiting/coalescing (dapr#64)
* Adds events/ratelimiting/coalecing events/ratelimiting is a new utility package to provides helpers which can be used to rate limit events. Coalecing is a new helper that will exponentially rate limit events. It will coalesce events into a single event if they occur within the same rate limiting window. Coalesce also has the option to forcibly fire an event when the number of events reaches a certain threshold. Added to prevent events from never being fired in a high throughput scenario. Signed-off-by: joshvanl <[email protected]> Signed-off-by: Josh van Leeuwen <[email protected]> Signed-off-by: Alessandro (Ale) Segala <[email protected]> Co-authored-by: Alessandro (Ale) Segala <[email protected]>
1 parent 55bfe3b commit 4f434eb

File tree

5 files changed

+671
-1
lines changed

5 files changed

+671
-1
lines changed

.golangci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ run:
1313
tests: true
1414

1515
# list of build tags, all linters use it. Default is empty list.
16-
uild-tags:
16+
build-tags:
1717
- unit
1818

1919
# which dirs to skip: they won't be analyzed;

events/ratelimiting/coalescing.go

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
Copyright 2023 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package ratelimiting
15+
16+
import (
17+
"context"
18+
"errors"
19+
"sync"
20+
"sync/atomic"
21+
"time"
22+
23+
"k8s.io/utils/clock"
24+
)
25+
26+
// OptionsCoalescing configures a Coalescing RateLimiter.
27+
type OptionsCoalescing struct {
28+
// InitialDelay is the initial delay for the rate limiter. The rate limiter
29+
// will not delay events less than the initial delay.
30+
// Defaults to 500ms.
31+
InitialDelay *time.Duration
32+
33+
// MaxDelay is the maximum delay for the rate limiter. The rate limiter will
34+
// not delay events longer than the max delay.
35+
// Defaults to 5s.
36+
MaxDelay *time.Duration
37+
38+
// MaxPendingEvents is the maximum number of events that can pending on a
39+
// rate limiter, before it fires an event anyway. Useful to prevent a rate
40+
// limiter never firing events in a high throughput scenario.
41+
// Defaults to unlimited.
42+
MaxPendingEvents *int
43+
}
44+
45+
// coalescing is a rate limiter that rate limits events. It coalesces events
46+
// that occur within a rate limiting window.
47+
type coalescing struct {
48+
initialDelay time.Duration
49+
maxDelay time.Duration
50+
maxPendingEvents *int
51+
52+
pendingEvents int
53+
timer clock.Timer
54+
hasTimer atomic.Bool
55+
inputCh chan struct{}
56+
currentDur time.Duration
57+
backoffFactor int
58+
59+
wg sync.WaitGroup
60+
lock sync.RWMutex
61+
clock clock.WithTicker
62+
running atomic.Bool
63+
closeCh chan struct{}
64+
closed atomic.Bool
65+
}
66+
67+
func NewCoalescing(opts OptionsCoalescing) (RateLimiter, error) {
68+
initialDelay := time.Millisecond * 500
69+
if opts.InitialDelay != nil {
70+
initialDelay = *opts.InitialDelay
71+
}
72+
if initialDelay <= 0 {
73+
return nil, errors.New("initial delay must be > 0")
74+
}
75+
76+
maxDelay := time.Second * 5
77+
if opts.MaxDelay != nil {
78+
maxDelay = *opts.MaxDelay
79+
}
80+
if maxDelay <= 0 {
81+
return nil, errors.New("max delay must be > 0")
82+
}
83+
84+
if maxDelay < initialDelay {
85+
return nil, errors.New("max delay must be >= base delay")
86+
}
87+
88+
if opts.MaxPendingEvents != nil && *opts.MaxPendingEvents <= 0 {
89+
return nil, errors.New("max pending events must be > 0")
90+
}
91+
92+
return &coalescing{
93+
initialDelay: initialDelay,
94+
maxDelay: maxDelay,
95+
maxPendingEvents: opts.MaxPendingEvents,
96+
currentDur: initialDelay,
97+
backoffFactor: 1,
98+
inputCh: make(chan struct{}),
99+
closeCh: make(chan struct{}),
100+
clock: clock.RealClock{},
101+
}, nil
102+
}
103+
104+
// Run runs the rate limiter. It will begin rate limiting events after the
105+
// first event is received.
106+
func (c *coalescing) Run(ctx context.Context, ch chan<- struct{}) error {
107+
if !c.running.CompareAndSwap(false, true) {
108+
return errors.New("already running")
109+
}
110+
111+
// Prevent wg race condition on Close and Run.
112+
c.lock.Lock()
113+
c.wg.Add(1)
114+
c.lock.Unlock()
115+
defer c.wg.Done()
116+
117+
ctx, cancel := context.WithCancel(ctx)
118+
defer cancel()
119+
120+
for {
121+
// If the timer doesn't exist yet, we're waiting for the first event (which
122+
// will fire immediately when received).
123+
var timerCh <-chan time.Time
124+
c.lock.RLock()
125+
if c.hasTimer.Load() {
126+
timerCh = c.timer.C()
127+
}
128+
c.lock.RUnlock()
129+
130+
select {
131+
case <-ctx.Done():
132+
return nil
133+
case <-c.closeCh:
134+
cancel()
135+
return nil
136+
137+
case <-c.inputCh:
138+
c.handleInputCh(ctx, ch)
139+
140+
case <-timerCh:
141+
c.handleTimerFired(ctx, ch)
142+
}
143+
}
144+
}
145+
146+
func (c *coalescing) handleInputCh(ctx context.Context, ch chan<- struct{}) {
147+
c.lock.Lock()
148+
defer c.lock.Unlock()
149+
150+
switch {
151+
case !c.hasTimer.Load():
152+
// We don't have a timer yet, so this is the first event that has fired. We
153+
// fire the event immediately, and set the timer to fire again after the
154+
// initial delay.
155+
c.timer = c.clock.NewTimer(c.initialDelay)
156+
c.hasTimer.Store(true)
157+
c.fireEvent(ctx, ch)
158+
159+
default:
160+
// If maxPendingEvents is set and we have reached it then fire the event
161+
// immediately.
162+
if c.maxPendingEvents != nil && c.pendingEvents >= *c.maxPendingEvents {
163+
c.fireEvent(ctx, ch)
164+
return
165+
}
166+
167+
if !c.timer.Stop() {
168+
<-c.timer.C()
169+
}
170+
171+
// Setup backoff. Backoff is exponential. If initial is 500ms and max is
172+
// 5s, the backoff will follow:
173+
// 500ms, 1s, 2s, 4s, 5s, 5s, 5s, ...
174+
if c.currentDur < c.maxDelay {
175+
c.backoffFactor *= 2
176+
c.currentDur = time.Duration(float64(c.initialDelay) * float64(c.backoffFactor))
177+
if c.currentDur > c.maxDelay {
178+
c.currentDur = c.maxDelay
179+
}
180+
}
181+
182+
c.timer.Reset(c.currentDur)
183+
}
184+
}
185+
186+
func (c *coalescing) handleTimerFired(ctx context.Context, ch chan<- struct{}) {
187+
c.lock.Lock()
188+
defer c.lock.Unlock()
189+
c.fireEvent(ctx, ch)
190+
c.reset()
191+
}
192+
193+
func (c *coalescing) fireEvent(ctx context.Context, ch chan<- struct{}) {
194+
// Important to only send on the channel if there are pending events,
195+
// otherwise we will double send an event, for example if only a single event
196+
// was sent and then the rate limiting window expired with no new events.
197+
if c.pendingEvents > 0 {
198+
c.pendingEvents = 0
199+
c.wg.Add(1)
200+
go func() {
201+
defer c.wg.Done()
202+
select {
203+
case ch <- struct{}{}:
204+
case <-ctx.Done():
205+
}
206+
}()
207+
}
208+
}
209+
210+
func (c *coalescing) reset() {
211+
if !c.timer.Stop() {
212+
select {
213+
case <-c.timer.C():
214+
default:
215+
}
216+
}
217+
218+
c.pendingEvents = 0
219+
c.currentDur = c.initialDelay
220+
c.backoffFactor = 1
221+
c.hasTimer.Store(false)
222+
c.timer = nil
223+
}
224+
225+
func (c *coalescing) Add() {
226+
c.lock.Lock()
227+
defer c.lock.Unlock()
228+
c.pendingEvents++
229+
c.wg.Add(1)
230+
go func() {
231+
defer c.wg.Done()
232+
select {
233+
case c.inputCh <- struct{}{}:
234+
case <-c.closeCh:
235+
}
236+
}()
237+
}
238+
239+
func (c *coalescing) Close() {
240+
defer func() {
241+
// Prevent wg race condition on Close and Run.
242+
c.lock.Lock()
243+
c.wg.Wait()
244+
c.lock.Unlock()
245+
}()
246+
if c.closed.CompareAndSwap(false, true) {
247+
close(c.closeCh)
248+
}
249+
}
250+
251+
var _ RateLimiter = (*coalescing)(nil)

0 commit comments

Comments
 (0)