37 changes: 28 additions & 9 deletions pkg/detector/detector.go
@@ -75,7 +75,7 @@ type ResourceDetector struct {
InformerManager genericmanager.SingleClusterInformerManager
ControllerRuntimeCache ctrlcache.Cache
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
Processor util.AsyncPriorityWorker
SkippedResourceConfig *util.SkippedResourceConfig
SkippedPropagatingNamespaces []*regexp.Regexp
// ResourceInterpreter knows the details of resource structure.
@@ -136,6 +136,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
KeyFunc: ResourceItemKeyFunc,
ReconcileFunc: d.Reconcile,
RateLimiterOptions: d.RateLimiterOptions,
UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
}
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
d.Processor.Run(ctx, d.ConcurrentResourceTemplateSyncs)
@@ -310,12 +311,17 @@ func (d *ResourceDetector) EventFilter(obj interface{}) bool {
}

// OnAdd handles object add event and push the object to queue.
func (d *ResourceDetector) OnAdd(obj interface{}) {
func (d *ResourceDetector) OnAdd(obj interface{}, isInitialList bool) {
runtimeObj, ok := obj.(runtime.Object)
if !ok {
return
}
d.Processor.Enqueue(ResourceItem{Obj: runtimeObj})

var priority *int
if isInitialList {
priority = ptr.To(util.LowPriority)
}
d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, ResourceItem{Obj: runtimeObj})
}

// OnUpdate handles object update event and push the object to queue.
@@ -375,7 +381,7 @@ func (d *ResourceDetector) OnDelete(obj interface{}) {
return
}
}
d.OnAdd(obj)
d.OnAdd(obj, false)
}

// LookForMatchedPolicy tries to find a policy for object referenced by object key.
@@ -1492,9 +1498,22 @@ func (d *ResourceDetector) applyReplicaInterpretation(object *unstructured.Unstr
// Therefore, only set ResourceChangeByKarmada in lazy activation mode.
// For more details, see: https://github.com/karmada-io/karmada/issues/5996.
func (d *ResourceDetector) enqueueResourceTemplateForPolicyChange(key keys.ClusterWideKey, pref policyv1alpha1.ActivationPreference) {
if util.IsLazyActivationEnabled(pref) {
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
return
}
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key})
enqueueKey := keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: util.IsLazyActivationEnabled(pref)}
// Low priority is used here for the following reasons:
//
// 1. It ensures that user-triggered changes are processed first when the
// controller restarts. At startup, items are enqueued both by the Processor
// and here; if low priority were applied only in the Processor, enqueueing
// here would reset the priority of most already-queued items back to 0.
//
// 2. A resourceTemplate is usually modified by users, while changes to pp/cpp
// may come from administrators. Since admin changes should not block user
// changes (otherwise users would perceive the controller as blocked), they
// are processed later by being assigned a lower priority.
// In other words, when all resourceTemplates already need to be queued for
// processing, this lets user-triggered modifications to resourceTemplates be
// handled first.
d.Processor.AddWithOpts(util.AddOpts{Priority: ptr.To(util.LowPriority)}, enqueueKey)
}
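
A minimal, illustrative sketch of the ordering described in the comment above — it is not part of this PR. It relies on the worker-side API introduced below in pkg/util/worker.go (NewAsyncWorker with UsePriorityQueue, AddWithOpts, LowPriority); the key and reconcile functions are hypothetical placeholders, and the sleep only keeps the process alive long enough for the single worker to drain the queue. With both items queued, the default-priority user edit should be reconciled before the deprioritized policy-driven item.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/utils/ptr"

	"github.com/karmada-io/karmada/pkg/util"
)

func main() {
	worker := util.NewAsyncWorker(util.Options{
		Name:             "priority-demo",
		UsePriorityQueue: true,
		KeyFunc:          func(obj interface{}) (util.QueueKey, error) { return obj, nil },
		ReconcileFunc: func(key util.QueueKey) error {
			fmt.Println("reconciled:", key)
			return nil
		},
	})

	// Re-enqueued because of a pp/cpp change: deprioritized so it cannot block user edits.
	worker.AddWithOpts(util.AddOpts{Priority: ptr.To(util.LowPriority)}, "deployment/from-policy-change")
	// Triggered by a user editing the resource template: default priority 0, dequeued first.
	worker.Add("deployment/edited-by-user")

	worker.Run(context.Background(), 1)
	time.Sleep(time.Second) // demo only: let the worker drain the queue before main exits
}
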
22 changes: 21 additions & 1 deletion pkg/detector/detector_test.go
@@ -366,6 +366,7 @@ func TestOnAdd(t *testing.T) {
name string
obj interface{}
expectedEnqueue bool
isInitialList bool
}{
{
name: "valid unstructured object",
@@ -381,6 +382,21 @@
},
expectedEnqueue: true,
},
{
name: "valid unstructured object, with low priority",
obj: &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": map[string]interface{}{
"name": "test-deployment",
"namespace": "default",
},
},
},
expectedEnqueue: true,
isInitialList: true,
},
{
name: "invalid unstructured object",
obj: &unstructured.Unstructured{
@@ -410,7 +426,7 @@
d := &ResourceDetector{
Processor: mockProcessor,
}
d.OnAdd(tt.obj)
d.OnAdd(tt.obj, tt.isInitialList)
if tt.expectedEnqueue {
assert.Equal(t, 1, mockProcessor.enqueueCount, "Object should be enqueued")
assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
@@ -1056,6 +1072,10 @@ func (m *mockAsyncWorker) Add(_ interface{}) {
m.enqueueCount++
}
func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, _ ...any) {}
func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
m.Enqueue(item)
}

func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}

26 changes: 16 additions & 10 deletions pkg/sharedcli/ratelimiterflag/ratelimiterflag.go
@@ -47,21 +47,27 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&o.RateLimiterBucketSize, "rate-limiter-bucket-size", 100, "The bucket size for rate limier.")
}

// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] {
// SetDefaults sets the default values for Options.
func (o *Options) SetDefaults() *Options {
// set defaults
if opts.RateLimiterBaseDelay <= 0 {
opts.RateLimiterBaseDelay = 5 * time.Millisecond
if o.RateLimiterBaseDelay <= 0 {
o.RateLimiterBaseDelay = 5 * time.Millisecond
}
if opts.RateLimiterMaxDelay <= 0 {
opts.RateLimiterMaxDelay = 1000 * time.Second
if o.RateLimiterMaxDelay <= 0 {
o.RateLimiterMaxDelay = 1000 * time.Second
}
if opts.RateLimiterQPS <= 0 {
opts.RateLimiterQPS = 10
if o.RateLimiterQPS <= 0 {
o.RateLimiterQPS = 10
}
if opts.RateLimiterBucketSize <= 0 {
opts.RateLimiterBucketSize = 100
if o.RateLimiterBucketSize <= 0 {
o.RateLimiterBucketSize = 100
}
return o
}

// DefaultControllerRateLimiter provide a default rate limiter for controller, and users can tune it by corresponding flags.
func DefaultControllerRateLimiter[T comparable](opts Options) workqueue.TypedRateLimiter[T] {
opts.SetDefaults()

return workqueue.NewTypedMaxOfRateLimiter[T](
workqueue.NewTypedItemExponentialFailureRateLimiter[T](opts.RateLimiterBaseDelay, opts.RateLimiterMaxDelay),
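
A short usage sketch for the new SetDefaults helper — not part of the diff. It only uses names visible in this file (Options, SetDefaults, DefaultControllerRateLimiter); the string type parameter is an arbitrary choice for the example.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"

	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)

func main() {
	opts := ratelimiterflag.Options{RateLimiterQPS: 50} // remaining fields left at zero
	opts.SetDefaults()                                  // fills BaseDelay=5ms, MaxDelay=1000s, BucketSize=100; QPS stays 50
	fmt.Printf("%+v\n", opts)

	// The generic constructor behaves as before and applies the same defaults internally.
	var limiter workqueue.TypedRateLimiter[string] = ratelimiterflag.DefaultControllerRateLimiter[string](opts)
	_ = limiter
}
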
64 changes: 64 additions & 0 deletions pkg/sharedcli/ratelimiterflag/ratelimiterflag_test.go
@@ -0,0 +1,64 @@
/*
Copyright 2025 The Karmada Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ratelimiterflag

import (
"reflect"
"testing"
"time"
)

func TestOptions_SetDefaults(t *testing.T) {
type fields struct {
RateLimiterBaseDelay time.Duration
RateLimiterMaxDelay time.Duration
RateLimiterQPS int
RateLimiterBucketSize int
}
tests := []struct {
name string
fields fields
want *Options
}{
{name: "all zero -> defaults", fields: fields{}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}},
{name: "all negative -> defaults", fields: fields{RateLimiterBaseDelay: -1, RateLimiterMaxDelay: -1, RateLimiterQPS: -1, RateLimiterBucketSize: -1}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}},
{name: "only base delay invalid", fields: fields{RateLimiterBaseDelay: 0, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}, want: &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 20, RateLimiterBucketSize: 200}},
{name: "custom values unchanged", fields: fields{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}, want: &Options{RateLimiterBaseDelay: 1 * time.Second, RateLimiterMaxDelay: 2 * time.Second, RateLimiterQPS: 50, RateLimiterBucketSize: 500}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
o := &Options{
RateLimiterBaseDelay: tt.fields.RateLimiterBaseDelay,
RateLimiterMaxDelay: tt.fields.RateLimiterMaxDelay,
RateLimiterQPS: tt.fields.RateLimiterQPS,
RateLimiterBucketSize: tt.fields.RateLimiterBucketSize,
}
if got := o.SetDefaults(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("SetDefaults() = %v, want %v", got, tt.want)
}
})
}

t.Run("receiver is changed", func(t *testing.T) {
o := &Options{}
o.SetDefaults()
want := &Options{RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 1000 * time.Second, RateLimiterQPS: 10, RateLimiterBucketSize: 100}
if !reflect.DeepEqual(o, want) {
t.Errorf("SetDefaults() changed self to %v, want %v", o, want)
}
})
}
6 changes: 3 additions & 3 deletions pkg/util/fedinformer/handlers.go
@@ -63,13 +63,13 @@ func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, n
// coming in, ensuring the appropriate nested handler method is invoked.
//
// Note: An object that starts passing the filter after an update is considered an add, and
// an object that stops passing the filter after an update is considered a delete.
// an object that stops passing the filter after an update is considered a deletion.
// Like the handlers, the filter MUST NOT modify the objects it is given.
func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}),
func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}, isInitialList bool),
updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
return &cache.FilteringResourceEventHandler{
FilterFunc: filterFunc,
Handler: cache.ResourceEventHandlerFuncs{
Handler: cache.ResourceEventHandlerDetailedFuncs{
AddFunc: addFunc,
UpdateFunc: updateFunc,
DeleteFunc: deleteFunc,
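
A hypothetical caller sketch (not in this PR) showing what the new signature enables: the add handler now receives the isInitialList flag from client-go's ResourceEventHandlerDetailedFuncs, so callers can distinguish objects replayed during the initial cache sync from live add events. The fedinformer import path is inferred from the file location; the handler bodies are placeholders.

package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"

	"github.com/karmada-io/karmada/pkg/util/fedinformer"
)

func buildHandler() cache.ResourceEventHandler {
	return fedinformer.NewFilteringHandlerOnAllEvents(
		func(obj interface{}) bool { return true }, // filter: accept everything in this sketch
		func(obj interface{}, isInitialList bool) {
			if isInitialList {
				fmt.Println("replayed from the initial list; a caller could enqueue this at low priority")
				return
			}
			fmt.Println("live add event")
		},
		func(oldObj, newObj interface{}) { fmt.Println("update") },
		func(obj interface{}) { fmt.Println("delete") },
	)
}

func main() { _ = buildHandler() }
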
2 changes: 1 addition & 1 deletion pkg/util/fedinformer/handlers_test.go
@@ -214,7 +214,7 @@ func TestNewFilteringHandlerOnAllEvents(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var addCalled, updateCalled, deleteCalled bool
addFunc := func(_ interface{}) { addCalled = true }
addFunc := func(_ interface{}, _ bool) { addCalled = true }
updateFunc := func(_, _ interface{}) { updateCalled = true }
deleteFunc := func(_ interface{}) { deleteCalled = true }

92 changes: 88 additions & 4 deletions pkg/util/worker.go
@@ -24,6 +24,7 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"

"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)
@@ -46,6 +47,35 @@ type AsyncWorker interface {
Run(ctx context.Context, workerNumber int)
}

// LowPriority is the priority value for low priority items.
const LowPriority = -100

// AsyncPriorityWorker is an extension of AsyncWorker with priority queue support.
type AsyncPriorityWorker interface {
// EnqueueWithOpts generates the key of 'obj' according to a 'KeyFunc', then adds the key as an item to the priority queue via 'AddWithOpts'.
EnqueueWithOpts(opts AddOpts, obj any)

// AddWithOpts adds items to the priority queue with options.
AddWithOpts(opts AddOpts, item ...any)

AsyncWorker
}

// AddOpts defines the options for adding items to priority queue.
type AddOpts struct {
After time.Duration
RateLimited bool
Priority *int
}

func (o *AddOpts) toPriorityQueueAddOpts() priorityqueue.AddOpts {
return priorityqueue.AddOpts{
After: o.After,
RateLimited: o.RateLimited,
Priority: o.Priority,
}
}

// QueueKey is the item key that stores in queue.
// The key could be arbitrary types.
//
@@ -61,6 +91,7 @@ type KeyFunc func(obj interface{}) (QueueKey, error)
type ReconcileFunc func(key QueueKey) error

type asyncWorker struct {
name string
// keyFunc is the function that make keys for API objects.
keyFunc KeyFunc
// reconcileFunc is the function that process keys from the queue.
@@ -77,16 +108,28 @@ type Options struct {
KeyFunc KeyFunc
ReconcileFunc ReconcileFunc
RateLimiterOptions ratelimiterflag.Options
UsePriorityQueue bool
}

// NewAsyncWorker returns a asyncWorker which can process resource periodic.
func NewAsyncWorker(opt Options) AsyncWorker {
func NewAsyncWorker(opt Options) AsyncPriorityWorker {
var queue workqueue.TypedRateLimitingInterface[any]
if opt.UsePriorityQueue {
rateLimiterOpts := opt.RateLimiterOptions.SetDefaults()
queue = priorityqueue.New[any](opt.Name, func(o *priorityqueue.Opts[any]) {
// replace the controller-runtime priorityqueue's default rate limiter with one built from the configured rate-limiter flags
o.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[any](rateLimiterOpts.RateLimiterBaseDelay, rateLimiterOpts.RateLimiterMaxDelay)
})
} else {
queue = workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{
Name: opt.Name,
})
}
return &asyncWorker{
name: opt.Name,
keyFunc: opt.KeyFunc,
reconcileFunc: opt.ReconcileFunc,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(ratelimiterflag.DefaultControllerRateLimiter[any](opt.RateLimiterOptions), workqueue.TypedRateLimitingQueueConfig[any]{
Name: opt.Name,
}),
queue: queue,
}
}

@@ -104,6 +147,20 @@ func (w *asyncWorker) Enqueue(obj interface{}) {
w.Add(key)
}

func (w *asyncWorker) EnqueueWithOpts(opts AddOpts, obj any) {
key, err := w.keyFunc(obj)
if err != nil {
klog.Errorf("Failed to generate key for obj: %+v, err: %v", obj, err)
return
}

if key == nil {
return
}

w.AddWithOpts(opts, key)
}

func (w *asyncWorker) Add(item interface{}) {
if item == nil {
klog.Warningf("Ignore nil item from queue")
@@ -122,6 +179,33 @@ func (w *asyncWorker) AddAfter(item interface{}, duration time.Duration) {
w.queue.AddAfter(item, duration)
}

func (w *asyncWorker) AddWithOpts(opts AddOpts, item ...any) {
if item == nil {
klog.Warningf("Ignore nil item from queue")
return
}

pq, ok := w.queue.(interface {
AddWithOpts(o priorityqueue.AddOpts, Items ...any)
})
if !ok {
klog.Warningf("queue is not priority queue, fallback to normal queue, queueName: %s", w.name)
for _, it := range item {
switch {
case opts.After > 0:
w.queue.AddAfter(it, opts.After)
case opts.RateLimited:
w.queue.AddRateLimited(it)
default:
w.queue.Add(it)
}
}
return
}

pq.AddWithOpts(opts.toPriorityQueueAddOpts(), item...)
}

func (w *asyncWorker) worker() {
key, quit := w.queue.Get()
if quit {
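
A rough sketch (not part of the PR) of the fallback path in AddWithOpts above: the same call site works whether or not the priority queue is enabled, because a plain rate-limiting queue falls back to AddAfter/AddRateLimited/Add and simply ignores the Priority field. The worker wiring and key are hypothetical; in karmada-controller-manager the UsePriorityQueue flag comes from the ControllerPriorityQueue feature gate.

package main

import (
	"context"
	"time"

	"k8s.io/utils/ptr"

	"github.com/karmada-io/karmada/pkg/util"
)

func newWorker(usePriorityQueue bool) util.AsyncPriorityWorker {
	return util.NewAsyncWorker(util.Options{
		Name:             "fallback-demo",
		UsePriorityQueue: usePriorityQueue,
		KeyFunc:          func(obj interface{}) (util.QueueKey, error) { return obj, nil },
		ReconcileFunc:    func(util.QueueKey) error { return nil },
	})
}

func main() {
	for _, enabled := range []bool{true, false} {
		w := newWorker(enabled)
		// Priority queue enabled: a delayed, deprioritized add.
		// Priority queue disabled: falls back to AddAfter; the Priority field is ignored.
		w.AddWithOpts(util.AddOpts{After: 2 * time.Second, Priority: ptr.To(util.LowPriority)}, "some-key")
		w.Run(context.Background(), 1)
	}
}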