Skip to content

Commit 6846a0a

Browse files
committed
Implement ItemBucketRateLimiter
1 parent 208df38 commit 6846a0a

File tree

3 files changed

+78
-0
lines changed

3 files changed

+78
-0
lines changed

staging/src/k8s.io/client-go/util/workqueue/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ go_test(
2020
deps = [
2121
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
2222
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
23+
"//vendor/golang.org/x/time/rate:go_default_library",
2324
],
2425
)
2526

staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,54 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
6262
func (r *BucketRateLimiter) Forget(item interface{}) {
6363
}
6464

65+
// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
66+
// Each key is using a separate limiter.
67+
type ItemBucketRateLimiter struct {
68+
r rate.Limit
69+
burst int
70+
71+
limitersLock sync.Mutex
72+
limiters map[interface{}]*rate.Limiter
73+
}
74+
75+
var _ RateLimiter = &ItemBucketRateLimiter{}
76+
77+
// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
78+
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
79+
return &ItemBucketRateLimiter{
80+
r: r,
81+
burst: burst,
82+
limiters: make(map[interface{}]*rate.Limiter),
83+
}
84+
}
85+
86+
// When returns a time.Duration which we need to wait before item is processed.
87+
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
88+
r.limitersLock.Lock()
89+
defer r.limitersLock.Unlock()
90+
91+
limiter, ok := r.limiters[item]
92+
if !ok {
93+
limiter = rate.NewLimiter(r.r, r.burst)
94+
r.limiters[item] = limiter
95+
}
96+
97+
return limiter.Reserve().Delay()
98+
}
99+
100+
// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
101+
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
102+
return 0
103+
}
104+
105+
// Forget removes item from the internal state.
106+
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
107+
r.limitersLock.Lock()
108+
defer r.limitersLock.Unlock()
109+
110+
delete(r.limiters, item)
111+
}
112+
65113
// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
66114
// dealing with max failures and expiration are up to the caller
67115
type ItemExponentialFailureRateLimiter struct {

staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package workqueue
1919
import (
2020
"testing"
2121
"time"
22+
23+
"golang.org/x/time/rate"
2224
)
2325

2426
func TestItemExponentialFailureRateLimiter(t *testing.T) {
@@ -96,6 +98,33 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {
9698

9799
}
98100

101+
func TestItemBucketRateLimiter(t *testing.T) {
102+
limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1)
103+
104+
// Use initial burst.
105+
if got := limiter.When("one"); got != 0 {
106+
t.Errorf("limiter.When(two) = %v; want 0", got)
107+
}
108+
for i := 0; i < 1000; i++ {
109+
limiter.When("one")
110+
}
111+
// limiter.When should be at this point = 1000 * rate.Limit.
112+
// We set the threshold 1s below this value to avoid race conditions.
113+
if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want {
114+
t.Errorf("limiter.When(one) = %v; want at least %v", got, want)
115+
}
116+
117+
if got := limiter.When("two"); got != 0 {
118+
t.Errorf("limiter.When(two) = %v; want 0", got)
119+
}
120+
121+
limiter.Forget("one")
122+
// Use new budget.
123+
if got := limiter.When("one"); got != 0 {
124+
t.Errorf("limiter.When(two) = %v; want 0", got)
125+
}
126+
}
127+
99128
func TestItemFastSlowRateLimiter(t *testing.T) {
100129
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
101130

0 commit comments

Comments
 (0)