Skip to content

Commit 5097292

Browse files
committed
Rate limit ConfigurationPolicy evaluations
There was already an "evaluation-backoff" flag; however, it was not the first thing considered in Reconcile, so it had limited effectiveness. The controller-runtime rate limiters only throttle reconciles that are requeued due to errors, whereas this new implementation prevents a constantly updating watched resource from monopolizing the config-policy-controller. This pattern could likely be reused in other controllers with little effort. Refs: - https://issues.redhat.com/browse/ACM-22682 Signed-off-by: Justin Kulikauskas <[email protected]>
1 parent 41f8e26 commit 5097292

File tree

9 files changed

+193
-29
lines changed

9 files changed

+193
-29
lines changed

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"--log-level=3",
1515
"--v=5",
1616
"--enable-operator-policy=true",
17-
"--evaluation-backoff=1",
17+
"--evaluation-backoff=2",
1818
],
1919
"env": {
2020
"WATCH_NAMESPACE": "managed",
@@ -35,7 +35,7 @@
3535
"--v=5",
3636
"--enable-operator-policy=true",
3737
"--target-kubeconfig-path=${workspaceFolder}/kubeconfig_managed2",
38-
"--evaluation-backoff=1",
38+
"--evaluation-backoff=2",
3939
],
4040
"env": {
4141
"WATCH_NAMESPACE": "managed",

controllers/configurationpolicy_controller.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"encoding/base64"
99
"errors"
1010
"fmt"
11-
"math"
1211
"reflect"
1312
"regexp"
1413
"sort"
@@ -193,6 +192,7 @@ type ConfigurationPolicyReconciler struct {
193192
// The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently evaluated
194193
// policies)
195194
EvalBackoffSeconds uint32
195+
ItemLimiters *PerItemRateLimiter[reconcile.Request]
196196
// lastEvaluatedCache contains the value of the last known ConfigurationPolicy resourceVersion per UID.
197197
// This is a workaround to account for race conditions where the status is updated but the controller-runtime cache
198198
// has not updated yet.
@@ -212,6 +212,18 @@ type ConfigurationPolicyReconciler struct {
212212
// Reconcile is responsible for evaluating and rescheduling ConfigurationPolicy evaluations.
213213
func (r *ConfigurationPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
214214
log := log.WithValues("name", request.Name, "namespace", request.Namespace)
215+
216+
if r.ItemLimiters != nil {
217+
limiter := r.ItemLimiters.GetLimiter(request)
218+
219+
// Check if a token is available; if so, `Allow` will spend it and return true
220+
if limiter.Tokens() < 1.0 || !limiter.Allow() {
221+
log.V(2).Info("Throttling policy evaluation")
222+
223+
return reconcile.Result{RequeueAfter: time.Second * time.Duration(r.EvalBackoffSeconds)}, nil
224+
}
225+
}
226+
215227
policy := &policyv1.ConfigurationPolicy{}
216228

217229
cleanup, err := r.cleanupImmediately()
@@ -469,13 +481,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
469481
}
470482
}
471483

472-
lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
473-
if err != nil {
474-
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
475-
476-
return true, 0
477-
}
478-
479484
usesSelector := policy.Spec.NamespaceSelector.LabelSelector != nil ||
480485
len(policy.Spec.NamespaceSelector.Include) != 0
481486

@@ -508,20 +513,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
508513
return false, 0
509514

510515
case errors.Is(getIntervalErr, policyv1.ErrIsWatch):
511-
minNextEval := lastEvaluated.Add(time.Second * time.Duration(r.EvalBackoffSeconds))
512-
durationLeft := minNextEval.Sub(now)
513-
514-
if durationLeft > 0 {
515-
log.V(1).Info(
516-
"The policy evaluation is configured for a watch event but rescheduling the evaluation due to the "+
517-
"configured evaluation backoff",
518-
"evaluationBackoffSeconds", r.EvalBackoffSeconds,
519-
"remainingSeconds", math.Round(durationLeft.Seconds()),
520-
)
521-
522-
return false, durationLeft
523-
}
524-
525516
log.V(1).Info("The policy evaluation is configured for a watch event. Will evaluate now.")
526517

527518
return true, 0
@@ -536,6 +527,16 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
536527
return true, 0
537528
}
538529

530+
// At this point, we have a valid evaluation interval, we can now determine
531+
// how long we need to wait (if at all).
532+
533+
lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
534+
if err != nil {
535+
log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
536+
537+
return true, 0
538+
}
539+
539540
nextEvaluation := lastEvaluated.Add(interval)
540541
durationLeft := nextEvaluation.Sub(now)
541542

controllers/ratelimit.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright (c) 2025 Red Hat, Inc.
2+
// Copyright Contributors to the Open Cluster Management project
3+
4+
package controllers
5+
6+
import (
7+
"sync"
8+
"time"
9+
10+
"golang.org/x/time/rate"
11+
)
12+
13+
func NewPerItemRateLimiter[T comparable](backoffSeconds uint32, burst int) *PerItemRateLimiter[T] {
14+
return &PerItemRateLimiter[T]{
15+
lock: sync.RWMutex{},
16+
limiters: map[T]*rate.Limiter{},
17+
rate: rate.Every(time.Second * time.Duration(backoffSeconds)),
18+
burst: burst,
19+
}
20+
}
21+
22+
type PerItemRateLimiter[T comparable] struct {
23+
lock sync.RWMutex
24+
limiters map[T]*rate.Limiter
25+
rate rate.Limit
26+
burst int
27+
}
28+
29+
func (l *PerItemRateLimiter[T]) GetLimiter(item T) *rate.Limiter {
30+
l.lock.RLock()
31+
limiter, exists := l.limiters[item]
32+
l.lock.RUnlock()
33+
34+
if !exists {
35+
l.lock.Lock()
36+
37+
limiter = rate.NewLimiter(l.rate, l.burst)
38+
l.limiters[item] = limiter
39+
40+
l.lock.Unlock()
41+
}
42+
43+
return limiter
44+
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/stretchr/testify v1.10.0
2222
go.uber.org/zap v1.27.0
2323
golang.org/x/mod v0.24.0
24+
golang.org/x/time v0.11.0
2425
gopkg.in/yaml.v3 v3.0.1
2526
k8s.io/api v0.31.9
2627
k8s.io/apiextensions-apiserver v0.31.9
@@ -101,7 +102,6 @@ require (
101102
golang.org/x/sys v0.33.0 // indirect
102103
golang.org/x/term v0.32.0 // indirect
103104
golang.org/x/text v0.25.0 // indirect
104-
golang.org/x/time v0.11.0 // indirect
105105
golang.org/x/tools v0.33.0 // indirect
106106
gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
107107
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect

main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,7 @@ func main() {
515515
EnableMetrics: opts.enableMetrics,
516516
UninstallMode: beingUninstalled,
517517
EvalBackoffSeconds: opts.evalBackoffSeconds,
518+
ItemLimiters: controllers.NewPerItemRateLimiter[reconcile.Request](opts.evalBackoffSeconds, 1),
518519
HubDynamicWatcher: configPolHubDynamicWatcher,
519520
HubClient: hubClient,
520521
ClusterName: opts.clusterName,
@@ -795,8 +796,8 @@ func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
795796
flags.Uint32Var(
796797
&opts.evalBackoffSeconds,
797798
"evaluation-backoff",
798-
10,
799-
"The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently "+
799+
5,
800+
"The number of seconds before a policy is eligible for reevaluation (throttles frequently "+
800801
"evaluated policies)",
801802
)
802803

main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func TestRunMain(t *testing.T) {
2121
"--leader-elect=false",
2222
fmt.Sprintf("--target-kubeconfig-path=%s", os.Getenv("TARGET_KUBECONFIG_PATH")),
2323
"--log-level=1",
24-
// Speed up the tests by not throttling the policy evaluations
25-
"--evaluation-backoff=1",
24+
// Speed up the tests by not throttling the policy evaluations very much
25+
"--evaluation-backoff=2",
2626
"--enable-operator-policy=true",
2727
)
2828

test/e2e/case46_ratelimit_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
// Copyright (c) 2025 Red Hat, Inc.
2+
// Copyright Contributors to the Open Cluster Management project
3+
4+
package e2e
5+
6+
import (
7+
"fmt"
8+
"strconv"
9+
"time"
10+
11+
. "github.com/onsi/ginkgo/v2"
12+
. "github.com/onsi/gomega"
13+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
14+
15+
"open-cluster-management.io/config-policy-controller/test/utils"
16+
)
17+
18+
var _ = Describe("Test config policy ratelimiting", Ordered, func() {
19+
const (
20+
policyName = "case46-cfgpolicy"
21+
policyYaml = "../resources/case46_ratelimit/" + policyName + ".yaml"
22+
configMapName = "case46-configmap-to-watch"
23+
configMapYaml = "../resources/case46_ratelimit/" + configMapName + ".yaml"
24+
managedCMName = "case46-configmap-from-policy"
25+
)
26+
27+
metricCheck := func(metricName string, label string, value string) (float64, error) {
28+
metric := utils.GetMetrics(
29+
metricName, fmt.Sprintf(`%s=\"%s\"`, label, value))
30+
if len(metric) == 0 {
31+
return 0, fmt.Errorf("failed to retrieve any %s metric", metricName)
32+
}
33+
metricVal, err := strconv.ParseFloat(metric[0], 64)
34+
if err != nil {
35+
return 0, fmt.Errorf("error converting metric: %w", err)
36+
}
37+
38+
return metricVal, nil
39+
}
40+
41+
BeforeAll(func() {
42+
By("Creating " + policyYaml)
43+
utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace)
44+
By("Creating " + configMapYaml)
45+
utils.Kubectl("apply", "-f", configMapYaml) // The YAML specifies namespace "default"
46+
})
47+
48+
It("should initially have a small number of evaluations", func() {
49+
Eventually(
50+
metricCheck, 10, 2,
51+
).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 4))
52+
53+
Consistently(
54+
metricCheck, 10, 2,
55+
).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 4))
56+
})
57+
58+
value := 0
59+
60+
It("should limit the number of evaluations when a watched object changes frequently", func() {
61+
start := time.Now()
62+
63+
By("Updating the watched configmap frequently for 10 seconds")
64+
for start.Add(10 * time.Second).After(time.Now()) {
65+
value++
66+
utils.Kubectl("patch", "configmap", configMapName, "--type=json", "-p",
67+
`[{"op": "replace", "path": "/data/foo", "value": "`+strconv.Itoa(value)+`"}]`)
68+
time.Sleep(150 * time.Millisecond)
69+
}
70+
71+
Consistently(
72+
metricCheck, 10, 2,
73+
).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 12))
74+
})
75+
76+
It("should have updated the object to the final value", func() {
77+
By("Verifying the configmap has bar=" + strconv.Itoa(value))
78+
Eventually(func(g Gomega) {
79+
cm := utils.GetWithTimeout(clientManagedDynamic, gvrConfigMap,
80+
managedCMName, "default", true, defaultTimeoutSeconds)
81+
g.Expect(cm).NotTo(BeNil())
82+
83+
val, found, err := unstructured.NestedString(cm.Object, "data", "bar")
84+
g.Expect(err).NotTo(HaveOccurred())
85+
g.Expect(found).To(BeTrue())
86+
g.Expect(val).Should(Equal(strconv.Itoa(value)))
87+
}, defaultTimeoutSeconds, 1).Should(Succeed())
88+
})
89+
90+
AfterAll(func() {
91+
utils.KubectlDelete("-n", testNamespace, "-f", policyYaml)
92+
utils.KubectlDelete("-f", configMapYaml)
93+
utils.KubectlDelete("configmap", "-n", "default", "case46-configmap-from-policy")
94+
})
95+
})
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
apiVersion: policy.open-cluster-management.io/v1
2+
kind: ConfigurationPolicy
3+
metadata:
4+
name: case46-cfgpolicy
5+
spec:
6+
remediationAction: enforce
7+
object-templates:
8+
- complianceType: musthave
9+
objectDefinition:
10+
apiVersion: v1
11+
kind: ConfigMap
12+
metadata:
13+
name: case46-configmap-from-policy
14+
namespace: default
15+
data:
16+
bar: '{{ fromConfigMap "default" "case46-configmap-to-watch" "foo" }}'
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
apiVersion: v1
2+
kind: ConfigMap
3+
metadata:
4+
name: case46-configmap-to-watch
5+
namespace: default
6+
data:
7+
foo: "0"

0 commit comments

Comments
 (0)