Skip to content

Commit 714c151

Browse files
Explicitly set a exponential backoff rate limiter for controller config (#416)
The rate limiter base delay and max delay values can be configured using flags. The default values are 500ms for base delay and 15min for max delay.
1 parent 03c61d3 commit 714c151

File tree

4 files changed

+204
-3
lines changed

4 files changed

+204
-3
lines changed

controllers/options.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package controllers
22

3-
import "errors"
3+
import (
4+
"errors"
5+
6+
"k8s.io/client-go/util/workqueue"
7+
)
48

59
// ControllerConfig is the configuration for cluster and machine controllers
610
type ControllerConfig struct {
711
MaxConcurrentReconciles int
12+
RateLimiter workqueue.RateLimiter
813
}
914

1015
// ControllerConfigOpts is a function that can be used to configure the controller config
@@ -20,3 +25,14 @@ func WithMaxConcurrentReconciles(max int) ControllerConfigOpts {
2025
return nil
2126
}
2227
}
28+
29+
// WithRateLimiter sets the rate limiter for the controller
30+
func WithRateLimiter(rateLimiter workqueue.RateLimiter) ControllerConfigOpts {
31+
return func(c *ControllerConfig) error {
32+
if rateLimiter == nil {
33+
return errors.New("rate limiter cannot be nil")
34+
}
35+
c.RateLimiter = rateLimiter
36+
return nil
37+
}
38+
}

controllers/options_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7+
"k8s.io/client-go/util/workqueue"
78
)
89

910
func TestWithMaxConcurrentReconciles(t *testing.T) {
@@ -37,3 +38,38 @@ func TestWithMaxConcurrentReconciles(t *testing.T) {
3738
})
3839
}
3940
}
41+
42+
func TestWithRateLimiter(t *testing.T) {
43+
tests := []struct {
44+
name string
45+
rateLimiter workqueue.RateLimiter
46+
expectError bool
47+
expectedType interface{}
48+
}{
49+
{
50+
name: "TestWithRateLimiterNil",
51+
rateLimiter: nil,
52+
expectError: true,
53+
expectedType: nil,
54+
},
55+
{
56+
name: "TestWithRateLimiterSet",
57+
rateLimiter: workqueue.DefaultControllerRateLimiter(),
58+
expectError: false,
59+
expectedType: &workqueue.MaxOfRateLimiter{},
60+
},
61+
}
62+
for _, tt := range tests {
63+
t.Run(tt.name, func(t *testing.T) {
64+
opt := WithRateLimiter(tt.rateLimiter)
65+
config := &ControllerConfig{}
66+
err := opt(config)
67+
if tt.expectError {
68+
assert.Error(t, err)
69+
} else {
70+
assert.NoError(t, err)
71+
assert.IsType(t, tt.expectedType, config.RateLimiter)
72+
}
73+
})
74+
}
75+
}

