
Commit 5db74da

fix: change the wrong enqueue rate
Signed-off-by: whosefriendA <[email protected]>
1 parent e29b12c commit 5db74da

File tree

3 files changed: +438 -47 lines changed


pkg/controllers/cluster/dynamic_rate_limiter_test.go

Lines changed: 113 additions & 5 deletions
@@ -17,7 +17,10 @@ limitations under the License.
 package cluster
 
 import (
+	"context"
 	"errors"
+	"math"
+	"sync"
 	"testing"
 	"time"

@@ -26,8 +29,11 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
+	"github.com/karmada-io/karmada/pkg/util"
 	gmtesting "github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager/testing"
 )

@@ -57,9 +63,8 @@ func makeCluster(name string, ready bool) *clusterv1alpha1.Cluster {
 		ObjectMeta: metav1.ObjectMeta{Name: name},
 		Status: clusterv1alpha1.ClusterStatus{
 			Conditions: []metav1.Condition{{
-				Type:               string(clusterv1alpha1.ClusterConditionReady),
-				Status:             condStatus,
-				LastTransitionTime: metav1.Now(),
+				Type:   clusterv1alpha1.ClusterConditionReady,
+				Status: condStatus,
 			}},
 		},
 	}
@@ -101,7 +106,6 @@ func TestDynamicRateLimiter_When_Scenarios(t *testing.T) {
 			name: "unhealthy large-scale => secondary rate",
 			objs: func() []runtime.Object {
 				var out []runtime.Object
-				// 11 clusters total (> threshold 10), 5 unhealthy -> failureRate ~0.45 > 0.3
 				for i := 0; i < 6; i++ {
 					out = append(out, makeCluster("h"+string(rune('a'+i)), true))
 				}
@@ -116,7 +120,7 @@ func TestDynamicRateLimiter_When_Scenarios(t *testing.T) {
 		},
 		{
 			name: "unhealthy small-scale => halt (max delay)",
-			objs: []runtime.Object{makeCluster("c1", false), makeCluster("c2", true)}, // 1/2 unhealthy -> 0.5 > 0.3, total=2 <= threshold
+			objs: []runtime.Object{makeCluster("c1", false), makeCluster("c2", true)},
 			listErr:  nil,
 			opts:     EvictionQueueOptions{ResourceEvictionRate: 10, SecondaryResourceEvictionRate: 1, UnhealthyClusterThreshold: 0.3, LargeClusterNumThreshold: 10},
 			expected: maxEvictionDelay,
@@ -132,6 +136,7 @@ func TestDynamicRateLimiter_When_Scenarios(t *testing.T) {
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			t.Logf("Testing scenario: %s", tt.name)
 			mgr := gmtesting.NewFakeSingleClusterManager(true, true, func(gvr schema.GroupVersionResource) cache.GenericLister {
 				if gvr != clusterGVR {
 					return nil
@@ -140,9 +145,112 @@ func TestDynamicRateLimiter_When_Scenarios(t *testing.T) {
 			})
 			limiter := NewDynamicRateLimiter[any](mgr, tt.opts)
 			d := limiter.When(struct{}{})
+			t.Logf("Got delay: %v, Expected delay: %v", d, tt.expected)
 			if d != tt.expected {
 				t.Fatalf("unexpected duration: got %v, want %v", d, tt.expected)
 			}
 		})
 	}
 }
+
+// TestGracefulEvictionRateLimiter_ExponentialBackoff validates that, when a task keeps failing,
+// the combined rate limiter correctly exhibits exponential backoff behavior.
+func TestGracefulEvictionRateLimiter_ExponentialBackoff(t *testing.T) {
+	rateLimiterOpts := ratelimiterflag.Options{}
+	const defaultBaseDelay = 5 * time.Millisecond
+	const defaultMaxDelay = 1000 * time.Second
+
+	evictionOpts := EvictionQueueOptions{
+		ResourceEvictionRate:          50, // 20ms dynamic delay
+		SecondaryResourceEvictionRate: 0.1,
+		UnhealthyClusterThreshold:     0.55,
+		LargeClusterNumThreshold:      10,
+	}
+
+	t.Logf("Testing with default exponential backoff options: BaseDelay=%v, MaxDelay=%v", defaultBaseDelay, defaultMaxDelay)
+	expectedDynamicDelay := time.Second / time.Duration(evictionOpts.ResourceEvictionRate)
+	t.Logf("Dynamic limiter is in healthy mode, providing a base delay of: %v (overridden for test)", expectedDynamicDelay)
+
+	healthyClusters := []runtime.Object{makeCluster("c1", true), makeCluster("c2", true)}
+	clusterGVR := clusterv1alpha1.SchemeGroupVersion.WithResource("clusters")
+	mgr := gmtesting.NewFakeSingleClusterManager(true, true, func(gvr schema.GroupVersionResource) cache.GenericLister {
+		if gvr != clusterGVR {
+			return nil
+		}
+		return &fakeGenericLister{objects: healthyClusters, err: nil}
+	})
+
+	limiter := NewGracefulEvictionRateLimiter[any](mgr, evictionOpts, rateLimiterOpts)
+	queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](limiter, workqueue.TypedRateLimitingQueueConfig[any]{
+		Name: "backoff-test-final",
+	})
+
+	var (
+		mu           sync.Mutex
+		attemptTimes []time.Time
+	)
+
+	reconcileFunc := util.ReconcileFunc(func(key util.QueueKey) error {
+		mu.Lock()
+		attemptTimes = append(attemptTimes, time.Now())
+		mu.Unlock()
+		return errors.New("always fail to trigger backoff")
+	})
+
+	worker := &evictionWorker{
+		name:          "backoff-worker",
+		keyFunc:       func(obj interface{}) (util.QueueKey, error) { return obj, nil },
+		reconcileFunc: reconcileFunc,
+		queue:         queue,
+	}
+
+	testDuration := 3 * time.Second
+	ctx, cancel := context.WithTimeout(context.Background(), testDuration)
+	defer cancel()
+	worker.Run(ctx, 1)
+	worker.Add("test-item")
+	<-ctx.Done()
+
+	mu.Lock()
+	defer mu.Unlock()
+
+	t.Logf("Total attempts in %v: %d", testDuration, len(attemptTimes))
+	if len(attemptTimes) < 5 {
+		t.Fatalf("Expected at least 5 attempts to observe backoff, but got %d", len(attemptTimes))
+	}
+
+	t.Log("--- Analyzing delays between attempts ---")
+	var lastObservedDelay time.Duration
+	for i := 1; i < len(attemptTimes); i++ {
+		observedDelay := attemptTimes[i].Sub(attemptTimes[i-1])
+
+		numRequeues := i - 1
+		expectedBackoffNs := float64(defaultBaseDelay.Nanoseconds()) * math.Pow(2, float64(numRequeues))
+		if expectedBackoffNs > float64(defaultMaxDelay.Nanoseconds()) {
+			expectedBackoffNs = float64(defaultMaxDelay.Nanoseconds())
+		}
+		expectedBackoffDelay := time.Duration(expectedBackoffNs)
+
+		var expectedFinalDelay time.Duration
+		if expectedBackoffDelay > expectedDynamicDelay {
+			expectedFinalDelay = expectedBackoffDelay
+		} else {
+			expectedFinalDelay = expectedDynamicDelay
+		}
+
+		t.Logf("Attempt %2d: Observed Delay=%-18v | Expected Backoff Delay=%-18v | Effective Expected Delay >= %-18v",
+			i+1, observedDelay, expectedBackoffDelay, expectedFinalDelay)
+
+		if i > 1 {
+			// Only check for a strict increase if the backoff delay is larger than the dynamic delay.
+			if expectedBackoffDelay > expectedDynamicDelay && observedDelay < lastObservedDelay {
+				t.Errorf("Attempt %d: Delay did not increase as expected after backoff became dominant. Previous: %v, Current: %v", i+1, lastObservedDelay, observedDelay)
+			}
+		}
+
+		if observedDelay < expectedFinalDelay*9/10 {
+			t.Errorf("Attempt %d: Observed delay %v is significantly less than the effective expected delay %v", i+1, observedDelay, expectedFinalDelay)
+		}
+		lastObservedDelay = observedDelay
+	}
+}

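A note on the arithmetic the new test leans on: the healthy-mode pacing delay is ResourceEvictionRate (items per second) inverted into a per-item wait, which is why the comment next to ResourceEvictionRate: 50 reads "20ms dynamic delay". A minimal, self-contained sketch of that conversion (an illustration under that assumption, not code from this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Same conversion as expectedDynamicDelay in the test above.
	resourceEvictionRate := 50 // items per second, as in evictionOpts
	dynamicDelay := time.Second / time.Duration(resourceEvictionRate)
	fmt.Println(dynamicDelay) // 20ms, matching the "20ms dynamic delay" comment
}

Applying the same conversion to the scenario table's ResourceEvictionRate: 10 gives a 100ms healthy-mode delay.
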
pkg/controllers/cluster/eviction_worker.go

Lines changed: 19 additions & 1 deletion
@@ -54,6 +54,13 @@ type evictionWorker struct {
 	reconcileFunc    util.ReconcileFunc
 	resourceKindFunc func(key interface{}) (clusterName, resourceKind string)
 	queue            workqueue.TypedRateLimitingInterface[any]
+	// pacer is the combined limiter (dynamic + default) used to throttle the
+	// processing throughput between items even when initial enqueues are immediate.
+	// It ensures overall throughput follows the dynamic rate under healthy/unhealthy states.
+	pacer workqueue.TypedRateLimiter[any]
+	// pacerKey is a sentinel key used for pacing. We call When/Forget on this key
+	// for each successfully processed item to avoid exponential backoff accumulation.
+	pacerKey any
 }
 
 // EvictionWorkerOptions configures a new EvictionWorker instance.
@@ -82,7 +89,7 @@ type EvictionWorkerOptions struct {
 
 // NewEvictionWorker creates a new EvictionWorker with dynamic rate limiting.
 func NewEvictionWorker(opts EvictionWorkerOptions) util.AsyncWorker {
-	rateLimiter := NewGracefulEvictionRateLimiter[interface{}](
+	rateLimiter := NewGracefulEvictionRateLimiter[any](
 		opts.InformerManager,
 		opts.EvictionQueueOptions,
 		opts.RateLimiterOptions,
@@ -96,6 +103,8 @@ func NewEvictionWorker(opts EvictionWorkerOptions) util.AsyncWorker {
 		queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
 			Name: opts.Name,
 		}),
+		pacer:    rateLimiter,
+		pacerKey: struct{}{},
 	}
 }

@@ -190,6 +199,15 @@ func (w *evictionWorker) processNextWorkItem(_ context.Context) bool {
 	// Decrease resource kind count only after successful processing
 	metrics.RecordEvictionKindMetrics(clusterName, resourceKind, false)
 
+	// Apply pacing between items to enforce overall throughput, based on the
+	// combined limiter (dynamic health-aware + default backoff/bucket).
+	if w.pacer != nil {
+		if delay := w.pacer.When(w.pacerKey); delay > 0 {
+			time.Sleep(delay)
+		}
+		w.pacer.Forget(w.pacerKey)
+	}
+
 	return true
 }

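The pacing step added to processNextWorkItem can be read on its own as the pattern below. This is a minimal sketch rather than the worker code: paceBetweenItems is a hypothetical helper name, and a stock exponential-failure limiter from k8s.io/client-go/util/workqueue stands in for the combined GracefulEvictionRateLimiter. The Forget call on the sentinel key is what keeps per-item pacing from accumulating exponential backoff across successfully processed items.

package main

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// paceBetweenItems mirrors the pacing step above: ask the limiter how long to
// wait for a sentinel key, sleep that long, then Forget the key so its failure
// count (and therefore its backoff) is reset before the next item.
func paceBetweenItems(pacer workqueue.TypedRateLimiter[any], pacerKey any) {
	if pacer == nil {
		return
	}
	if delay := pacer.When(pacerKey); delay > 0 {
		time.Sleep(delay)
	}
	pacer.Forget(pacerKey)
}

func main() {
	// Assumption: a plain exponential-failure limiter is used here only to
	// make the sketch runnable; the real worker passes its combined limiter.
	pacer := workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, time.Second)
	key := struct{}{}
	for i := 0; i < 3; i++ {
		paceBetweenItems(pacer, key) // each call waits the base delay, because Forget resets the count
	}
}

Pacing after each processed item, rather than delaying the enqueue itself, lets the queue accept work immediately while overall throughput still follows the dynamic rate, which matches the pacer field comment above.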