Skip to content

Commit ca97883

Browse files
zach593zongqingli
authored andcommitted
feat: introduce asyncPriorityWorker in resourceDetector Processor
Signed-off-by: zach593 <[email protected]>
1 parent 4b5aa6b commit ca97883

File tree

8 files changed

+391
-33
lines changed

8 files changed

+391
-33
lines changed

pkg/detector/detector.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ type ResourceDetector struct {
7575
InformerManager genericmanager.SingleClusterInformerManager
7676
ControllerRuntimeCache ctrlcache.Cache
7777
EventHandler cache.ResourceEventHandler
78-
Processor util.AsyncWorker
78+
Processor util.AsyncPriorityWorker
7979
SkippedResourceConfig *util.SkippedResourceConfig
8080
SkippedPropagatingNamespaces []*regexp.Regexp
8181
// ResourceInterpreter knows the details of resource structure.
@@ -136,6 +136,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
136136
KeyFunc: ResourceItemKeyFunc,
137137
ReconcileFunc: d.Reconcile,
138138
RateLimiterOptions: d.RateLimiterOptions,
139+
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
139140
}
140141
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
141142
d.Processor.Run(ctx, d.ConcurrentResourceTemplateSyncs)
@@ -310,12 +311,17 @@ func (d *ResourceDetector) EventFilter(obj interface{}) bool {
310311
}
311312

312313
// OnAdd handles object add event and push the object to queue.
313-
func (d *ResourceDetector) OnAdd(obj interface{}) {
314+
func (d *ResourceDetector) OnAdd(obj interface{}, isInitialList bool) {
314315
runtimeObj, ok := obj.(runtime.Object)
315316
if !ok {
316317
return
317318
}
318-
d.Processor.Enqueue(ResourceItem{Obj: runtimeObj})
319+
320+
priority := 0
321+
if isInitialList {
322+
priority = util.LowPriority
323+
}
324+
d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, ResourceItem{Obj: runtimeObj})
319325
}
320326

321327
// OnUpdate handles object update event and push the object to queue.
@@ -375,7 +381,7 @@ func (d *ResourceDetector) OnDelete(obj interface{}) {
375381
return
376382
}
377383
}
378-
d.OnAdd(obj)
384+
d.OnAdd(obj, false)
379385
}
380386

381387
// LookForMatchedPolicy tries to find a policy for object referenced by object key.
@@ -1492,9 +1498,22 @@ func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstr
14921498
// Therefore, only set ResourceChangeByKarmada in lazy activation mode.
14931499
// For more details, see: https://github.com/karmada-io/karmada/issues/5996.
14941500
func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) {
1495-
if util.IsLazyActivationEnabled(pref) {
1496-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
1497-
return
1498-
}
1499-
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key})
1501+
enqueueKey := keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: util.IsLazyActivationEnabled(pref)}
1502+
// Use of low priority here is based on the following reasons:
1503+
//
1504+
// 1. The purpose of using low priority is to ensure that user-triggered changes
1505+
// are processed first when the controller restarts.
1506+
// At startup, items are enqueued both in the Processor and here.
1507+
// If low priority is applied only in the Processor, enqueueing here would
1508+
// reset most already queued items’ priority back to 0.
1509+
//
1510+
// 2. The resourceTemplate is usually modified by users, while changes in pp/cpp
1511+
// may come from administrators.
1512+
// Since admin changes should not block user changes, otherwise users would
1513+
// perceive controller blocking, they are processed later by assigning them
1514+
// a lower priority.
1515+
// In other words, when all resourceTemplates already need to be queued for processing,
1516+
// this allows user-triggered modifications to resourceTemplates
1517+
// to be prioritized for handling.
1518+
d.Processor.AddWithOpts(util.AddOpts{Priority: util.LowPriority}, enqueueKey)
15001519
}

