7 changes: 7 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
@@ -294,11 +294,18 @@ func startClusterController(ctx controllerscontext.Context) (enabled bool, err e
taintManager := &cluster.NoExecuteTaintManager{
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor(cluster.TaintManagerName),
InformerManager: ctx.ControlPlaneInformerManager,
ClusterTaintEvictionRetryFrequency: 10 * time.Second,
ConcurrentReconciles: 3,
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
EnableNoExecuteTaintEviction: ctx.Opts.FailoverConfiguration.EnableNoExecuteTaintEviction,
NoExecuteTaintEvictionPurgeMode: ctx.Opts.FailoverConfiguration.NoExecuteTaintEvictionPurgeMode,
EvictionQueueOptions: cluster.EvictionQueueOptions{
ResourceEvictionRate: ctx.Opts.FailoverConfiguration.ResourceEvictionRate,
SecondaryResourceEvictionRate: ctx.Opts.FailoverConfiguration.SecondaryResourceEvictionRate,
UnhealthyClusterThreshold: ctx.Opts.FailoverConfiguration.UnhealthyClusterThreshold,
LargeClusterNumThreshold: ctx.Opts.FailoverConfiguration.LargeClusterNumThreshold,
},
}
if err := taintManager.SetupWithManager(mgr); err != nil {
return false, err
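For context while reading this hunk: cluster.EvictionQueueOptions is not shown in this file, so here is a hedged sketch of what it presumably looks like, inferred from the fields set above and the option types added in failover.go below; the actual definition elsewhere in this PR may differ.

// Inferred from usage above; illustrative only, not copied from the PR.
type EvictionQueueOptions struct {
	ResourceEvictionRate          float32
	SecondaryResourceEvictionRate float32
	UnhealthyClusterThreshold     float32
	LargeClusterNumThreshold      int
}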
19 changes: 19 additions & 0 deletions cmd/controller-manager/app/options/failover.go
@@ -35,6 +35,21 @@ type FailoverOptions struct {
// and then triggers rescheduling to other clusters.
// Default: "Gracefully".
NoExecuteTaintEvictionPurgeMode string
// ResourceEvictionRate is the number of resources to be evicted per second.
// This is the default rate when the system is considered healthy.
ResourceEvictionRate float32
// SecondaryResourceEvictionRate is the secondary resource eviction rate.
// When the ratio of unhealthy clusters to total clusters in the Karmada instance exceeds the UnhealthyClusterThreshold,
// the resource eviction rate will be reduced to this secondary level.
SecondaryResourceEvictionRate float32
// UnhealthyClusterThreshold is the threshold of unhealthy clusters.
// If the ratio of unhealthy clusters to total clusters exceeds this threshold, the Karmada instance is considered to have too many cluster failures,
// and the eviction rate will be downgraded to the secondary rate.
UnhealthyClusterThreshold float32
// LargeClusterNumThreshold is the threshold for a large-scale Karmada instance.
// When the number of clusters in the instance exceeds this threshold and the instance is unhealthy,
// the eviction rate is downgraded to the secondary rate. For smaller instances that are unhealthy, eviction is halted completely.
LargeClusterNumThreshold int
}

// AddFlags adds flags related to FailoverOptions for controller manager to the specified FlagSet.
@@ -45,6 +60,10 @@ func (o *FailoverOptions) AddFlags(flags *pflag.FlagSet) {

flags.BoolVar(&o.EnableNoExecuteTaintEviction, "enable-no-execute-taint-eviction", false, "Enables controller response to NoExecute taints on clusters, which triggers eviction of workloads without explicit tolerations. Given the impact of eviction caused by NoExecute Taint, this parameter is designed to remain disabled by default and requires careful evaluation by administrators before being enabled.\n")
flags.StringVar(&o.NoExecuteTaintEvictionPurgeMode, "no-execute-taint-eviction-purge-mode", "Gracefully", "Controls resource cleanup behavior for NoExecute-triggered evictions (only active when --enable-no-execute-taint-eviction=true). Supported values are \"Directly\", and \"Gracefully\". \"Directly\" mode directly evicts workloads first (risking temporary service interruption) and then triggers rescheduling to other clusters, while \"Gracefully\" mode first schedules workloads to new clusters and then cleans up original workloads after successful startup elsewhere to ensure service continuity.")
flags.Float32Var(&o.ResourceEvictionRate, "resource-eviction-rate", 0.5, "The number of resources to be evicted per second.")
flags.Float32Var(&o.SecondaryResourceEvictionRate, "secondary-resource-eviction-rate", 0.1, "The secondary resource eviction rate when the Karmada instance is unhealthy.")
flags.Float32Var(&o.UnhealthyClusterThreshold, "unhealthy-cluster-threshold", 0.55, "The threshold of unhealthy clusters. If the ratio of unhealthy clusters to total clusters exceeds this threshold, the Karmada instance is considered unhealthy.")
flags.IntVar(&o.LargeClusterNumThreshold, "large-cluster-num-threshold", 10, "The threshold for treating the Karmada instance as large-scale. When the instance is unhealthy and the number of clusters exceeds this threshold, the resource eviction rate is reduced to the secondary rate; if an unhealthy instance has fewer clusters, eviction is stopped entirely.")
}

// Validate checks FailoverOptions and return a slice of found errs.
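The truncated Validate below is the natural home for range checks on the new options. A minimal sketch of the kind of checks it might perform, using a hypothetical helper that returns a plain error slice (the PR's actual validation style is not shown here):

// validateEvictionOptions is a hypothetical helper, not part of the PR; assumes `import "fmt"`.
func (o *FailoverOptions) validateEvictionOptions() []error {
	var errs []error
	if o.ResourceEvictionRate <= 0 {
		errs = append(errs, fmt.Errorf("--resource-eviction-rate must be greater than 0, got %v", o.ResourceEvictionRate))
	}
	if o.SecondaryResourceEvictionRate <= 0 || o.SecondaryResourceEvictionRate > o.ResourceEvictionRate {
		errs = append(errs, fmt.Errorf("--secondary-resource-eviction-rate must be in (0, %v], got %v", o.ResourceEvictionRate, o.SecondaryResourceEvictionRate))
	}
	if o.UnhealthyClusterThreshold < 0 || o.UnhealthyClusterThreshold > 1 {
		errs = append(errs, fmt.Errorf("--unhealthy-cluster-threshold must be in the range [0, 1], got %v", o.UnhealthyClusterThreshold))
	}
	if o.LargeClusterNumThreshold < 0 {
		errs = append(errs, fmt.Errorf("--large-cluster-num-threshold must not be negative, got %d", o.LargeClusterNumThreshold))
	}
	return errs
}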
148 changes: 148 additions & 0 deletions pkg/controllers/cluster/dynamic_rate_limiter.go
@@ -0,0 +1,148 @@
/*
Copyright 2025 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"time"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
)

// maxEvictionDelay is the maximum delay applied to an item when the calculated eviction rate is 0;
// it effectively pauses eviction until the rate is re-evaluated.
Member:
This variable refers to the eviction of one resource every 1000 seconds, right?

Author:
The maxEvictionDelay := 1000 * time.Second specifies the maximum wait time the queue will impose on an item when the calculated currentRate == 0 (i.e., eviction should be paused). Rather than setting a rate of "one eviction every 1000 seconds," it uses a long delay to pause processing, waking up after 1000 seconds to re-evaluate the rate. Normally, the processing interval is time.Duration(1/currentRate * time.Second). For example, if currentRate = 5/s, the queue will process one item every 200ms.
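A minimal illustration of that interval math (illustrative only, not part of the PR; mirrors the float conversion used in When below and assumes the fmt and time packages are imported):

rate := float32(5)                                      // 5 evictions per second
delay := time.Duration(1 / rate * float32(time.Second)) // 200ms between items
fmt.Println(delay)                                      // prints "200ms"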

const maxEvictionDelay = 1000 * time.Second

// DynamicRateLimiter adjusts its rate based on the overall health of clusters.
// It implements the workqueue.TypedRateLimiter interface with dynamic behavior.
type DynamicRateLimiter[T comparable] struct {
resourceEvictionRate float32
secondaryResourceEvictionRate float32
unhealthyClusterThreshold float32
largeClusterNumThreshold int
informerManager genericmanager.SingleClusterInformerManager
}

// NewDynamicRateLimiter creates a new DynamicRateLimiter with the given options.
func NewDynamicRateLimiter[T comparable](informerManager genericmanager.SingleClusterInformerManager, opts EvictionQueueOptions) workqueue.TypedRateLimiter[T] {
return &DynamicRateLimiter[T]{
resourceEvictionRate: opts.ResourceEvictionRate,
secondaryResourceEvictionRate: opts.SecondaryResourceEvictionRate,
unhealthyClusterThreshold: opts.UnhealthyClusterThreshold,
largeClusterNumThreshold: opts.LargeClusterNumThreshold,
informerManager: informerManager,
}
}

// When determines how long to wait before processing an item.
// Returns a longer delay when the system is unhealthy.
func (d *DynamicRateLimiter[T]) When(_ T) time.Duration {
currentRate := d.getCurrentRate()
if currentRate == 0 {
return maxEvictionDelay
}
return time.Duration(1 / currentRate * float32(time.Second))
}

// getCurrentRate calculates the appropriate rate based on cluster health:
// - Normal rate when system is healthy
// - Secondary rate when system is unhealthy but large-scale
// - Zero (halt evictions) when system is unhealthy and small-scale
func (d *DynamicRateLimiter[T]) getCurrentRate() float32 {
Member:
Can we add test code for these new additions?

Author:
Sure, I will upload the test code to this PR.

clusterGVR := schema.GroupVersionResource{
Group: clusterv1alpha1.GroupName,
Version: "v1alpha1",
Resource: "clusters",
}

var lister = d.informerManager.Lister(clusterGVR)
if lister == nil {
klog.Errorf("Failed to get cluster lister, halting eviction for safety")
return 0
}

clusters, err := lister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list clusters from informer cache: %v, halting eviction for safety", err)
return 0
}

totalClusters := len(clusters)
if totalClusters == 0 {
return d.resourceEvictionRate
}

unhealthyClusters := 0
for _, clusterObj := range clusters {
cluster, ok := clusterObj.(*clusterv1alpha1.Cluster)
if !ok {
continue
}
if !util.IsClusterReady(&cluster.Status) {
unhealthyClusters++
}
}

// Update metrics
failureRate := float32(unhealthyClusters) / float32(totalClusters)
metrics.RecordClusterHealthMetrics(unhealthyClusters, float64(failureRate))

// Determine rate based on health status
isUnhealthy := failureRate > d.unhealthyClusterThreshold
if !isUnhealthy {
return d.resourceEvictionRate
}

isLargeScale := totalClusters > d.largeClusterNumThreshold
if isLargeScale {
klog.V(2).Infof("System is unhealthy (failure rate: %.2f), downgrading eviction rate to secondary rate: %.2f/s",
failureRate, d.secondaryResourceEvictionRate)
return d.secondaryResourceEvictionRate
}

klog.V(2).Infof("System is unhealthy (failure rate: %.2f) and instance is small, halting eviction.", failureRate)
return 0
}

// Forget is a no-op as this rate limiter doesn't track individual items.
func (d *DynamicRateLimiter[T]) Forget(_ T) {
// No-op
}

// NumRequeues always returns 0 as this rate limiter doesn't track retries.
func (d *DynamicRateLimiter[T]) NumRequeues(_ T) int {
return 0
}

// NewGracefulEvictionRateLimiter creates a combined rate limiter for eviction.
// It uses the maximum delay from both dynamic and default rate limiters to ensure
// both cluster health and retry backoff are considered.
func NewGracefulEvictionRateLimiter[T comparable](
informerManager genericmanager.SingleClusterInformerManager,
evictionOpts EvictionQueueOptions,
rateLimiterOpts ratelimiterflag.Options) workqueue.TypedRateLimiter[T] {
dynamicLimiter := NewDynamicRateLimiter[T](informerManager, evictionOpts)
defaultLimiter := ratelimiterflag.DefaultControllerRateLimiter[T](rateLimiterOpts)
return workqueue.NewTypedMaxOfRateLimiter[T](dynamicLimiter, defaultLimiter)
}
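To see how these pieces fit together, a hedged usage sketch (not taken from this PR) of feeding NewGracefulEvictionRateLimiter into a typed workqueue; informerManager, evictionOpts, and rateLimiterOpts are assumed to be wired up the way startClusterController does above:

// Illustrative only: how the taint manager could back its eviction queue with the combined limiter.
limiter := cluster.NewGracefulEvictionRateLimiter[string](informerManager, evictionOpts, rateLimiterOpts)
queue := workqueue.NewTypedRateLimitingQueue[string](limiter)

// Each AddRateLimited call asks the dynamic limiter for a delay:
//   healthy instance          -> 1/ResourceEvictionRate seconds between items
//   unhealthy and large-scale -> 1/SecondaryResourceEvictionRate seconds between items
//   unhealthy and small-scale -> maxEvictionDelay (eviction effectively paused)
queue.AddRateLimited("member-cluster-1/default/nginx")

With the defaults added in failover.go (0.5/s, 0.1/s, threshold 0.55, large-cluster threshold 10): in a 20-cluster instance, 12 NotReady clusters (60% > 55%) downgrade eviction to one item every 10 seconds, while the same failure ratio in a 5-cluster instance pauses eviction until the clusters recover.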