From ce3cce58fb75a401d6e94deef2a3b8dac7a7e905 Mon Sep 17 00:00:00 2001
From: Justin Kulikauskas
Date: Mon, 27 Oct 2025 13:26:16 -0400
Subject: [PATCH] Rate limit ConfigurationPolicy evaluations

There was already an "evaluation-backoff" flag; however, it was not the
first thing considered in Reconcile, so it had limited effectiveness.
The controller-runtime rate limiters only throttle reconciles that are
requeued due to errors: this new implementation properly prevents a
constantly updating watched resource from monopolizing the
config-policy-controller.

This pattern could easily be reused in other controllers.

Refs:
 - https://issues.redhat.com/browse/ACM-22682

Signed-off-by: Justin Kulikauskas
---
 .vscode/launch.json                           |  4 +-
 controllers/configurationpolicy_controller.go | 45 ++++-----
 controllers/ratelimit.go                      | 47 +++++++++
 go.mod                                        |  2 +-
 main.go                                       |  5 +-
 main_test.go                                  |  4 +-
 test/e2e/case47_ratelimit_test.go             | 95 +++++++++++++++++++
 .../case47_ratelimit/case47-cfgpolicy.yaml    | 16 ++++
 .../case47-configmap-to-watch.yaml            |  7 ++
 9 files changed, 196 insertions(+), 29 deletions(-)
 create mode 100644 controllers/ratelimit.go
 create mode 100644 test/e2e/case47_ratelimit_test.go
 create mode 100644 test/resources/case47_ratelimit/case47-cfgpolicy.yaml
 create mode 100644 test/resources/case47_ratelimit/case47-configmap-to-watch.yaml

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 24d70b23..bbe62669 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -14,7 +14,7 @@
                 "--log-level=3",
                 "--v=5",
                 "--enable-operator-policy=true",
-                "--evaluation-backoff=1",
+                "--evaluation-backoff=2",
             ],
             "env": {
                 "WATCH_NAMESPACE": "managed",
@@ -35,7 +35,7 @@
                 "--v=5",
                 "--enable-operator-policy=true",
                 "--target-kubeconfig-path=${workspaceFolder}/kubeconfig_managed2",
-                "--evaluation-backoff=1",
+                "--evaluation-backoff=2",
             ],
             "env": {
                 "WATCH_NAMESPACE": "managed",
diff --git a/controllers/configurationpolicy_controller.go b/controllers/configurationpolicy_controller.go
index fb7c44f4..c8a286bd 100644
--- a/controllers/configurationpolicy_controller.go
+++ b/controllers/configurationpolicy_controller.go
@@ -8,7 +8,6 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
-	"math"
 	"reflect"
 	"regexp"
 	"sort"
@@ -193,6 +192,7 @@ type ConfigurationPolicyReconciler struct {
 	// The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently evaluated
 	// policies)
 	EvalBackoffSeconds uint32
+	ItemLimiters       *PerItemRateLimiter[reconcile.Request]
 	// lastEvaluatedCache contains the value of the last known ConfigurationPolicy resourceVersion per UID.
 	// This is a workaround to account for race conditions where the status is updated but the controller-runtime cache
 	// has not updated yet.
@@ -212,6 +212,18 @@ type ConfigurationPolicyReconciler struct {
 // Reconcile is responsible for evaluating and rescheduling ConfigurationPolicy evaluations.
 func (r *ConfigurationPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
 	log := log.WithValues("name", request.Name, "namespace", request.Namespace)
+
+	if r.ItemLimiters != nil {
+		limiter := r.ItemLimiters.GetLimiter(request)
+
+		// Check if a token is available; if so, `Allow` will spend it and return true
+		if limiter.Tokens() < 1.0 || !limiter.Allow() {
+			log.V(2).Info("Throttling policy evaluation")
+
+			return reconcile.Result{RequeueAfter: time.Second * time.Duration(r.EvalBackoffSeconds)}, nil
+		}
+	}
+
 	policy := &policyv1.ConfigurationPolicy{}
 
 	cleanup, err := r.cleanupImmediately()
@@ -469,13 +481,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
 		}
 	}
 
-	lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
-	if err != nil {
-		log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
-
-		return true, 0
-	}
-
 	usesSelector := policy.Spec.NamespaceSelector.LabelSelector != nil ||
 		len(policy.Spec.NamespaceSelector.Include) != 0
 
@@ -508,20 +513,6 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
 		return false, 0
 
 	case errors.Is(getIntervalErr, policyv1.ErrIsWatch):
-		minNextEval := lastEvaluated.Add(time.Second * time.Duration(r.EvalBackoffSeconds))
-		durationLeft := minNextEval.Sub(now)
-
-		if durationLeft > 0 {
-			log.V(1).Info(
-				"The policy evaluation is configured for a watch event but rescheduling the evaluation due to the "+
-					"configured evaluation backoff",
-				"evaluationBackoffSeconds", r.EvalBackoffSeconds,
-				"remainingSeconds", math.Round(durationLeft.Seconds()),
-			)
-
-			return false, durationLeft
-		}
-
 		log.V(1).Info("The policy evaluation is configured for a watch event. Will evaluate now.")
 
 		return true, 0
@@ -536,6 +527,16 @@ func (r *ConfigurationPolicyReconciler) shouldEvaluatePolicy(
 		return true, 0
 	}
 
+	// At this point, we have a valid evaluation interval, so we can determine
+	// how long we need to wait (if at all).
+
+	lastEvaluated, err := time.Parse(time.RFC3339, policy.Status.LastEvaluated)
+	if err != nil {
+		log.Error(err, "The policy has an invalid status.lastEvaluated value. Will evaluate it now.")
+
+		return true, 0
+	}
+
 	nextEvaluation := lastEvaluated.Add(interval)
 	durationLeft := nextEvaluation.Sub(now)
 
diff --git a/controllers/ratelimit.go b/controllers/ratelimit.go
new file mode 100644
index 00000000..18a500fb
--- /dev/null
+++ b/controllers/ratelimit.go
@@ -0,0 +1,47 @@
+// Copyright (c) 2025 Red Hat, Inc.
+// Copyright Contributors to the Open Cluster Management project
+
+package controllers
+
+import (
+	"sync"
+	"time"
+
+	"golang.org/x/time/rate"
+)
+
+func NewPerItemRateLimiter[T comparable](backoffSeconds uint32, burst int) *PerItemRateLimiter[T] {
+	return &PerItemRateLimiter[T]{
+		lock:     sync.RWMutex{},
+		limiters: map[T]*rate.Limiter{},
+		rate:     rate.Every(time.Second * time.Duration(backoffSeconds)),
+		burst:    burst,
+	}
+}
+
+type PerItemRateLimiter[T comparable] struct {
+	lock     sync.RWMutex
+	limiters map[T]*rate.Limiter
+	rate     rate.Limit
+	burst    int
+}
+
+func (l *PerItemRateLimiter[T]) GetLimiter(item T) *rate.Limiter {
+	l.lock.RLock()
+	limiter, exists := l.limiters[item]
+	l.lock.RUnlock()
+
+	if !exists {
+		l.lock.Lock()
+		defer l.lock.Unlock()
+
+		// Re-check after taking the write lock: another goroutine may have
+		// created this limiter in the meantime; do not overwrite it.
+		if limiter, exists = l.limiters[item]; !exists {
+			limiter = rate.NewLimiter(l.rate, l.burst)
+			l.limiters[item] = limiter
+		}
+	}
+
+	return limiter
+}
diff --git a/go.mod b/go.mod
index 4b9b5b00..9aa52c58 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/stretchr/testify v1.11.1
 	go.uber.org/zap v1.27.0
 	golang.org/x/mod v0.29.0
+	golang.org/x/time v0.14.0
 	gopkg.in/yaml.v3 v3.0.1
 	k8s.io/api v0.33.5
 	k8s.io/apiextensions-apiserver v0.33.5
@@ -110,7 +111,6 @@ require (
 	golang.org/x/sys v0.37.0 // indirect
 	golang.org/x/term v0.35.0 // indirect
 	golang.org/x/text v0.29.0 // indirect
-	golang.org/x/time v0.14.0 // indirect
 	golang.org/x/tools v0.37.0 // indirect
 	gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20250929231259-57b25ae835d4 // indirect
diff --git a/main.go b/main.go
index 383331ab..d28f7032 100644
--- a/main.go
+++ b/main.go
@@ -515,6 +515,7 @@ func main() {
 		EnableMetrics:      opts.enableMetrics,
 		UninstallMode:      beingUninstalled,
 		EvalBackoffSeconds: opts.evalBackoffSeconds,
+		ItemLimiters:       controllers.NewPerItemRateLimiter[reconcile.Request](opts.evalBackoffSeconds, 1),
 		HubDynamicWatcher:  configPolHubDynamicWatcher,
 		HubClient:          hubClient,
 		ClusterName:        opts.clusterName,
@@ -795,8 +796,8 @@ func parseOpts(flags *pflag.FlagSet, args []string) *ctrlOpts {
 	flags.Uint32Var(
 		&opts.evalBackoffSeconds,
 		"evaluation-backoff",
-		10,
-		"The number of seconds before a policy is eligible for reevaluation in watch mode (throttles frequently "+
+		5,
+		"The number of seconds before a policy is eligible for reevaluation (throttles frequently "+
 			"evaluated policies)",
 	)
 
diff --git a/main_test.go b/main_test.go
index 74b48474..7dcf311b 100644
--- a/main_test.go
+++ b/main_test.go
@@ -21,8 +21,8 @@ func TestRunMain(t *testing.T) {
 		"--leader-elect=false",
 		fmt.Sprintf("--target-kubeconfig-path=%s", os.Getenv("TARGET_KUBECONFIG_PATH")),
 		"--log-level=1",
-		// Speed up the tests by not throttling the policy evaluations
-		"--evaluation-backoff=1",
+		// Speed up the tests by only lightly throttling the policy evaluations
+		"--evaluation-backoff=2",
 		"--enable-operator-policy=true",
 	)
 
diff --git a/test/e2e/case47_ratelimit_test.go b/test/e2e/case47_ratelimit_test.go
new file mode 100644
index 00000000..03a999dc
--- /dev/null
+++ b/test/e2e/case47_ratelimit_test.go
@@ -0,0 +1,95 @@
+// Copyright (c) 2025 Red Hat, Inc.
+// Copyright Contributors to the Open Cluster Management project
+
+package e2e
+
+import (
+	"fmt"
+	"strconv"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. 
"github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "open-cluster-management.io/config-policy-controller/test/utils" +) + +var _ = Describe("Test config policy ratelimiting", Ordered, func() { + const ( + policyName = "case47-cfgpolicy" + policyYaml = "../resources/case47_ratelimit/" + policyName + ".yaml" + configMapName = "case47-configmap-to-watch" + configMapYaml = "../resources/case47_ratelimit/" + configMapName + ".yaml" + managedCMName = "case47-configmap-from-policy" + ) + + metricCheck := func(metricName string, label string, value string) (float64, error) { + metric := utils.GetMetrics( + metricName, fmt.Sprintf(`%s=\"%s\"`, label, value)) + if len(metric) == 0 { + return 0, fmt.Errorf("failed to retrieve any %s metric", metricName) + } + metricVal, err := strconv.ParseFloat(metric[0], 64) + if err != nil { + return 0, fmt.Errorf("error converting metric: %w", err) + } + + return metricVal, nil + } + + BeforeAll(func() { + By("Creating " + policyYaml) + utils.Kubectl("apply", "-f", policyYaml, "-n", testNamespace) + By("Creating " + configMapYaml) + utils.Kubectl("apply", "-f", configMapYaml) // The YAML specifies namespace "default" + }) + + It("should initially have a small number of evaluations", func() { + Eventually( + metricCheck, 10, 2, + ).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 4)) + + Consistently( + metricCheck, 10, 2, + ).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 4)) + }) + + value := 0 + + It("should limit the number of evaluations when a watched object changes frequently", func() { + start := time.Now() + + By("Updating the watched configmap frequently for 10 seconds") + for start.Add(10 * time.Second).After(time.Now()) { + value++ + utils.Kubectl("patch", "configmap", configMapName, "--type=json", "-p", + `[{"op": "replace", "path": "/data/foo", "value": "`+strconv.Itoa(value)+`"}]`) + time.Sleep(150 * time.Millisecond) + } + + Consistently( + metricCheck, 10, 2, + ).WithArguments("config_policy_evaluation_total", "name", policyName).Should(BeNumerically("<", 12)) + }) + + It("should have updated the object to the final value", func() { + By("Verifying the configmap has bar=" + strconv.Itoa(value)) + Eventually(func(g Gomega) { + cm := utils.GetWithTimeout(clientManagedDynamic, gvrConfigMap, + managedCMName, "default", true, defaultTimeoutSeconds) + g.Expect(cm).NotTo(BeNil()) + + val, found, err := unstructured.NestedString(cm.Object, "data", "bar") + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(found).To(BeTrue()) + g.Expect(val).Should(Equal(strconv.Itoa(value))) + }, defaultTimeoutSeconds, 1).Should(Succeed()) + }) + + AfterAll(func() { + utils.KubectlDelete("-n", testNamespace, "-f", policyYaml) + utils.KubectlDelete("-f", configMapYaml) + utils.KubectlDelete("configmap", "-n", "default", "case47-configmap-from-policy") + }) +}) diff --git a/test/resources/case47_ratelimit/case47-cfgpolicy.yaml b/test/resources/case47_ratelimit/case47-cfgpolicy.yaml new file mode 100644 index 00000000..6725bab3 --- /dev/null +++ b/test/resources/case47_ratelimit/case47-cfgpolicy.yaml @@ -0,0 +1,16 @@ +apiVersion: policy.open-cluster-management.io/v1 +kind: ConfigurationPolicy +metadata: + name: case47-cfgpolicy +spec: + remediationAction: enforce + object-templates: + - complianceType: musthave + objectDefinition: + apiVersion: v1 + kind: ConfigMap + metadata: + name: case47-configmap-from-policy + namespace: default + data: + bar: '{{ 
fromConfigMap "default" "case47-configmap-to-watch" "foo" }}'
diff --git a/test/resources/case47_ratelimit/case47-configmap-to-watch.yaml b/test/resources/case47_ratelimit/case47-configmap-to-watch.yaml
new file mode 100644
index 00000000..dc2e6946
--- /dev/null
+++ b/test/resources/case47_ratelimit/case47-configmap-to-watch.yaml
@@ -0,0 +1,7 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: case47-configmap-to-watch
+  namespace: default
+data:
+  foo: "0"
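
Note: the throttle check at the top of Reconcile relies on the token-bucket
semantics of golang.org/x/time/rate. As a small illustration (this snippet is
not part of the patch; it assumes the rate and time packages are imported, and
the two-second interval just mirrors the launch.json setting above), each
limiter handed out by a NewPerItemRateLimiter(2, 1) is equivalent to:

	// The bucket holds at most one token (burst = 1) and refills
	// one token every two seconds.
	limiter := rate.NewLimiter(rate.Every(2*time.Second), 1)

	limiter.Allow() // true: the bucket starts full, so the token is spent
	limiter.Allow() // false: called immediately after, the bucket is empty

This is why a reconcile that is throttled is requeued with
RequeueAfter equal to the backoff: by the time it fires again, at least one
token will have been refilled.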
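Note: the commit message suggests this pattern could easily be reused in other
controllers. A minimal sketch of that wiring, assuming a hypothetical
MyReconciler in the same package (its name and fields are illustrations, not
part of this patch):

	package controllers

	import (
		"context"
		"time"

		ctrl "sigs.k8s.io/controller-runtime"
		"sigs.k8s.io/controller-runtime/pkg/reconcile"
	)

	// MyReconciler is a hypothetical reconciler reusing PerItemRateLimiter.
	type MyReconciler struct {
		Limiters       *PerItemRateLimiter[reconcile.Request]
		BackoffSeconds uint32
	}

	func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
		// Throttle before doing any work so that a constantly updating
		// watched resource cannot monopolize the controller's workers.
		if !r.Limiters.GetLimiter(req).Allow() {
			return ctrl.Result{RequeueAfter: time.Duration(r.BackoffSeconds) * time.Second}, nil
		}

		// ...the controller's actual reconcile logic goes here...

		return ctrl.Result{}, nil
	}

As in this patch, the limiter set would be constructed once (for example with
NewPerItemRateLimiter[reconcile.Request](backoffSeconds, 1)) and shared across
reconciles, since GetLimiter returns the same *rate.Limiter for a given item.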