Skip to content

Commit f7f4225

Browse files
committed
feat: dynamic ratelimiter for gracefuleviction
Signed-off-by: whosefriendA <[email protected]>
1 parent 1c6f1db commit f7f4225

File tree

9 files changed

+589
-14
lines changed

9 files changed

+589
-14
lines changed

cmd/controller-manager/app/controllermanager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e
299299
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
300300
EnableNoExecuteTaintEviction: ctx.Opts.FailoverConfiguration.EnableNoExecuteTaintEviction,
301301
NoExecuteTaintEvictionPurgeMode: ctx.Opts.FailoverConfiguration.NoExecuteTaintEvictionPurgeMode,
302+
EvictionQueueOptions: ctx.Opts.EvictionQueueOptions,
302303
}
303304
if err := taintManager.SetupWithManager(mgr); err != nil {
304305
return false, err
@@ -924,6 +925,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, opts *
924925
HPAControllerConfiguration: opts.HPAControllerConfiguration,
925926
FederatedResourceQuotaOptions: opts.FederatedResourceQuotaOptions,
926927
FailoverConfiguration: opts.FailoverOptions,
928+
EvictionQueueOptions: opts.EvictionQueueOptions,
927929
},
928930
Context: ctx,
929931
DynamicClientSet: dynamicClientSet,

cmd/controller-manager/app/options/options.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package options
1818

