Skip to content

Commit 2c729b6

Browse files
committed
RL: move burst and rate out of ratelimit state
1 parent 46dc4f3 commit 2c729b6

File tree

3 files changed

+154
-259
lines changed

3 files changed

+154
-259
lines changed

forward/bwlimit.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@ import (
1414
const copyChunkSize = 128 * 1024
1515

1616
type BWLimit struct {
17-
d []rate.Limiter
18-
u []rate.Limiter
17+
limit rate.Limit
18+
burst int64
19+
d []rate.Limiter
20+
u []rate.Limiter
1921
}
2022

2123
func NewBWLimit(bytesPerSecond float64, burst int64, buckets uint, separate bool) *BWLimit {
2224
if buckets == 0 {
2325
buckets = 1
2426
}
25-
lim := *(rate.NewLimiter(rate.Limit(bytesPerSecond), max(copyChunkSize, burst)))
27+
burst = max(copyChunkSize, burst)
28+
lim := *(rate.NewLimiter(burst))
2629
d := make([]rate.Limiter, buckets)
2730
for i := range d {
2831
d[i] = lim
@@ -35,8 +38,10 @@ func NewBWLimit(bytesPerSecond float64, burst int64, buckets uint, separate bool
3538
}
3639
}
3740
return &BWLimit{
38-
d: d,
39-
u: u,
41+
limit: rate.Limit(bytesPerSecond),
42+
burst: burst,
43+
d: d,
44+
u: u,
4045
}
4146
}
4247

@@ -48,7 +53,7 @@ func (l *BWLimit) copy(ctx context.Context, rl *rate.Limiter, dst io.Writer, src
4853
var n int64
4954
for {
5055
t := time.Now()
51-
r := rl.ReserveN(t, copyChunkSize)
56+
r := rl.ReserveN(l.limit, l.burst, t, copyChunkSize)
5257
if !r.OK() {
5358
err = errors.New("can't get rate limit reservation")
5459
return
@@ -65,9 +70,9 @@ func (l *BWLimit) copy(ctx context.Context, rl *rate.Limiter, dst io.Writer, src
6570
n, err = io.Copy(dst, lim)
6671
written += n
6772
if n < copyChunkSize {
68-
r.CancelAt(t)
73+
r.CancelAt(l.limit, l.burst, t)
6974
if n > 0 {
70-
rl.ReserveN(t, n)
75+
rl.ReserveN(l.limit, l.burst, t, n)
7176
}
7277
}
7378
if err != nil {

rate/rate.go

Lines changed: 36 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -56,65 +56,44 @@ func Every(interval time.Duration) Limit {
5656
// Limiter is safe for simultaneous use by multiple goroutines.
5757
type Limiter struct {
5858
mu sync.Mutex
59-
limit Limit
60-
burst int64
6159
tokens float64
6260
// last is the last time the limiter's tokens field was updated
6361
last int64
6462
// lastEvent is the latest time of a rate-limited event (past or future)
6563
lastEvent int64
6664
}
6765

68-
// Limit returns the maximum overall event rate.
69-
func (lim *Limiter) Limit() Limit {
70-
lim.mu.Lock()
71-
defer lim.mu.Unlock()
72-
return lim.limit
73-
}
74-
75-
// Burst returns the maximum burst size. Burst is the maximum number of tokens
76-
// that can be consumed in a single call to Allow, Reserve, or Wait, so higher
77-
// Burst values allow more events to happen at once.
78-
// A zero Burst allows no events, unless limit == Inf.
79-
func (lim *Limiter) Burst() int64 {
80-
lim.mu.Lock()
81-
defer lim.mu.Unlock()
82-
return lim.burst
83-
}
84-
8566
// TokensAt returns the number of tokens available at time t.
86-
func (lim *Limiter) TokensAt(t time.Time) float64 {
67+
func (lim *Limiter) TokensAt(limit Limit, burst int64, t time.Time) float64 {
8768
lim.mu.Lock()
88-
tokens := lim.advance(t) // does not mutate lim
69+
tokens := lim.advance(limit, burst, t) // does not mutate lim
8970
lim.mu.Unlock()
9071
return tokens
9172
}
9273

9374
// Tokens returns the number of tokens available now.
94-
func (lim *Limiter) Tokens() float64 {
95-
return lim.TokensAt(time.Now())
75+
func (lim *Limiter) Tokens(limit Limit, burst int64) float64 {
76+
return lim.TokensAt(limit, burst, time.Now())
9677
}
9778

9879
// NewLimiter returns a new Limiter that allows events up to rate r and permits
9980
// bursts of at most b tokens.
100-
func NewLimiter(r Limit, b int64) *Limiter {
81+
func NewLimiter(b int64) *Limiter {
10182
return &Limiter{
102-
limit: r,
103-
burst: b,
10483
tokens: float64(b),
10584
}
10685
}
10786

10887
// Allow reports whether an event may happen now.
109-
func (lim *Limiter) Allow() bool {
110-
return lim.AllowN(time.Now(), 1)
88+
func (lim *Limiter) Allow(limit Limit, burst int64) bool {
89+
return lim.AllowN(limit, burst, time.Now(), 1)
11190
}
11291

11392
// AllowN reports whether n events may happen at time t.
11493
// Use this method if you intend to drop / skip events that exceed the rate limit.
11594
// Otherwise use Reserve or Wait.
116-
func (lim *Limiter) AllowN(t time.Time, n int64) bool {
117-
return lim.reserveN(t, n, 0).ok
95+
func (lim *Limiter) AllowN(limit Limit, burst int64, t time.Time, n int64) bool {
96+
return lim.reserveN(limit, burst, t, n, 0).ok
11897
}
11998

12099
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
@@ -159,37 +138,37 @@ func (r *Reservation) DelayFrom(t time.Time) time.Duration {
159138
}
160139

161140
// Cancel is shorthand for CancelAt(time.Now()).
162-
func (r *Reservation) Cancel() {
163-
r.CancelAt(time.Now())
141+
func (r *Reservation) Cancel(limit Limit, burst int64) {
142+
r.CancelAt(limit, burst, time.Now())
164143
}
165144

166145
// CancelAt indicates that the reservation holder will not perform the reserved action
167146
// and reverses the effects of this Reservation on the rate limit as much as possible,
168147
// considering that other reservations may have already been made.
169-
func (r *Reservation) CancelAt(t time.Time) {
148+
func (r *Reservation) CancelAt(limit Limit, burst int64, t time.Time) {
170149
if !r.ok {
171150
return
172151
}
173152

174153
r.lim.mu.Lock()
175154
defer r.lim.mu.Unlock()
176155

177-
if r.lim.limit == Inf || r.tokens == 0 || time.Unix(0, r.timeToAct).Before(t) {
156+
if limit == Inf || r.tokens == 0 || time.Unix(0, r.timeToAct).Before(t) {
178157
return
179158
}
180159

181160
// calculate tokens to restore
182161
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
183162
// after r was obtained. These tokens should not be restored.
184-
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(time.Duration(r.lim.lastEvent - r.timeToAct))
163+
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(time.Duration(r.lim.lastEvent-r.timeToAct))
185164
if restoreTokens <= 0 {
186165
return
187166
}
188167
// advance time to now
189-
tokens := r.lim.advance(t)
168+
tokens := r.lim.advance(limit, burst, t)
190169
// calculate new number of tokens
191170
tokens += restoreTokens
192-
if burst := float64(r.lim.burst); tokens > burst {
171+
if burst := float64(burst); tokens > burst {
193172
tokens = burst
194173
}
195174
// update state
@@ -204,8 +183,8 @@ func (r *Reservation) CancelAt(t time.Time) {
204183
}
205184

206185
// Reserve is shorthand for ReserveN(time.Now(), 1).
207-
func (lim *Limiter) Reserve() *Reservation {
208-
return lim.ReserveN(time.Now(), 1)
186+
func (lim *Limiter) Reserve(limit Limit, burst int64) *Reservation {
187+
return lim.ReserveN(limit, burst, time.Now(), 1)
209188
}
210189

211190
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
@@ -224,38 +203,33 @@ func (lim *Limiter) Reserve() *Reservation {
224203
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
225204
// If you need to respect a deadline or cancel the delay, use Wait instead.
226205
// To drop or skip events exceeding rate limit, use Allow instead.
227-
func (lim *Limiter) ReserveN(t time.Time, n int64) *Reservation {
228-
r := lim.reserveN(t, n, InfDuration)
206+
func (lim *Limiter) ReserveN(limit Limit, burst int64, t time.Time, n int64) *Reservation {
207+
r := lim.reserveN(limit, burst, t, n, InfDuration)
229208
return &r
230209
}
231210

232211
// Wait is shorthand for WaitN(ctx, 1).
233-
func (lim *Limiter) Wait(ctx context.Context) (err error) {
234-
return lim.WaitN(ctx, 1)
212+
func (lim *Limiter) Wait(ctx context.Context, limit Limit, burst int64) (err error) {
213+
return lim.WaitN(ctx, limit, burst, 1)
235214
}
236215

237216
// WaitN blocks until lim permits n events to happen.
238217
// It returns an error if n exceeds the Limiter's burst size, the Context is
239218
// canceled, or the expected wait time exceeds the Context's Deadline.
240219
// The burst limit is ignored if the rate limit is Inf.
241-
func (lim *Limiter) WaitN(ctx context.Context, n int64) (err error) {
220+
func (lim *Limiter) WaitN(ctx context.Context, limit Limit, burst int64, n int64) (err error) {
242221
// The test code calls lim.wait with a fake timer generator.
243222
// This is the real timer generator.
244223
newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
245224
timer := time.NewTimer(d)
246225
return timer.C, timer.Stop, func() {}
247226
}
248227

249-
return lim.wait(ctx, n, time.Now(), newTimer)
228+
return lim.wait(ctx, limit, burst, n, time.Now(), newTimer)
250229
}
251230

252231
// wait is the internal implementation of WaitN.
253-
func (lim *Limiter) wait(ctx context.Context, n int64, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
254-
lim.mu.Lock()
255-
burst := lim.burst
256-
limit := lim.limit
257-
lim.mu.Unlock()
258-
232+
func (lim *Limiter) wait(ctx context.Context, limit Limit, burst int64, n int64, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
259233
if n > burst && limit != Inf {
260234
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
261235
}
@@ -271,7 +245,7 @@ func (lim *Limiter) wait(ctx context.Context, n int64, t time.Time, newTimer fun
271245
waitLimit = deadline.Sub(t)
272246
}
273247
// Reserve
274-
r := lim.reserveN(t, n, waitLimit)
248+
r := lim.reserveN(limit, burst, t, n, waitLimit)
275249
if !r.ok {
276250
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
277251
}
@@ -290,55 +264,19 @@ func (lim *Limiter) wait(ctx context.Context, n int64, t time.Time, newTimer fun
290264
case <-ctx.Done():
291265
// Context was canceled before we could proceed. Cancel the
292266
// reservation, which may permit other events to proceed sooner.
293-
r.Cancel()
267+
r.Cancel(limit, burst)
294268
return ctx.Err()
295269
}
296270
}
297271

298-
// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
299-
func (lim *Limiter) SetLimit(newLimit Limit) {
300-
lim.SetLimitAt(time.Now(), newLimit)
301-
}
302-
303-
// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
304-
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
305-
// before SetLimitAt was called.
306-
func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
307-
lim.mu.Lock()
308-
defer lim.mu.Unlock()
309-
310-
tokens := lim.advance(t)
311-
312-
lim.last = t.UnixNano()
313-
lim.tokens = tokens
314-
lim.limit = newLimit
315-
}
316-
317-
// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst).
318-
func (lim *Limiter) SetBurst(newBurst int64) {
319-
lim.SetBurstAt(time.Now(), newBurst)
320-
}
321-
322-
// SetBurstAt sets a new burst size for the limiter.
323-
func (lim *Limiter) SetBurstAt(t time.Time, newBurst int64) {
324-
lim.mu.Lock()
325-
defer lim.mu.Unlock()
326-
327-
tokens := lim.advance(t)
328-
329-
lim.last = t.UnixNano()
330-
lim.tokens = tokens
331-
lim.burst = newBurst
332-
}
333-
334272
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
335273
// maxFutureReserve specifies the maximum reservation wait duration allowed.
336274
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
337-
func (lim *Limiter) reserveN(t time.Time, n int64, maxFutureReserve time.Duration) Reservation {
275+
func (lim *Limiter) reserveN(limit Limit, burst int64, t time.Time, n int64, maxFutureReserve time.Duration) Reservation {
338276
lim.mu.Lock()
339277
defer lim.mu.Unlock()
340278

341-
if lim.limit == Inf {
279+
if limit == Inf {
342280
return Reservation{
343281
ok: true,
344282
lim: lim,
@@ -347,25 +285,25 @@ func (lim *Limiter) reserveN(t time.Time, n int64, maxFutureReserve time.Duratio
347285
}
348286
}
349287

350-
tokens := lim.advance(t)
288+
tokens := lim.advance(limit, burst, t)
351289

352290
// Calculate the remaining number of tokens resulting from the request.
353291
tokens -= float64(n)
354292

355293
// Calculate the wait duration
356294
var waitDuration time.Duration
357295
if tokens < 0 {
358-
waitDuration = lim.limit.durationFromTokens(-tokens)
296+
waitDuration = limit.durationFromTokens(-tokens)
359297
}
360298

361299
// Decide result
362-
ok := n <= lim.burst && waitDuration <= maxFutureReserve
300+
ok := n <= burst && waitDuration <= maxFutureReserve
363301

364302
// Prepare reservation
365303
r := Reservation{
366304
ok: ok,
367305
lim: lim,
368-
limit: lim.limit,
306+
limit: limit,
369307
}
370308
if ok {
371309
r.tokens = n
@@ -384,17 +322,17 @@ func (lim *Limiter) reserveN(t time.Time, n int64, maxFutureReserve time.Duratio
384322
// resulting from the passage of time.
385323
// lim is not changed.
386324
// advance requires that lim.mu is held.
387-
func (lim *Limiter) advance(t time.Time) (newTokens float64) {
325+
func (lim *Limiter) advance(limit Limit, burst int64, t time.Time) (newTokens float64) {
388326
last := time.Unix(0, lim.last)
389327
if t.Before(last) {
390328
last = t
391329
}
392330

393331
// Calculate the new number of tokens, due to time that passed.
394332
elapsed := t.Sub(last)
395-
delta := lim.limit.tokensFromDuration(elapsed)
333+
delta := limit.tokensFromDuration(elapsed)
396334
tokens := lim.tokens + delta
397-
if burst := float64(lim.burst); tokens > burst {
335+
if burst := float64(burst); tokens > burst {
398336
tokens = burst
399337
}
400338
return tokens

0 commit comments

Comments
 (0)