From afd04e9659ea713ef3d5125a753dcbbcdbe4243e Mon Sep 17 00:00:00 2001 From: zach593 Date: Fri, 7 Nov 2025 17:12:51 +0800 Subject: [PATCH 1/2] feat: introduce asyncPriorityWorker in resourceDetector Processor Signed-off-by: zach593 --- pkg/detector/detector.go | 37 +++- pkg/detector/detector_test.go | 22 ++- .../ratelimiterflag/ratelimiterflag.go | 26 ++- .../ratelimiterflag/ratelimiterflag_test.go | 64 ++++++ pkg/util/fedinformer/handlers.go | 6 +- pkg/util/fedinformer/handlers_test.go | 2 +- pkg/util/worker.go | 83 +++++++- pkg/util/worker_test.go | 185 +++++++++++++++++- 8 files changed, 392 insertions(+), 33 deletions(-) create mode 100644 pkg/sharedcli/ratelimiterflag/ratelimiterflag_test.go diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go index e246cf3e95b8..fe6db79eba87 100644 --- a/pkg/detector/detector.go +++ b/pkg/detector/detector.go @@ -75,7 +75,7 @@ type ResourceDetector struct { InformerManager genericmanager.SingleClusterInformerManager ControllerRuntimeCache ctrlcache.Cache EventHandler cache.ResourceEventHandler - Processor util.AsyncWorker + Processor util.AsyncPriorityWorker SkippedResourceConfig *util.SkippedResourceConfig SkippedPropagatingNamespaces []*regexp.Regexp // ResourceInterpreter knows the details of resource structure. @@ -136,6 +136,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error { KeyFunc: ResourceItemKeyFunc, ReconcileFunc: d.Reconcile, RateLimiterOptions: d.RateLimiterOptions, + UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue), } d.Processor = util.NewAsyncWorker(detectorWorkerOptions) d.Processor.Run(ctx, d.ConcurrentResourceTemplateSyncs) @@ -310,12 +311,17 @@ func (d *ResourceDetector) EventFilter(obj interface{}) bool { } // OnAdd handles object add event and push the object to queue. -func (d *ResourceDetector) OnAdd(obj interface{}) { +func (d *ResourceDetector) OnAdd(obj interface{}, isInitialList bool) { runtimeObj, ok := obj.(runtime.Object) if !ok { return } - d.Processor.Enqueue(ResourceItem{Obj: runtimeObj}) + + var priority *int + if isInitialList { + priority = ptr.To(util.LowPriority) + } + d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, ResourceItem{Obj: runtimeObj}) } // OnUpdate handles object update event and push the object to queue. @@ -375,7 +381,7 @@ func (d *ResourceDetector) OnDelete(obj interface{}) { return } } - d.OnAdd(obj) + d.OnAdd(obj, false) } // LookForMatchedPolicy tries to find a policy for object referenced by object key. @@ -1492,9 +1498,22 @@ func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstr // Therefore, only set ResourceChangeByKarmada in lazy activation mode. // For more details, see: https://github.com/karmada-io/karmada/issues/5996. func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) { - if util.IsLazyActivationEnabled(pref) { - d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true}) - return - } - d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key}) + enqueueKey := keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: util.IsLazyActivationEnabled(pref)} + // Use of low priority here is based on the following reasons: + // + // 1. The purpose of using low priority is to ensure that user-triggered changes + // are processed first when the controller restarts. + // At startup, items are enqueued both in the Processor and here. + // If low priority is applied only in the Processor, enqueueing here would + // reset most already queued items’ priority back to 0. + // + // 2. The resourceTemplate is usually modified by users, while changes in pp/cpp + // may come from administrators. + // Since admin changes should not block user changes, otherwise users would + // perceive controller blocking, they are processed later by assigning them + // a lower priority. + // In other words, when all resourceTemplates already need to be queued for processing, + // this allows user-triggered modifications to resourceTemplates + // to be prioritized for handling. + d.Processor.AddWithOpts(util.AddOpts{Priority: ptr.To(util.LowPriority)}, enqueueKey) } diff --git a/pkg/detector/detector_test.go b/pkg/detector/detector_test.go index d3898eccd23c..f82d50613842 100644 --- a/pkg/detector/detector_test.go +++ b/pkg/detector/detector_test.go @@ -366,6 +366,7 @@ func TestOnAdd(t *testing.T) { name string obj interface{} expectedEnqueue bool + isInitialList bool }{ { name: "valid unstructured object", @@ -381,6 +382,21 @@ func TestOnAdd(t *testing.T) { }, expectedEnqueue: true, }, + { + name: "valid unstructured object, with low priority", + obj: &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "test-deployment", + "namespace": "default", + }, + }, + }, + expectedEnqueue: true, + isInitialList: true, + }, { name: "invalid unstructured object", obj: &unstructured.Unstructured{ @@ -410,7 +426,7 @@ func TestOnAdd(t *testing.T) { d := &ResourceDetector{ Processor: mockProcessor, } - d.OnAdd(tt.obj) + d.OnAdd(tt.obj, tt.isInitialList) if tt.expectedEnqueue { assert.Equal(t, 1, mockProcessor.enqueueCount, "Object should be enqueued") assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem") @@ -1056,6 +1072,10 @@ func (m *mockAsyncWorker) Add(_ interface{}) { m.enqueueCount++ } func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {} +func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, _ ...any) {} +func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) { + m.Enqueue(item) +} func (m *mockAsyncWorker) Run(_ context.Context, _ int) {} diff --git a/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go b/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go index ecc689f4bbdd..a2b9466a7a95 100644 --- a/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go +++ b/pkg/sharedcli/ratelimiterflag/ratelimiterflag.go @@ -47,21 +47,27 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&o.RateLimiterBucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier.") } -// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags. -func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] { +// SetDefaults sets the default values for Options. +func (o *Options) SetDefaults() *Options { // set defaults - if opts.RateLimiterBaseDelay <= 0 { - opts.RateLimiterBaseDelay = 5 * time.Millisecond + if o.RateLimiterBaseDelay <= 0 { + o.RateLimiterBaseDelay = 5 * time.Millisecond } - if opts.RateLimiterMaxDelay <= 0 { - opts.RateLimiterMaxDelay = 1000 * time.Second + if o.RateLimiterMaxDelay <= 0 { + o.RateLimiterMaxDelay = 1000 * time.Second } - if opts.RateLimiterQPS <= 0 { - opts.RateLimiterQPS = 10 + if o.RateLimiterQPS <= 0 { + o.RateLimiterQPS = 10 } - if opts.RateLimiterBucketSize <= 0 { - opts.RateLimiterBucketSize = 100 + if o.RateLimiterBucketSize <= 0 { + o.RateLimiterBucketSize = 100 } + return o +} + +// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags. +func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] { + opts.SetDefaults() return workqueue.NewTypedMaxOfRateLimiter[T]( workqueue.NewTypedItemExponentialFailureRateLimiter[T](opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay), diff --git a/pkg/sharedcli/ratelimiterflag/ratelimiterflag_test.go b/pkg/sharedcli/ratelimiterflag/ratelimiterflag_test.go new file mode 100644 index 000000000000..2496bf4376d7 --- /dev/null +++ b/pkg/sharedcli/ratelimiterflag/ratelimiterflag_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2025 The Karmada Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimiterflag + +import ( + "reflect" + "testing" + "time" +) + +func TestOptions_SetDefaults(t *testing.T) { + type fields struct { + RateLimiterBaseDelay time.Duration + RateLimiterMaxDelay time.Duration + RateLimiterQPS int + RateLimiterBucketSize int + } + tests := []struct { + name string + fields fields + want *Options + }{ + {name: "all zero -> defaults", fields: fields{}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}}, + {name: "all negative -> defaults", fields: fields{RateLimiterBaseDelay: -1, RateLimiterMaxDelay: -1, RateLimiterQPS: -1, RateLimiterBucketSize: -1}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}}, + {name: "only base delay invalid", fields: fields{RateLimiterBaseDelay: 0, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}}, + {name: "custom values unchanged", fields: fields{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}, want: &Options{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + o := &Options{ + RateLimiterBaseDelay: tt.fields.RateLimiterBaseDelay, + RateLimiterMaxDelay: tt.fields.RateLimiterMaxDelay, + RateLimiterQPS: tt.fields.RateLimiterQPS, + RateLimiterBucketSize: tt.fields.RateLimiterBucketSize, + } + if got := o.SetDefaults(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("SetDefaults() = %v, want %v", got, tt.want) + } + }) + } + + t.Run("receiver is changed", func(t *testing.T) { + o := &Options{} + o.SetDefaults() + want := &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100} + if !reflect.DeepEqual(o, want) { + t.Errorf("SetDefaults() changed self to %v, want %v", o, want) + } + }) +} diff --git a/pkg/util/fedinformer/handlers.go b/pkg/util/fedinformer/handlers.go index 2b20218c6c1f..b1d915b6de55 100644 --- a/pkg/util/fedinformer/handlers.go +++ b/pkg/util/fedinformer/handlers.go @@ -63,13 +63,13 @@ func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, n // coming in, ensuring the appropriate nested handler method is invoked. // // Note: An object that starts passing the filter after an update is considered an add, and -// an object that stops passing the filter after an update is considered a delete. +// an object that stops passing the filter after an update is considered a deletion. // Like the handlers, the filter MUST NOT modify the objects it is given. -func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}), +func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}, isInitialList bool), updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler { return &cache.FilteringResourceEventHandler{ FilterFunc: filterFunc, - Handler: cache.ResourceEventHandlerFuncs{ + Handler: cache.ResourceEventHandlerDetailedFuncs{ AddFunc: addFunc, UpdateFunc: updateFunc, DeleteFunc: deleteFunc, diff --git a/pkg/util/fedinformer/handlers_test.go b/pkg/util/fedinformer/handlers_test.go index 41ef2bb12f9c..267705765eb4 100644 --- a/pkg/util/fedinformer/handlers_test.go +++ b/pkg/util/fedinformer/handlers_test.go @@ -214,7 +214,7 @@ func TestNewFilteringHandlerOnAllEvents(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { var addCalled, updateCalled, deleteCalled bool - addFunc := func(_ interface{}) { addCalled = true } + addFunc := func(_ interface{}, _ bool) { addCalled = true } updateFunc := func(_, _ interface{}) { updateCalled = true } deleteFunc := func(_ interface{}) { deleteCalled = true } diff --git a/pkg/util/worker.go b/pkg/util/worker.go index 6aed6d42a955..d78df4d4ce50 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" ) @@ -46,6 +47,35 @@ type AsyncWorker interface { Run(ctx context.Context, workerNumber int) } +// LowPriority is the priority value for low priority items. +const LowPriority = -100 + +// AsyncPriorityWorker is an extension of AsyncWorker with priority queue support. +type AsyncPriorityWorker interface { + // EnqueueWithOpts generates the key of 'obj' according to a 'KeyFunc' then adds the key as an item to priority queue by 'AddWithOpts'. + EnqueueWithOpts(opts AddOpts, obj any) + + // AddWithOpts adds items to the priority queue with options. + AddWithOpts(opts AddOpts, item ...any) + + AsyncWorker +} + +// AddOpts defines the options for adding items to priority queue. +type AddOpts struct { + After time.Duration + RateLimited bool + Priority *int +} + +func (o *AddOpts) toPriorityQueueAddOpts() priorityqueue.AddOpts { + return priorityqueue.AddOpts{ + After: o.After, + RateLimited: o.RateLimited, + Priority: o.Priority, + } +} + // QueueKey is the item key that stores in queue. // The key could be arbitrary types. // @@ -61,6 +91,7 @@ type KeyFunc func(obj interface{}) (QueueKey, error) type ReconcileFunc func(key QueueKey) error type asyncWorker struct { + name string // keyFunc is the function that make keys for API objects. keyFunc KeyFunc // reconcileFunc is the function that process keys from the queue. @@ -77,16 +108,28 @@ type Options struct { KeyFunc KeyFunc ReconcileFunc ReconcileFunc RateLimiterOptions ratelimiterflag.Options + UsePriorityQueue bool } // NewAsyncWorker returns a asyncWorker which can process resource periodic. -func NewAsyncWorker(opt Options) AsyncWorker { +func NewAsyncWorker(opt Options) AsyncPriorityWorker { + var queue workqueue.TypedRateLimitingInterface[any] + if opt.UsePriorityQueue { + rateLimiterOpts := opt.RateLimiterOptions.SetDefaults() + queue = priorityqueue.New[any](opt.Name, func(o *priorityqueue.Opts[any]) { + // change to controller-runtime priorityqueue default rateLimiter + o.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[any](rateLimiterOpts.RateLimiterBaseDelay, rateLimiterOpts.RateLimiterMaxDelay) + }) + } else { + queue = workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{ + Name: opt.Name, + }) + } return &asyncWorker{ + name: opt.Name, keyFunc: opt.KeyFunc, reconcileFunc: opt.ReconcileFunc, - queue: workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{ - Name: opt.Name, - }), + queue: queue, } } @@ -104,6 +147,20 @@ func (w *asyncWorker) Enqueue(obj interface{}) { w.Add(key) } +func (w *asyncWorker) EnqueueWithOpts(opts AddOpts, obj any) { + key, err := w.keyFunc(obj) + if err != nil { + klog.Errorf("Failed to generate key for obj: %+v, err: %v", obj, err) + return + } + + if key == nil { + return + } + + w.AddWithOpts(opts, key) +} + func (w *asyncWorker) Add(item interface{}) { if item == nil { klog.Warningf("Ignore nil item from queue") @@ -122,6 +179,24 @@ func (w *asyncWorker) AddAfter(item interface{}, duration time.Duration) { w.queue.AddAfter(item, duration) } +func (w *asyncWorker) AddWithOpts(opts AddOpts, item ...any) { + if item == nil { + klog.Warningf("Ignore nil item from queue") + return + } + + pq, ok := w.queue.(priorityqueue.PriorityQueue[any]) + if !ok { + klog.Warningf("queue is not priority queue, fallback to normal queue, queueName: %s", w.name) + for _, it := range item { + w.queue.Add(it) + } + return + } + + pq.AddWithOpts(opts.toPriorityQueueAddOpts(), item...) +} + func (w *asyncWorker) worker() { key, quit := w.queue.Get() if quit { diff --git a/pkg/util/worker_test.go b/pkg/util/worker_test.go index 0718110769a8..25d28553b301 100644 --- a/pkg/util/worker_test.go +++ b/pkg/util/worker_test.go @@ -26,11 +26,12 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" ) -func newTestAsyncWorker(reconcileFunc ReconcileFunc) *asyncWorker { +func newTestAsyncWorker(reconcileFunc ReconcileFunc, usePriorityQueue bool) *asyncWorker { options := Options{ Name: "test_async_worker", KeyFunc: MetaNamespaceKeyFunc, @@ -41,6 +42,7 @@ func newTestAsyncWorker(reconcileFunc ReconcileFunc) *asyncWorker { RateLimiterQPS: 5000, RateLimiterBucketSize: 100, }, + UsePriorityQueue: usePriorityQueue, } worker := NewAsyncWorker(options) @@ -50,7 +52,25 @@ func newTestAsyncWorker(reconcileFunc ReconcileFunc) *asyncWorker { func Test_asyncWorker_Enqueue(t *testing.T) { const name = "fake_node" - worker := newTestAsyncWorker(nil) + worker := newTestAsyncWorker(nil, false) + + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + } + + worker.Enqueue(node) + + item, _ := worker.queue.Get() + + if name != item { + t.Errorf("Added Item: %v, want: %v", item, name) + } +} + +func Test_asyncPriorityWorker_Enqueue(t *testing.T) { + const name = "fake_node" + + worker := newTestAsyncWorker(nil, true) node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{Name: name}, @@ -69,7 +89,30 @@ func Test_asyncWorker_AddAfter(t *testing.T) { const name = "fake_node" const duration = 1 * time.Second - worker := newTestAsyncWorker(nil) + worker := newTestAsyncWorker(nil, false) + + start := time.Now() + worker.AddAfter(name, duration) + + item, _ := worker.queue.Get() + end := time.Now() + + if name != item { + t.Errorf("Added Item: %v, want: %v", item, name) + } + + elapsed := end.Sub(start) + if elapsed < duration { + t.Errorf("Added Item should be dequeued after %v, but the actually elapsed time is %v.", + duration.String(), elapsed.String()) + } +} + +func Test_asyncPriorityWorker_AddAfter(t *testing.T) { + const name = "fake_node" + const duration = 1 * time.Second + + worker := newTestAsyncWorker(nil, true) start := time.Now() worker.AddAfter(name, duration) @@ -83,7 +126,7 @@ func Test_asyncWorker_AddAfter(t *testing.T) { elapsed := end.Sub(start) if elapsed < duration { - t.Errorf("Added Item should dequeued after %v, but the actually elapsed time is %v.", + t.Errorf("Added Item should be dequeued after %v, but the actually elapsed time is %v.", duration.String(), elapsed.String()) } } @@ -130,7 +173,44 @@ func Test_asyncWorker_Run(t *testing.T) { const cnt = 2000 reconcile := newAsyncWorkerReconciler() - worker := newTestAsyncWorker(reconcile.ReconcileFunc) + worker := newTestAsyncWorker(reconcile.ReconcileFunc, false) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + worker.Run(ctx, 5) + + for i := 0; i < cnt; i++ { + worker.Add(i) + } + + err := assertUntil(20*time.Second, func() error { + processed := reconcile.ProcessedItem() + if len(processed) < cnt { + return fmt.Errorf("processed item not equal to input, len() = %v, processed item is %v", + len(processed), processed) + } + + for i := 0; i < cnt; i++ { + if _, ok := processed[i]; !ok { + return fmt.Errorf("expected item not processed, expected: %v, all processed item: %v", + i, processed) + } + } + + return nil + }) + + if err != nil { + t.Error(err.Error()) + } +} + +func Test_asyncPriorityWorker_Run(t *testing.T) { + const cnt = 2000 + + reconcile := newAsyncWorkerReconciler() + worker := newTestAsyncWorker(reconcile.ReconcileFunc, true) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -187,3 +267,98 @@ func assertUntil(maxDuration time.Duration, assertion func() error) error { return lastErr } + +func Test_asyncWorker_AddWithOpts(t *testing.T) { + t.Run("addAfter", func(t *testing.T) { + const name = "fake_node" + const duration = 1 * time.Second + + worker := newTestAsyncWorker(nil, true) + + start := time.Now() + worker.AddWithOpts(AddOpts{After: duration}, name) + + item, _ := worker.queue.Get() + end := time.Now() + + if name != item { + t.Errorf("Added Item: %v, want: %v", item, name) + } + + elapsed := end.Sub(start) + if elapsed < duration { + t.Errorf("Added Item should be dequeued after %v, but the actually elapsed time is %v.", + duration.String(), elapsed.String()) + } + }) + + t.Run("test FIFO with priority", func(t *testing.T) { + const node1 = "fake_node1" + const node2 = "fake_node2" + const node3 = "fake_node3" + const node4 = "fake_node4" + const node5 = "fake_node5" + + worker := newTestAsyncWorker(nil, true) + + worker.AddWithOpts(AddOpts{Priority: ptr.To(LowPriority)}, node1) + worker.AddWithOpts(AddOpts{}, node2) + worker.AddWithOpts(AddOpts{Priority: ptr.To(LowPriority)}, node3) + worker.AddWithOpts(AddOpts{}, node3) + worker.AddWithOpts(AddOpts{Priority: ptr.To(LowPriority)}, node4) + worker.AddWithOpts(AddOpts{}, node5) + + item, _ := worker.queue.Get() + if node2 != item { + t.Errorf("Added Item: %v, want: %v", item, node2) + } + + item, _ = worker.queue.Get() + if node3 != item { + t.Errorf("Added Item: %v, want: %v", item, node3) + } + + item, _ = worker.queue.Get() + if node5 != item { + t.Errorf("Added Item: %v, want: %v", item, node5) + } + + item, _ = worker.queue.Get() + if node1 != item { + t.Errorf("Added Item: %v, want: %v", item, node1) + } + + item, _ = worker.queue.Get() + if node4 != item { + t.Errorf("Added Item: %v, want: %v", item, node4) + } + }) +} + +func Test_asyncWorker_EnqueueWithOpts(t *testing.T) { + t.Run("low priority", func(t *testing.T) { + const nodeName1 = "fake_node1" + const nodeName2 = "fake_node2" + + worker := newTestAsyncWorker(nil, true) + + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName1}, + } + worker.EnqueueWithOpts(AddOpts{Priority: ptr.To(LowPriority)}, node1) + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: nodeName2}, + } + worker.EnqueueWithOpts(AddOpts{}, node2) + + item, _ := worker.queue.Get() + + if nodeName2 != item { + t.Errorf("Added Item: %v, want: %v", item, nodeName2) + } + item, _ = worker.queue.Get() + if nodeName1 != item { + t.Errorf("Added Item: %v, want: %v", item, nodeName1) + } + }) +} From 890699c91db5be6b2c8e6bf717b1fa127a82252e Mon Sep 17 00:00:00 2001 From: zach593 Date: Mon, 24 Nov 2025 23:31:12 +0800 Subject: [PATCH 2/2] update asyncPriorityQueue AddWithOpts() implementation Signed-off-by: zach593 --- pkg/util/worker.go | 13 +++++++++++-- pkg/util/worker_test.go | 25 ++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/pkg/util/worker.go b/pkg/util/worker.go index d78df4d4ce50..d7a2e0da0a0b 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -185,11 +185,20 @@ func (w *asyncWorker) AddWithOpts(opts AddOpts, item ...any) { return } - pq, ok := w.queue.(priorityqueue.PriorityQueue[any]) + pq, ok := w.queue.(interface { + AddWithOpts(o priorityqueue.AddOpts, Items ...any) + }) if !ok { klog.Warningf("queue is not priority queue, fallback to normal queue, queueName: %s", w.name) for _, it := range item { - w.queue.Add(it) + switch { + case opts.After > 0: + w.queue.AddAfter(it, opts.After) + case opts.RateLimited: + w.queue.AddRateLimited(it) + default: + w.queue.Add(it) + } } return } diff --git a/pkg/util/worker_test.go b/pkg/util/worker_test.go index 25d28553b301..56d87e9a4d12 100644 --- a/pkg/util/worker_test.go +++ b/pkg/util/worker_test.go @@ -269,7 +269,30 @@ func assertUntil(maxDuration time.Duration, assertion func() error) error { } func Test_asyncWorker_AddWithOpts(t *testing.T) { - t.Run("addAfter", func(t *testing.T) { + t.Run("AddAfter, pq not enabled", func(t *testing.T) { + const name = "fake_node" + const duration = 1 * time.Second + + worker := newTestAsyncWorker(nil, false) + + start := time.Now() + worker.AddWithOpts(AddOpts{After: duration}, name) + + item, _ := worker.queue.Get() + end := time.Now() + + if name != item { + t.Errorf("Added Item: %v, want: %v", item, name) + } + + elapsed := end.Sub(start) + if elapsed < duration { + t.Errorf("Added Item should be dequeued after %v, but the actually elapsed time is %v.", + duration.String(), elapsed.String()) + } + }) + + t.Run("addAfter, pq enabled", func(t *testing.T) { const name = "fake_node" const duration = 1 * time.Second