1919
import (
2020
"fmt"
21+
evictionQueueconfig "github.com/karmada-io/karmada/pkg/controllers/cluster/evictionqueue_config"
22+
2123
"regexp"
2224
"strings"
2325
"time"
@@ -148,6 +150,8 @@ type Options struct {
148150
FederatedResourceQuotaOptions FederatedResourceQuotaOptions
149151
// FailoverOptions holds the Failover configurations.
150152
FailoverOptions FailoverOptions
153+
// EvictionQueueOptions holds the GracefulEviction
154+
EvictionQueueOptions evictionQueueconfig.EvictionQueueOptions
151155
}
152156

153157
// NewOptions builds an empty options.
@@ -234,6 +238,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
234238
o.HPAControllerConfiguration.AddFlags(flags)
235239
o.FederatedResourceQuotaOptions.AddFlags(flags)
236240
o.FailoverOptions.AddFlags(flags)
241+
o.EvictionQueueOptions.AddFlags(flags)
237242
features.FeatureGate.AddFlag(flags)
238243
}
239244

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
Copyright 2022 The Karmada Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/apimachinery/pkg/labels"
23+
"k8s.io/client-go/util/workqueue"
24+
"k8s.io/klog/v2"
25+
26+
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
27+
config "github.com/karmada-io/karmada/pkg/controllers/cluster/evictionqueue_config"
28+
"github.com/karmada-io/karmada/pkg/metrics"
29+
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
30+
"github.com/karmada-io/karmada/pkg/util"
31+
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
32+
)
33+
34+
// maxEvictionDelay is the maximum delay for eviction when the rate is 0
35+
const maxEvictionDelay = 1000 * time.Second
36+
37+
// DynamicRateLimiter adjusts its rate based on the overall health of clusters.
38+
// It implements the workqueue.RateLimiter interface with dynamic behavior.
39+
type DynamicRateLimiter[T comparable] struct {
40+
resourceEvictionRate float32
41+
secondaryResourceEvictionRate float32
42+
unhealthyClusterThreshold float32
43+
largeClusterNumThreshold int
44+
informerManager genericmanager.SingleClusterInformerManager
45+
}
46+
47+
// NewDynamicRateLimiter creates a new DynamicRateLimiter with the given options.
48+
func NewDynamicRateLimiter[T comparable](informerManager genericmanager.SingleClusterInformerManager, opts config.EvictionQueueOptions) workqueue.TypedRateLimiter[T] {
49+
return &DynamicRateLimiter[T]{
50+
resourceEvictionRate: opts.ResourceEvictionRate,
51+
secondaryResourceEvictionRate: opts.SecondaryResourceEvictionRate,
52+
unhealthyClusterThreshold: opts.UnhealthyClusterThreshold,
53+
largeClusterNumThreshold: opts.LargeClusterNumThreshold,
54+
informerManager: informerManager,
55+
}
56+
}
57+
58+
// When determines how long to wait before processing an item.
59+
// Returns a longer delay when the system is unhealthy.
60+
func (d *DynamicRateLimiter[T]) When(item T) time.Duration {
61+
currentRate := d.getCurrentRate()
62+
if currentRate == 0 {
63+
return maxEvictionDelay
64+
}
65+
return time.Duration(1 / currentRate * float32(time.Second))
66+
}
67+
68+
// getCurrentRate calculates the appropriate rate based on cluster health:
69+
// - Normal rate when system is healthy
70+
// - Secondary rate when system is unhealthy but large-scale
71+
// - Zero (halt evictions) when system is unhealthy and small-scale
72+
func (d *DynamicRateLimiter[T]) getCurrentRate() float32 {
73+
clusterGVR := clusterv1alpha1.SchemeGroupVersion.WithResource("clusters")
74+
75+
var lister = d.informerManager.Lister(clusterGVR)
76+
if lister == nil {
77+
klog.Errorf("Failed to get cluster lister, halting eviction for safety")
78+
return 0
79+
}
80+
81+
clusters, err := lister.List(labels.Everything())
82+
if err != nil {
83+
klog.Errorf("Failed to list clusters from informer cache: %v, halting eviction for safety", err)
84+
return 0
85+
}
86+
87+
totalClusters := len(clusters)
88+
if totalClusters == 0 {
89+
return d.resourceEvictionRate
90+
}
91+
92+
unhealthyClusters := 0
93+
for _, clusterObj := range clusters {
94+
cluster, ok := clusterObj.(*clusterv1alpha1.Cluster)
95+
if !ok {
96+
continue
97+
}
98+
if !util.IsClusterReady(&cluster.Status) {
99+
unhealthyClusters++
100+
}
101+
}
102+
103+
// Update metrics
104+
failureRate := float32(unhealthyClusters) / float32(totalClusters)
105+
metrics.RecordClusterHealthMetrics(unhealthyClusters, float64(failureRate))
106+
107+
// Determine rate based on health status
108+
isUnhealthy := failureRate > d.unhealthyClusterThreshold
109+
if !isUnhealthy {
110+
return d.resourceEvictionRate
111+
}
112+
113+
isLargeScale := totalClusters > d.largeClusterNumThreshold
114+
if isLargeScale {
115+
klog.V(2).Infof("System is unhealthy (failure rate: %.2f), downgrading eviction rate to secondary rate: %.2f/s",
116+
failureRate, d.secondaryResourceEvictionRate)
117+
return d.secondaryResourceEvictionRate
118+
}
119+
120+
klog.V(2).Infof("System is unhealthy (failure rate: %.2f) and instance is small, halting eviction.", failureRate)
121+
return 0
122+
}
123+
124+
// Forget is a no-op as this rate limiter doesn't track individual items.
125+
func (d *DynamicRateLimiter[T]) Forget(item T) {
126+
// No-op
127+
}
128+
129+
// NumRequeues always returns 0 as this rate limiter doesn't track retries.
130+
func (d *DynamicRateLimiter[T]) NumRequeues(item T) int {
131+
return 0
132+
}
133+
134+
// NewGracefulEvictionRateLimiter creates a combined rate limiter for eviction.
135+
// It uses the maximum delay from both dynamic and default rate limiters to ensure
136+
// both cluster health and retry backoff are considered.
137+
func NewGracefulEvictionRateLimiter[T comparable](
138+
informerManager genericmanager.SingleClusterInformerManager,
139+
evictionOpts config.EvictionQueueOptions,
140+
rateLimiterOpts ratelimiterflag.Options) workqueue.TypedRateLimiter[T] {
141+
142+
dynamicLimiter := NewDynamicRateLimiter[T](informerManager, evictionOpts)
143+
defaultLimiter := ratelimiterflag.DefaultControllerRateLimiter[T](rateLimiterOpts)
144+
return workqueue.NewTypedMaxOfRateLimiter[T](dynamicLimiter, defaultLimiter)
145+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
Copyright 2022 The Karmada Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cluster
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"k8s.io/client-go/util/workqueue"
24+
"k8s.io/klog/v2"
25+
26+
config "github.com/karmada-io/karmada/pkg/controllers/cluster/evictionqueue_config"
27+
"github.com/karmada-io/karmada/pkg/metrics"
28+
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
29+
"github.com/karmada-io/karmada/pkg/util"
30+
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
31+
)
32+
33+
// EvictionWorker enhances AsyncWorker with dynamic rate limiting and metrics
34+
// for eviction operations. It provides a queue that adjusts its processing rate
35+
// based on cluster health status.
36+
type EvictionWorker interface {
37+
util.AsyncWorker
38+
}
39+
40+
type evictionWorker struct {
41+
name string
42+
keyFunc util.KeyFunc
43+
reconcileFunc util.ReconcileFunc
44+
resourceKindFunc func(key interface{}) (clusterName, resourceKind string)
45+
queue workqueue.TypedRateLimitingInterface[any]
46+
}
47+
48+
// EvictionWorkerOptions configures a new EvictionWorker instance.
49+
type EvictionWorkerOptions struct {
50+
// Name is the queue's name used for metrics and logging
51+
Name string
52+
53+
// KeyFunc generates keys from objects for queue operations
54+
KeyFunc util.KeyFunc
55+
56+
// ReconcileFunc processes keys from the queue
57+
ReconcileFunc util.ReconcileFunc
58+
59+
// ResourceKindFunc returns resource metadata for metrics collection
60+
ResourceKindFunc func(key interface{}) (clusterName, resourceKind string)
61+
62+
// InformerManager provides cluster information for dynamic rate limiting
63+
InformerManager genericmanager.SingleClusterInformerManager
64+
65+
// EvictionQueueOptions configures dynamic rate limiting behavior
66+
EvictionQueueOptions config.EvictionQueueOptions
67+
68+
// RateLimiterOptions configures general rate limiter behavior
69+
RateLimiterOptions ratelimiterflag.Options
70+
}
71+
72+
// NewEvictionWorker creates a new EvictionWorker with dynamic rate limiting.
73+
func NewEvictionWorker(opts EvictionWorkerOptions) EvictionWorker {
74+
rateLimiter := NewGracefulEvictionRateLimiter[interface{}](
75+
opts.InformerManager,
76+
opts.EvictionQueueOptions,
77+
opts.RateLimiterOptions,
78+
)
79+
80+
return &evictionWorker{
81+
name: opts.Name,
82+
keyFunc: opts.KeyFunc,
83+
reconcileFunc: opts.ReconcileFunc,
84+
resourceKindFunc: opts.ResourceKindFunc,
85+
queue: workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
86+
Name: opts.Name,
87+
}),
88+
}
89+
}
90+
91+
// Enqueue converts an object to a key and adds it to the queue.
92+
func (w *evictionWorker) Enqueue(obj interface{}) {
93+
key, err := w.keyFunc(obj)
94+
if err != nil {
95+
klog.Errorf("Failed to generate key for obj: %+v, err: %v", obj, err)
96+
return
97+
}
98+
99+
if key == nil {
100+
return
101+
}
102+
103+
w.Add(key)
104+
}
105+
106+
// Add puts an item into the queue and updates metrics.
107+
func (w *evictionWorker) Add(item interface{}) {
108+
if item == nil {
109+
klog.Warningf("Ignore nil item from queue")
110+
return
111+
}
112+
113+
w.queue.Add(item)
114+
metrics.RecordEvictionQueueMetrics(w.name, float64(w.queue.Len()))
115+
116+
// Update resource kind metrics if possible
117+
if w.resourceKindFunc != nil {
118+
clusterName, resourceKind := w.resourceKindFunc(item)
119+
metrics.RecordEvictionKindMetrics(clusterName, resourceKind, true)
120+
}
121+
}
122+
123+
// AddAfter adds an item to the queue after a delay and updates metrics.
124+
func (w *evictionWorker) AddAfter(item interface{}, duration time.Duration) {
125+
if item == nil {
126+
klog.Warningf("Ignore nil item from queue")
127+
return
128+
}
129+
130+
w.queue.AddAfter(item, duration)
131+
metrics.RecordEvictionQueueMetrics(w.name, float64(w.queue.Len()))
132+
133+
// Update resource kind metrics if possible
134+
if w.resourceKindFunc != nil {
135+
clusterName, resourceKind := w.resourceKindFunc(item)
136+
metrics.RecordEvictionKindMetrics(clusterName, resourceKind, true)
137+
}
138+
}
139+
140+
// worker processes items from the queue until the context is canceled.
141+
func (w *evictionWorker) worker(ctx context.Context) {
142+
for w.processNextWorkItem(ctx) {
143+
}
144+
}
145+
146+
// processNextWorkItem handles a single item from the queue with metrics tracking.
147+
// Returns false when the queue is shutting down, true otherwise.
148+
func (w *evictionWorker) processNextWorkItem(ctx context.Context) bool {
149+
key, quit := w.queue.Get()
150+
if quit {
151+
return false
152+
}
153+
defer w.queue.Done(key)
154+
155+
// Update queue metrics
156+
metrics.RecordEvictionQueueMetrics(w.name, float64(w.queue.Len()))
157+
158+
// Get resource metadata for metrics
159+
var clusterName, resourceKind string
160+
if w.resourceKindFunc != nil {
161+
clusterName, resourceKind = w.resourceKindFunc(key)
162+
}
163+
164+
// Process the item and measure latency
165+
startTime := time.Now()
166+
err := w.reconcileFunc(key)
167+
metrics.RecordEvictionProcessingMetrics(w.name, err, startTime)
168+
169+
if err != nil {
170+
// Requeue with rate limiting on error
171+
w.queue.AddRateLimited(key)
172+
// Item remains in queue, so don't decrease metrics count
173+
return true
174+
}
175+
176+
// Successfully processed
177+
w.queue.Forget(key)
178+
179+
// Decrease resource kind count only after successful processing
180+
metrics.RecordEvictionKindMetrics(clusterName, resourceKind, false)
181+
182+
return true
183+
}
184+
185+
// Run starts worker goroutines and ensures cleanup when context is canceled.
186+
func (w *evictionWorker) Run(ctx context.Context, workerNumber int) {
187+
klog.Infof("Starting %d workers for eviction worker %s", workerNumber, w.name)
188+
for i := 0; i < workerNumber; i++ {
189+
go w.worker(ctx)
190+
}
191+
192+
// Clean up when context is canceled
193+
go func() {
194+
<-ctx.Done()
195+
klog.Infof("Shutting down eviction worker %s", w.name)
196+
w.queue.ShutDown()
197+
}()
198+
}

0 commit comments

Comments
 (0)