main.go

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"context"
21+
"errors"
2122
"flag"
2223
"fmt"
2324
"os"
@@ -26,6 +27,7 @@ import (
2627
"github.com/go-logr/logr"
2728
"github.com/spf13/pflag"
2829
"go.uber.org/zap/zapcore"
30+
"golang.org/x/time/rate"
2931
"k8s.io/apimachinery/pkg/runtime"
3032
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3133
"k8s.io/client-go/informers"
@@ -35,6 +37,7 @@ import (
3537
_ "k8s.io/client-go/plugin/pkg/client/auth"
3638
"k8s.io/client-go/rest"
3739
"k8s.io/client-go/tools/cache"
40+
"k8s.io/client-go/util/workqueue"
3841
capiv1 "sigs.k8s.io/cluster-api/api/v1beta1"
3942
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1beta1"
4043
capiflags "sigs.k8s.io/cluster-api/util/flags"
@@ -75,8 +78,54 @@ type managerConfig struct {
7578
concurrentReconcilesNutanixMachine int
7679
diagnosticsOptions capiflags.DiagnosticsOptions
7780

78-
logger logr.Logger
79-
restConfig *rest.Config
81+
logger logr.Logger
82+
restConfig *rest.Config
83+
rateLimiter workqueue.RateLimiter
84+
}
85+
86+
// compositeRateLimiter will build a limiter similar to the default from DefaultControllerRateLimiter but with custom values.
87+
func compositeRateLimiter(baseDelay, maxDelay time.Duration, bucketSize, qps int) (workqueue.RateLimiter, error) {
88+
// Validate the rate limiter configuration
89+
if err := validateRateLimiterConfig(baseDelay, maxDelay, bucketSize, qps); err != nil {
90+
return nil, err
91+
}
92+
exponentialBackoffLimiter := workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay)
93+
bucketLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)}
94+
return workqueue.NewMaxOfRateLimiter(exponentialBackoffLimiter, bucketLimiter), nil
95+
}
96+
97+
// validateRateLimiterConfig validates the rate limiter configuration parameters
98+
func validateRateLimiterConfig(baseDelay, maxDelay time.Duration, bucketSize, qps int) error {
99+
// Check if baseDelay is a non-negative value
100+
if baseDelay < 0 {
101+
return errors.New("baseDelay cannot be negative")
102+
}
103+
104+
// Check if maxDelay is non-negative and greater than or equal to baseDelay
105+
if maxDelay < 0 {
106+
return errors.New("maxDelay cannot be negative")
107+
}
108+
109+
if maxDelay < baseDelay {
110+
return errors.New("maxDelay should be greater than or equal to baseDelay")
111+
}
112+
113+
// Check if bucketSize is a positive number
114+
if bucketSize <= 0 {
115+
return errors.New("bucketSize must be positive")
116+
}
117+
118+
// Check if qps is a positive number
119+
if qps <= 0 {
120+
return errors.New("minimum QPS must be positive")
121+
}
122+
123+
// Check if bucketSize is at least as large as the QPS
124+
if bucketSize < qps {
125+
return errors.New("bucketSize must be at least as large as the QPS to handle bursts effectively")
126+
}
127+
128+
return nil
80129
}
81130