pkg/detector/detector_test.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,7 @@ func TestOnAdd(t *testing.T) {
366366
name string
367367
obj interface{}
368368
expectedEnqueue bool
369+
isInitialList bool
369370
}{
370371
{
371372
name: "valid unstructured object",
@@ -381,6 +382,21 @@ func TestOnAdd(t *testing.T) {
381382
},
382383
expectedEnqueue: true,
383384
},
385+
{
386+
name: "valid unstructured object, with low priority",
387+
obj: &unstructured.Unstructured{
388+
Object: map[string]interface{}{
389+
"apiVersion": "apps/v1",
390+
"kind": "Deployment",
391+
"metadata": map[string]interface{}{
392+
"name": "test-deployment",
393+
"namespace": "default",
394+
},
395+
},
396+
},
397+
expectedEnqueue: true,
398+
isInitialList: true,
399+
},
384400
{
385401
name: "invalid unstructured object",
386402
obj: &unstructured.Unstructured{
@@ -410,7 +426,7 @@ func TestOnAdd(t *testing.T) {
410426
d := &ResourceDetector{
411427
Processor: mockProcessor,
412428
}
413-
d.OnAdd(tt.obj)
429+
d.OnAdd(tt.obj, tt.isInitialList)
414430
if tt.expectedEnqueue {
415431
assert.Equal(t, 1, mockProcessor.enqueueCount, "Object should be enqueued")
416432
assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
@@ -1056,6 +1072,10 @@ func (m *mockAsyncWorker) Add(_ interface{}) {
10561072
m.enqueueCount++
10571073
}
10581074
func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
1075+
func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, _ ...any) {}
1076+
func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
1077+
m.Enqueue(item)
1078+
}
10591079

10601080
func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}
10611081

pkg/sharedcli/ratelimiterflag/ratelimiterflag.go

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,27 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
4747
fs.IntVar(&o.RateLimiterBucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier.")
4848
}
4949

50-
// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
51-
func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] {
50+
// SetDefaults sets the default values for Options.
51+
func (o *Options) SetDefaults() *Options {
5252
// set defaults
53-
if opts.RateLimiterBaseDelay <= 0 {
54-
opts.RateLimiterBaseDelay = 5 * time.Millisecond
53+
if o.RateLimiterBaseDelay <= 0 {
54+
o.RateLimiterBaseDelay = 5 * time.Millisecond
5555
}
56-
if opts.RateLimiterMaxDelay <= 0 {
57-
opts.RateLimiterMaxDelay = 1000 * time.Second
56+
if o.RateLimiterMaxDelay <= 0 {
57+
o.RateLimiterMaxDelay = 1000 * time.Second
5858
}
59-
if opts.RateLimiterQPS <= 0 {
60-
opts.RateLimiterQPS = 10
59+
if o.RateLimiterQPS <= 0 {
60+
o.RateLimiterQPS = 10
6161
}
62-
if opts.RateLimiterBucketSize <= 0 {
63-
opts.RateLimiterBucketSize = 100
62+
if o.RateLimiterBucketSize <= 0 {
63+
o.RateLimiterBucketSize = 100
6464
}
65+
return o
66+
}
67+
68+
// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
69+
func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] {
70+
opts.SetDefaults()
6571

6672
return workqueue.NewTypedMaxOfRateLimiter[T](
6773
workqueue.NewTypedItemExponentialFailureRateLimiter[T](opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay),
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
Copyright 2025 The Karmada Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ratelimiterflag
18+
19+
import (
20+
"reflect"
21+
"testing"
22+
"time"
23+
)
24+
25+
func TestOptions_SetDefaults(t *testing.T) {
26+
type fields struct {
27+
RateLimiterBaseDelay time.Duration
28+
RateLimiterMaxDelay time.Duration
29+
RateLimiterQPS int
30+
RateLimiterBucketSize int
31+
}
32+
tests := []struct {
33+
name string
34+
fields fields
35+
want *Options
36+
}{
37+
{name: "all zero -> defaults", fields: fields{}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}},
38+
{name: "all negative -> defaults", fields: fields{RateLimiterBaseDelay: -1, RateLimiterMaxDelay: -1, RateLimiterQPS: -1, RateLimiterBucketSize: -1}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}},
39+
{name: "only base delay invalid", fields: fields{RateLimiterBaseDelay: 0, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}},
40+
{name: "custom values unchanged", fields: fields{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}, want: &Options{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}},
41+
}
42+
for _, tt := range tests {
43+
t.Run(tt.name, func(t *testing.T) {
44+
o := &Options{
45+
RateLimiterBaseDelay: tt.fields.RateLimiterBaseDelay,
46+
RateLimiterMaxDelay: tt.fields.RateLimiterMaxDelay,
47+
RateLimiterQPS: tt.fields.RateLimiterQPS,
48+
RateLimiterBucketSize: tt.fields.RateLimiterBucketSize,
49+
}
50+
if got := o.SetDefaults(); !reflect.DeepEqual(got, tt.want) {
51+
t.Errorf("SetDefaults() = %v, want %v", got, tt.want)
52+
}
53+
})
54+
}
55+
56+
t.Run("receiver is changed", func(t *testing.T) {
57+
o := &Options{}
58+
o.SetDefaults()
59+
want := &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}
60+
if !reflect.DeepEqual(o, want) {
61+
t.Errorf("SetDefaults() changed self to %v, want %v", o, want)
62+
}
63+
})
64+
}

pkg/util/fedinformer/handlers.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,13 @@ func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, n
6363
// coming in, ensuring the appropriate nested handler method is invoked.
6464
//
6565
// Note: An object that starts passing the filter after an update is considered an add, and
66-
// an object that stops passing the filter after an update is considered a delete.
66+
// an object that stops passing the filter after an update is considered a deletion.
6767
// Like the handlers, the filter MUST NOT modify the objects it is given.
68-
func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}),
68+
func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}, isInitialList bool),
6969
updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
7070
return &cache.FilteringResourceEventHandler{
7171
FilterFunc: filterFunc,
72-
Handler: cache.ResourceEventHandlerFuncs{
72+
Handler: cache.ResourceEventHandlerDetailedFuncs{
7373
AddFunc: addFunc,
7474
UpdateFunc: updateFunc,
7575
DeleteFunc: deleteFunc,

pkg/util/fedinformer/handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func TestNewFilteringHandlerOnAllEvents(t *testing.T) {
214214
for _, tc := range testCases {
215215
t.Run(tc.name, func(t *testing.T) {
216216
var addCalled, updateCalled, deleteCalled bool
217-
addFunc := func(_ interface{}) { addCalled = true }
217+
addFunc := func(_ interface{}, _ bool) { addCalled = true }
218218
updateFunc := func(_, _ interface{}) { updateCalled = true }
219219
deleteFunc := func(_ interface{}) { deleteCalled = true }
220220

pkg/util/worker.go

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/client-go/tools/cache"
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
27+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2728

2829
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
2930
)
@@ -46,6 +47,35 @@ type AsyncWorker interface {
4647
Run(ctx context.Context, workerNumber int)
4748
}
4849

50+
// LowPriority is the priority value for low priority items.
51+
const LowPriority = -100
52+
53+
// AsyncPriorityWorker is an extension of AsyncWorker with priority queue support.
54+
type AsyncPriorityWorker interface {
55+
// EnqueueWithOpts generates the key of 'obj' according to a 'KeyFunc' then adds the key as an item to priority queue by 'AddWithOpts'.
56+
EnqueueWithOpts(opts AddOpts, obj any)
57+
58+
// AddWithOpts adds items to the priority queue with options.
59+
AddWithOpts(opts AddOpts, item ...any)
60+
61+
AsyncWorker
62+
}
63+
64+
// AddOpts defines the options for adding items to priority queue.
65+
type AddOpts struct {
66+
After time.Duration
67+
RateLimited bool
68+
Priority int
69+
}
70+
71+
func (o *AddOpts) toPriorityQueueAddOpts() priorityqueue.AddOpts {
72+
return priorityqueue.AddOpts{
73+
After: o.After,
74+
RateLimited: o.RateLimited,
75+
Priority: o.Priority,
76+
}
77+
}
78+
4979
// QueueKey is the item key that stores in queue.
5080
// The key could be arbitrary types.
5181
//
@@ -61,6 +91,7 @@ type KeyFunc func(obj interface{}) (QueueKey, error)
6191
type ReconcileFunc func(key QueueKey) error
6292

6393
type asyncWorker struct {
94+
name string
6495
// keyFunc is the function that make keys for API objects.
6596
keyFunc KeyFunc
6697
// reconcileFunc is the function that process keys from the queue.
@@ -77,16 +108,28 @@ type Options struct {
77108
KeyFunc KeyFunc
78109
ReconcileFunc ReconcileFunc
79110
RateLimiterOptions ratelimiterflag.Options
111+
UsePriorityQueue bool
80112
}
81113

82114
// NewAsyncWorker returns a asyncWorker which can process resource periodic.
83-
func NewAsyncWorker(opt Options) AsyncWorker {
115+
func NewAsyncWorker(opt Options) AsyncPriorityWorker {
116+
var queue workqueue.TypedRateLimitingInterface[any]
117+
if opt.UsePriorityQueue {
118+
rateLimiterOpts := opt.RateLimiterOptions.SetDefaults()
119+
queue = priorityqueue.New[any](opt.Name, func(o *priorityqueue.Opts[any]) {
120+
// change to controller-runtime priorityqueue default rateLimiter
121+
o.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[any](rateLimiterOpts.RateLimiterBaseDelay, rateLimiterOpts.RateLimiterMaxDelay)
122+
})
123+
} else {
124+
queue = workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{
125+
Name: opt.Name,
126+
})
127+
}
84128
return &asyncWorker{
129+
name: opt.Name,
85130
keyFunc: opt.KeyFunc,
86131
reconcileFunc: opt.ReconcileFunc,
87-
queue: workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{
88-
Name: opt.Name,
89-
}),
132+
queue: queue,
90133
}
91134
}
92135

@@ -104,6 +147,20 @@ func (w *asyncWorker) Enqueue(obj interface{}) {
104147
w.Add(key)
105148
}
106149

150+
func (w *asyncWorker) EnqueueWithOpts(opts AddOpts, obj any) {
151+
key, err := w.keyFunc(obj)
152+
if err != nil {
153+
klog.Errorf("Failed to generate key for obj: %+v, err: %v", obj, err)
154+
return
155+
}
156+
157+
if key == nil {
158+
return
159+
}
160+
161+
w.AddWithOpts(opts, key)
162+
}
163+
107164
func (w *asyncWorker) Add(item interface{}) {
108165
if item == nil {
109166
klog.Warningf("Ignore nil item from queue")
@@ -122,6 +179,24 @@ func (w *asyncWorker) AddAfter(item interface{}, duration time.Duration) {
122179
w.queue.AddAfter(item, duration)
123180
}
124181

182+
func (w *asyncWorker) AddWithOpts(opts AddOpts, item ...any) {
183+
if item == nil {
184+
klog.Warningf("Ignore nil item from queue")
185+
return
186+
}
187+
188+
pq, ok := w.queue.(priorityqueue.PriorityQueue[any])
189+
if !ok {
190+
klog.Warningf("queue is not priority queue, fallback to normal queue, queueName: %s", w.name)
191+
for _, it := range item {
192+
w.queue.Add(it)
193+
}
194+
return
195+
}
196+
197+
pq.AddWithOpts(opts.toPriorityQueueAddOpts(), item...)
198+
}
199+
125200
func (w *asyncWorker) worker() {
126201
key, quit := w.queue.Get()
127202
if quit {

0 commit comments

Comments
 (0)