Skip to content

Commit e31892d

Browse files
committed
Implement a per-item rate-limiter
ConfigurationPolicy already had something for this, under the "evaluation-backoff" flag. However, this was implemented *inside* the reconcile, and could often be overridden by other logic. The new approach can more easily be implemented on multiple controllers. This also tweaks (and explains) the default RateLimiters that were being used. Refs: - https://issues.redhat.com/browse/ACM-22682 Signed-off-by: Justin Kulikauskas <[email protected]>
1 parent 2ac5d70 commit e31892d

File tree

7 files changed

+85
-28
lines changed

7 files changed

+85
-28
lines changed

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"--log-level=3",
1515
"--v=5",
1616
"--enable-operator-policy=true",
17-
"--evaluation-backoff=1",
17+
"--evaluation-backoff=2",
1818
],
1919
"env": {
2020
"WATCH_NAMESPACE": "managed",
@@ -35,7 +35,7 @@
3535
"--v=5",
3636
"--enable-operator-policy=true",
3737
"--target-kubeconfig-path=${workspaceFolder}/kubeconfig_managed2",
38-
"--evaluation-backoff=1",
38+
"--evaluation-backoff=2",
3939
],
4040
"env": {
4141
"WATCH_NAMESPACE": "managed",

controllers/configurationpolicy_controller.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"encoding/base64"
99
"errors"
1010
"fmt"
11-
"math"
1211
"reflect"
1312
"regexp"
1413
"sort"
@@ -106,6 +105,7 @@ func (r *ConfigurationPolicyReconciler) SetupWithManager(
106105
Named(ControllerName).
107106
WithOptions(controller.Options{
108107
MaxConcurrentReconciles: int(evaluationConcurrency),
108+
RateLimiter: newPolicyRateLimiter(r.EvalBackoffSeconds),
109109
}).
110110
For(&policyv1.ConfigurationPolicy{}, builder.WithPredicates(
111111
predicate.Funcs{
@@ -449,13 +449,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
449449
}
450450
}
451451

452-
lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
453-
if err != nil {
454-
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
455-
456-
return true, 0
457-
}
458-
459452
usesSelector := policy.Spec.NamespaceSelector.LabelSelector != nil ||
460453
len(policy.Spec.NamespaceSelector.Include) != 0
461454

@@ -488,20 +481,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
488481
return false, 0
489482

490483
case errors.Is(getIntervalErr, policyv1.ErrIsWatch):
491-
minNextEval := lastEvaluated.Add(time.Second * time.Duration(r.EvalBackoffSeconds))
492-
durationLeft := minNextEval.Sub(now)
493-
494-
if durationLeft > 0 {
495-
log.V(1).Info(
496-
"The policy evaluation is configured for a watch event but rescheduling the evaluation due to the "+
497-
"configured evaluation backoff",
498-
"evaluationBackoffSeconds", r.EvalBackoffSeconds,
499-
"remainingSeconds", math.Round(durationLeft.Seconds()),
500-
)
501-
502-
return false, durationLeft
503-
}
504-
505484
log.V(1).Info("The policy evaluation is configured for a watch event. Will evaluate now.")
506485

507486
return true, 0
@@ -516,6 +495,16 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
516495
return true, 0
517496
}
518497

498+
// At this point, we have a valid evaluation interval, so we can now determine
499+
// how long we need to wait (if at all).
500+
501+
lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
502+
if err != nil {
503+
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
504+
505+
return true, 0
506+
}
507+
519508
nextEvaluation := lastEvaluated.Add(interval)
520509
durationLeft := nextEvaluation.Sub(now)
521510

controllers/operatorpolicy_controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
ctrl "sigs.k8s.io/controller-runtime"
3939
"sigs.k8s.io/controller-runtime/pkg/builder"
4040
"sigs.k8s.io/controller-runtime/pkg/client"
41+
"sigs.k8s.io/controller-runtime/pkg/controller"
4142
"sigs.k8s.io/controller-runtime/pkg/event"
4243
"sigs.k8s.io/controller-runtime/pkg/handler"
4344
"sigs.k8s.io/controller-runtime/pkg/predicate"
@@ -128,6 +129,9 @@ func (r *OperatorPolicyReconciler) SetupWithManager(
128129
) error {
129130
return ctrl.NewControllerManagedBy(mgr).
130131
Named(OperatorControllerName).
132+
WithOptions(controller.Options{
133+
RateLimiter: newPolicyRateLimiter(2),
134+
}).
131135
For(&policyv1beta1.OperatorPolicy{}, builder.WithPredicates(predicate.Funcs{
132136
// Skip most pure status/metadata updates
133137
UpdateFunc: func(e event.UpdateEvent) bool {

controllers/ratelimit.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) 2025 Red Hat, Inc.
2+
// Copyright Contributors to the Open Cluster Management project
3+
4+
package controllers
5+
6+
import (
7+
"sync"
8+
"time"
9+
10+
"golang.org/x/time/rate"
11+
"k8s.io/client-go/util/workqueue"
12+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
13+
)
14+
15+
func newPolicyRateLimiter(minimumSecondsPerItem uint32) workqueue.TypedRateLimiter[reconcile.Request] {
16+
return workqueue.NewTypedMaxOfRateLimiter(
17+
// Based on the one in client-go's DefaultTypedControllerRateLimiter
18+
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](
19+
500*time.Millisecond, // base delay: 0.5 seconds (5ms in client-go's default)
20+
10*time.Minute, // max delay: 10 minutes (16m40s in client-go's default)
21+
),
22+
// This is an overall (not per-item) limiter with 10 qps, 100 bucket size.
23+
// This is identical to the one in client-go's DefaultTypedControllerRateLimiter
24+
&workqueue.TypedBucketRateLimiter[reconcile.Request]{
25+
Limiter: rate.NewLimiter(rate.Limit(10), 100),
26+
},
27+
// This limits each item individually, so each has a minimum interval between reconciles.
28+
&PerItemRateLimiter[reconcile.Request]{
29+
limiters: map[reconcile.Request]*rate.Limiter{},
30+
rate: rate.Every(time.Second * time.Duration(minimumSecondsPerItem)),
31+
burst: 1,
32+
},
33+
)
34+
}
35+
36+
type PerItemRateLimiter[T comparable] struct {
37+
lock sync.Mutex
38+
limiters map[T]*rate.Limiter
39+
rate rate.Limit
40+
burst int
41+
}
42+
43+
// Forget is a no-op for a PerItemRateLimiter. RateLimiters in client-go only limit retries on
44+
// failures, but this limiter applies to *all* requests.
45+
func (r *PerItemRateLimiter[T]) Forget(item T) {
46+
}
47+
48+
// NumRequeues always returns 0 for a PerItemRateLimiter.
49+
func (r *PerItemRateLimiter[T]) NumRequeues(item T) int {
50+
return 0
51+
}
52+
53+
func (r *PerItemRateLimiter[T]) When(item T) time.Duration {
54+
r.lock.Lock()
55+
defer r.lock.Unlock()
56+
57+
limiter, ok := r.limiters[item]
58+
if !ok {
59+
limiter = rate.NewLimiter(r.rate, r.burst)
60+
r.limiters[item] = limiter
61+
}
62+
63+
return limiter.Reserve().Delay()
64+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/stretchr/testify v1.10.0
2222
go.uber.org/zap v1.27.0
2323
golang.org/x/mod v0.24.0
24+
golang.org/x/time v0.11.0
2425
gopkg.in/yaml.v3 v3.0.1
2526
k8s.io/api v0.31.9
2627
k8s.io/apiextensions-apiserver v0.31.9
@@ -101,7 +102,6 @@ require (
101102
golang.org/x/sys v0.33.0 // indirect
102103
golang.org/x/term v0.32.0 // indirect
103104
golang.org/x/text v0.25.0 // indirect
104-
golang.org/x/time v0.11.0 // indirect
105105
golang.org/x/tools v0.33.0 // indirect
106106
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
107107
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,7 @@ func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
806806
flags.Uint32Var(
807807
&opts.evalBackoffSeconds,
808808
"evaluation-backoff",
809-
10,
809+
5,
810810
"The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently "+
811811
"evaluated policies)",
812812
)

main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func TestRunMain(t *testing.T) {
2121
"--leader-elect=false",
2222
fmt.Sprintf("--target-kubeconfig-path=%s", os.Getenv("TARGET_KUBECONFIG_PATH")),
2323
"--log-level=1",
24-
// Speed up the tests by not throttling the policy evaluations
25-
"--evaluation-backoff=1",
24+
// Speed up the tests by not throttling the policy evaluations very much
25+
"--evaluation-backoff=2",
2626
"--enable-operator-policy=true",
2727
)
2828

0 commit comments

Comments
 (0)