82131
func parseFlags(config *managerConfig) {
@@ -88,6 +137,13 @@ func parseFlags(config *managerConfig) {
88137
pflag.IntVar(&maxConcurrentReconciles, "max-concurrent-reconciles", defaultMaxConcurrentReconciles,
89138
"The maximum number of allowed, concurrent reconciles.")
90139

140+
var baseDelay, maxDelay time.Duration
141+
var bucketSize, qps int
142+
pflag.DurationVar(&baseDelay, "rate-limiter-base-delay", 500*time.Millisecond, "The base delay for the rate limiter.")
143+
pflag.DurationVar(&maxDelay, "rate-limiter-max-delay", 15*time.Minute, "The maximum delay for the rate limiter.")
144+
pflag.IntVar(&bucketSize, "rate-limiter-bucket-size", 100, "The bucket size for the rate limiter.")
145+
pflag.IntVar(&qps, "rate-limiter-qps", 10, "The QPS for the rate limiter.")
146+
91147
opts := zap.Options{
92148
TimeEncoder: zapcore.RFC3339TimeEncoder,
93149
}
@@ -100,6 +156,14 @@ func parseFlags(config *managerConfig) {
100156

101157
config.concurrentReconcilesNutanixCluster = maxConcurrentReconciles
102158
config.concurrentReconcilesNutanixMachine = maxConcurrentReconciles
159+
160+
rateLimiter, err := compositeRateLimiter(baseDelay, maxDelay, bucketSize, qps)
161+
if err != nil {
162+
config.logger.Error(err, "unable to create composite rate limiter")
163+
os.Exit(1)
164+
}
165+
166+
config.rateLimiter = rateLimiter
103167
}
104168

105169
func setupLogger() logr.Logger {
@@ -189,6 +253,7 @@ func runManager(ctx context.Context, mgr manager.Manager, config *managerConfig)
189253

190254
clusterControllerOpts := []controllers.ControllerConfigOpts{
191255
controllers.WithMaxConcurrentReconciles(config.concurrentReconcilesNutanixCluster),
256+
controllers.WithRateLimiter(config.rateLimiter),
192257
}
193258

194259
if err := setupNutanixClusterController(ctx, mgr, secretInformer, configMapInformer, clusterControllerOpts...); err != nil {
@@ -197,6 +262,7 @@ func runManager(ctx context.Context, mgr manager.Manager, config *managerConfig)
197262

198263
machineControllerOpts := []controllers.ControllerConfigOpts{
199264
controllers.WithMaxConcurrentReconciles(config.concurrentReconcilesNutanixMachine),
265+
controllers.WithRateLimiter(config.rateLimiter),
200266
}
201267

202268
if err := setupNutanixMachineController(ctx, mgr, secretInformer, configMapInformer, machineControllerOpts...); err != nil {

main_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path/filepath"
88
"testing"
9+
"time"
910

1011
"github.com/golang/mock/gomock"
1112
"github.com/stretchr/testify/assert"
@@ -119,12 +120,15 @@ func testRunManagerCommon(t *testing.T, ctrl *gomock.Controller) (*mockctlclient
119120

120121
require.NoError(t, err)
121122

123+
rateLimiter, err := compositeRateLimiter(500*time.Millisecond, 15*time.Minute, 100, 10)
124+
require.NoError(t, err)
122125
config := &managerConfig{
123126
enableLeaderElection: false,
124127
logger: setupLogger(),
125128
concurrentReconcilesNutanixCluster: 1,
126129
concurrentReconcilesNutanixMachine: 1,
127130
restConfig: cfg,
131+
rateLimiter: rateLimiter,
128132
}
129133

130134
client := mockctlclient.NewMockClient(ctrl)
@@ -246,3 +250,82 @@ func TestSetupControllersFailedAddToManager(t *testing.T) {
246250
err = setupNutanixMachineController(context.Background(), mgr, secretInformer, configMapInformer, copts...)
247251
assert.Error(t, err)
248252
}
253+
254+
func TestRateLimiter(t *testing.T) {
255+
tests := []struct {
256+
name string
257+
baseDelay time.Duration
258+
maxDelay time.Duration
259+
maxBurst int
260+
qps int
261+
expectedErr string
262+
}{
263+
{
264+
name: "valid rate limiter",
265+
baseDelay: 500 * time.Millisecond,
266+
maxDelay: 15 * time.Minute,
267+
maxBurst: 100,
268+
qps: 10,
269+
},
270+
{
271+
name: "negative base delay",
272+
baseDelay: -500 * time.Millisecond,
273+
maxDelay: 15 * time.Minute,
274+
maxBurst: 100,
275+
qps: 10,
276+
expectedErr: "baseDelay cannot be negative",
277+
},
278+
{
279+
name: "negative max delay",
280+
baseDelay: 500 * time.Millisecond,
281+
maxDelay: -15 * time.Minute,
282+
maxBurst: 100,
283+
qps: 10,
284+
expectedErr: "maxDelay cannot be negative",
285+
},
286+
{
287+
name: "maxDelay should be greater than or equal to baseDelay",
288+
baseDelay: 500 * time.Millisecond,
289+
maxDelay: 400 * time.Millisecond,
290+
maxBurst: 100,
291+
qps: 10,
292+
expectedErr: "maxDelay should be greater than or equal to baseDelay",
293+
},
294+
{
295+
name: "bucketSize must be positive",
296+
baseDelay: 500 * time.Millisecond,
297+
maxDelay: 15 * time.Minute,
298+
maxBurst: 0,
299+
qps: 10,
300+
expectedErr: "bucketSize must be positive",
301+
},
302+
{
303+
name: "qps must be positive",
304+
baseDelay: 500 * time.Millisecond,
305+
maxDelay: 15 * time.Minute,
306+
maxBurst: 100,
307+
qps: 0,
308+
expectedErr: "minimum QPS must be positive",
309+
},
310+
{
311+
name: "bucketSize must be greater than or equal to qps",
312+
baseDelay: 500 * time.Millisecond,
313+
maxDelay: 15 * time.Minute,
314+
maxBurst: 10,
315+
qps: 100,
316+
expectedErr: "bucketSize must be at least as large as the QPS to handle bursts effectively",
317+
},
318+
}
319+
320+
for _, tt := range tests {
321+
t.Run(tt.name, func(t *testing.T) {
322+
_, err := compositeRateLimiter(tt.baseDelay, tt.maxDelay, tt.maxBurst, tt.qps)
323+
if tt.expectedErr != "" {
324+
assert.Error(t, err)
325+
assert.Contains(t, err.Error(), tt.expectedErr)
326+
} else {
327+
assert.NoError(t, err)
328+
}
329+
})
330+
}
331+
}

0 commit comments

Comments
 (0)