diff --git a/pkg/clusterdiscovery/clusterapi/clusterapi.go b/pkg/clusterdiscovery/clusterapi/clusterapi.go
index 218409db84b8..76976f469758 100644
--- a/pkg/clusterdiscovery/clusterapi/clusterapi.go
+++ b/pkg/clusterdiscovery/clusterapi/clusterapi.go
@@ -34,6 +34,7 @@ import (
 	secretutil "sigs.k8s.io/cluster-api/util/secret"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/karmadactl/join"
 	"github.com/karmada-io/karmada/pkg/karmadactl/options"
 	"github.com/karmada-io/karmada/pkg/karmadactl/unjoin"
@@ -62,7 +63,7 @@ type ClusterDetector struct {
 	ClusterAPIClient     client.Client
 	InformerManager      genericmanager.SingleClusterInformerManager
 	EventHandler         cache.ResourceEventHandler
-	Processor            util.AsyncWorker
+	Processor            util.AsyncPriorityWorker
 	ConcurrentReconciles int
 }
 
@@ -72,9 +73,10 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
 	d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
 	workerOptions := util.Options{
-		Name:          "cluster-api cluster detector",
-		KeyFunc:       ClusterWideKeyFunc,
-		ReconcileFunc: d.Reconcile,
+		Name:             "cluster-api cluster detector",
+		KeyFunc:          ClusterWideKeyFunc,
+		ReconcileFunc:    d.Reconcile,
+		UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	d.Processor = util.NewAsyncWorker(workerOptions)
 	d.Processor.Run(ctx, d.ConcurrentReconciles)
@@ -97,17 +99,18 @@ func (d *ClusterDetector) discoveryCluster() {
 }
 
 // OnAdd handles object add event and push the object to queue.
-func (d *ClusterDetector) OnAdd(obj interface{}) {
+func (d *ClusterDetector) OnAdd(obj interface{}, isInInitialList bool) {
 	runtimeObj, ok := obj.(runtime.Object)
 	if !ok {
 		return
 	}
-	d.Processor.Enqueue(runtimeObj)
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
+	d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, runtimeObj)
 }
 
 // OnUpdate handles object update event and push the object to queue.
 func (d *ClusterDetector) OnUpdate(_, newObj interface{}) {
-	d.OnAdd(newObj)
+	d.OnAdd(newObj, false)
 }
 
 // OnDelete handles object delete event and push the object to queue.
@@ -119,7 +122,7 @@ func (d *ClusterDetector) OnDelete(obj interface{}) {
 			return
 		}
 	}
-	d.OnAdd(obj)
+	d.OnAdd(obj, false)
 }
 
 // Reconcile performs a full reconciliation for the object referred to by the key.
diff --git a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go
index 35d5157ff164..e29731f475a6 100644
--- a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go
+++ b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_controller.go
@@ -26,6 +26,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
 	"github.com/karmada-io/karmada/pkg/util"
 )
@@ -42,15 +43,16 @@ type HpaScaleTargetMarker struct {
 	DynamicClient dynamic.Interface
 	RESTMapper    meta.RESTMapper
 
-	scaleTargetWorker util.AsyncWorker
+	scaleTargetWorker  util.AsyncPriorityWorker
 	RateLimiterOptions ratelimiterflag.Options
 }
 
 // SetupWithManager creates a controller and register to controller manager.
 func (r *HpaScaleTargetMarker) SetupWithManager(mgr controllerruntime.Manager) error {
 	scaleTargetWorkerOptions := util.Options{
-		Name:          "scale target worker",
-		ReconcileFunc: r.reconcileScaleRef,
+		Name:             "scale target worker",
+		ReconcileFunc:    r.reconcileScaleRef,
+		UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	r.scaleTargetWorker = util.NewAsyncWorker(scaleTargetWorkerOptions)
 	r.scaleTargetWorker.Run(context.Background(), scaleTargetWorkerNum)
diff --git a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_predicate.go b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_predicate.go
index 124d504474bc..46c6e6505d2a 100644
--- a/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_predicate.go
+++ b/pkg/controllers/hpascaletargetmarker/hpa_scale_target_marker_predicate.go
@@ -25,6 +25,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	"github.com/karmada-io/karmada/pkg/util"
 )
 
 var _ predicate.Predicate = &HpaScaleTargetMarker{}
@@ -40,7 +41,8 @@ func (r *HpaScaleTargetMarker) Create(e event.CreateEvent) bool {
 
 	// if hpa exist and has been propagated, add label to its scale ref resource
 	if hasBeenPropagated(hpa) {
-		r.scaleTargetWorker.Add(labelEvent{addLabelEvent, hpa})
+		priority := util.ItemPriorityIfInInitialList(e.IsInInitialList)
+		r.scaleTargetWorker.AddWithOpts(util.AddOpts{Priority: priority}, labelEvent{addLabelEvent, hpa})
 	}
 
 	return false
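Note: the widened `util.AsyncPriorityWorker` surface these call sites depend on lives in pkg/util and is not itself shown in this diff. The following is a minimal sketch of its assumed shape, inferred from the call sites above and the test mocks later in this patch; the exact method set and parameter names beyond those calls are assumptions.

```go
// Sketch only — inferred from call sites in this patch, not the actual
// pkg/util definitions.
package util

import (
	"context"
	"time"
)

// AddOpts mirrors the options accepted by the opts-aware methods.
// Priority is a *int so that nil can mean "use the queue's default".
type AddOpts struct {
	After       time.Duration // enqueue after a delay
	RateLimited bool          // enqueue through the rate limiter
	Priority    *int          // nil means default priority
}

// AsyncPriorityWorker extends the plain worker with opts-aware methods.
type AsyncPriorityWorker interface {
	Add(item interface{})
	AddAfter(item interface{}, duration time.Duration)
	Enqueue(obj interface{})
	AddWithOpts(opts AddOpts, items ...any)
	EnqueueWithOpts(opts AddOpts, item any)
	Run(ctx context.Context, workerNumber int)
}
```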
diff --git a/pkg/controllers/mcs/service_export_controller.go b/pkg/controllers/mcs/service_export_controller.go
index fea9c636700b..364bdfe5caf2 100644
--- a/pkg/controllers/mcs/service_export_controller.go
+++ b/pkg/controllers/mcs/service_export_controller.go
@@ -50,6 +50,7 @@ import (
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/fedinformer"
@@ -84,7 +85,7 @@ type ServiceExportController struct {
 	// "member1": instance of ResourceEventHandler
 	eventHandlers sync.Map
 	// worker process resources periodic from rateLimitingQueue.
-	worker util.AsyncWorker
+	worker util.AsyncPriorityWorker
 
 	RateLimiterOptions ratelimiterflag.Options
 	// epsWorkListFunc is used to mock the work list provider to return a specific work list,
@@ -161,9 +162,10 @@ func (c *ServiceExportController) SetupWithManager(mgr controllerruntime.Manager
 // RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
 func (c *ServiceExportController) RunWorkQueue() {
 	workerOptions := util.Options{
-		Name:          "service-export",
-		KeyFunc:       nil,
-		ReconcileFunc: c.syncServiceExportOrEndpointSlice,
+		Name:             "service-export",
+		KeyFunc:          nil,
+		ReconcileFunc:    c.syncServiceExportOrEndpointSlice,
+		UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	c.worker = util.NewAsyncWorker(workerOptions)
 	c.worker.Run(c.Context, c.WorkerNumber)
@@ -318,15 +320,16 @@ func (c *ServiceExportController) getEventHandler(clusterName string) cache.Reso
 	return eventHandler
 }
 
-func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
-	return func(obj interface{}) {
+func (c *ServiceExportController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
+	return func(obj interface{}, isInInitialList bool) {
 		curObj := obj.(runtime.Object)
 		key, err := keys.FederatedKeyFunc(clusterName, curObj)
 		if err != nil {
 			klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
 			return
 		}
-		c.worker.Add(key)
+		priority := util.ItemPriorityIfInInitialList(isInInitialList)
+		c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
 	}
 }
diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go
index 307cd17bab17..eb735f57b504 100644
--- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go
+++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller.go
@@ -46,6 +46,7 @@ import (
 	networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/controllers/ctrlutil"
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
 	"github.com/karmada-io/karmada/pkg/util"
 	"github.com/karmada-io/karmada/pkg/util/fedinformer"
@@ -69,7 +70,7 @@ type EndpointSliceCollectController struct {
 	// Each handler takes the cluster name as key and takes the handler function as the value, e.g.
 	// "member1": instance of ResourceEventHandler
 	eventHandlers sync.Map
-	worker        util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
+	worker        util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.
 
 	ClusterCacheSyncTimeout metav1.Duration
 	RateLimiterOptions      ratelimiterflag.Options
@@ -133,9 +134,10 @@ func (c *EndpointSliceCollectController) SetupWithManager(mgr controllerruntime.
 // RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
 func (c *EndpointSliceCollectController) RunWorkQueue() {
 	workerOptions := util.Options{
-		Name:          "endpointslice-collect",
-		KeyFunc:       nil,
-		ReconcileFunc: c.collectEndpointSlice,
+		Name:             "endpointslice-collect",
+		KeyFunc:          nil,
+		ReconcileFunc:    c.collectEndpointSlice,
+		UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	c.worker = util.NewAsyncWorker(workerOptions)
 	c.worker.Run(c.Context, c.WorkerNumber)
@@ -242,15 +244,16 @@ func (c *EndpointSliceCollectController) getEventHandler(clusterName string) cac
 	return eventHandler
 }
 
-func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}) {
-	return func(obj interface{}) {
+func (c *EndpointSliceCollectController) genHandlerAddFunc(clusterName string) func(obj interface{}, isInInitialList bool) {
+	return func(obj interface{}, isInInitialList bool) {
 		curObj := obj.(runtime.Object)
 		key, err := keys.FederatedKeyFunc(clusterName, curObj)
 		if err != nil {
 			klog.ErrorS(err, "Failed to generate key for obj", "gvk", curObj.GetObjectKind().GroupVersionKind())
 			return
 		}
-		c.worker.Add(key)
+		priority := util.ItemPriorityIfInInitialList(isInInitialList)
+		c.worker.AddWithOpts(util.AddOpts{Priority: priority}, key)
 	}
 }
diff --git a/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go b/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go
index b8a6a3a1f258..6eaa3e7144f6 100644
--- a/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go
+++ b/pkg/controllers/multiclusterservice/endpointslice_collect_controller_test.go
@@ -74,7 +74,7 @@ func TestGetEventHandler(t *testing.T) {
 			assert.True(t, exists, "Handler should be stored in eventHandlers")
 			assert.Equal(t, handler, storedHandler, "Stored handler should match returned handler")
 			if !tc.existingHandler {
-				assert.IsType(t, &cache.ResourceEventHandlerFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerFuncs")
+				assert.IsType(t, &cache.ResourceEventHandlerDetailedFuncs{}, handler, "New handler should be of type *cache.ResourceEventHandlerDetailedFuncs")
 			} else {
 				assert.IsType(t, &mockResourceEventHandler{}, handler, "Existing handler should be of type *mockResourceEventHandler")
 			}
@@ -92,7 +92,7 @@ func TestGenHandlerFuncs(t *testing.T) {
 			worker: mockWorker,
 		}
 		addFunc := controller.genHandlerAddFunc(clusterName)
-		addFunc(testObj)
+		addFunc(testObj, false)
 		assert.Equal(t, 1, mockWorker.addCount, "Add function should be called once")
 	})
@@ -448,6 +448,15 @@ func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
 
 func (m *mockAsyncWorker) Enqueue(_ interface{}) {}
 
+func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) {
+	for _, item := range items {
+		m.Add(item)
+	}
+}
+func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
+	m.Enqueue(item)
+}
+
 func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}
 
 type mockResourceEventHandler struct{}
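Note: every controller in this patch consults `features.FeatureGate.Enabled(features.ControllerPriorityQueue)`, so the priority queue stays strictly opt-in. The gate's definition is outside this diff; the sketch below shows a plausible registration using the k8s.io/component-base featuregate machinery that Karmada's features package wraps — the Alpha, default-off spec is an assumption.

```go
// Sketch only — the actual definition lives in pkg/features and is not
// part of this diff; the Default/PreRelease values are assumptions.
package features

import "k8s.io/component-base/featuregate"

const (
	// ControllerPriorityQueue enables priority queues for controller work
	// queues, letting initial-list (startup) events be handled at lower priority.
	ControllerPriorityQueue featuregate.Feature = "ControllerPriorityQueue"
)

// FeatureGate is the shared mutable gate consulted by the controllers above.
var FeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate()

func init() {
	// Register the gate; Add returns an error on duplicate registration.
	_ = FeatureGate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		ControllerPriorityQueue: {Default: false, PreRelease: featuregate.Alpha},
	})
}
```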
diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go
index f7174cbc5506..c9640a64f17f 100644
--- a/pkg/controllers/status/work_status_controller.go
+++ b/pkg/controllers/status/work_status_controller.go
@@ -26,6 +26,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
@@ -42,6 +43,7 @@ import (
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/events"
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/metrics"
 	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
@@ -66,7 +68,7 @@ type WorkStatusController struct {
 	InformerManager genericmanager.MultiClusterInformerManager
 	eventHandler    cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
 	Context         context.Context
-	worker          util.AsyncWorker // worker process resources periodic from rateLimitingQueue.
+	worker          util.AsyncPriorityWorker // worker process resources periodic from rateLimitingQueue.
 	// ConcurrentWorkStatusSyncs is the number of Work status that are allowed to sync concurrently.
 	ConcurrentWorkStatusSyncs int
 	ObjectWatcher             objectwatcher.ObjectWatcher
@@ -137,7 +139,7 @@ func (c *WorkStatusController) buildResourceInformers(cluster *clusterv1alpha1.C
 // getEventHandler return callback function that knows how to handle events from the member cluster.
 func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
 	if c.eventHandler == nil {
-		c.eventHandler = fedinformer.NewHandlerOnAllEvents(c.worker.Enqueue)
+		c.eventHandler = fedinformer.NewHandlerOnEvents(c.onAdd, c.onUpdate, c.onDelete)
 	}
 	return c.eventHandler
 }
@@ -145,9 +147,10 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
 // RunWorkQueue initializes worker and run it, worker will process resource asynchronously.
 func (c *WorkStatusController) RunWorkQueue() {
 	workerOptions := util.Options{
-		Name:          "work-status",
-		KeyFunc:       generateKey,
-		ReconcileFunc: c.syncWorkStatus,
+		Name:             "work-status",
+		KeyFunc:          generateKey,
+		ReconcileFunc:    c.syncWorkStatus,
+		UsePriorityQueue: features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	c.worker = util.NewAsyncWorker(workerOptions)
 	c.worker.Run(c.Context, c.ConcurrentWorkStatusSyncs)
@@ -565,3 +568,28 @@ func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventTy
 	}
 	c.EventRecorder.Eventf(ref, eventType, reason, messageFmt, args...)
 }
+
+func (c *WorkStatusController) onAdd(obj any, isInInitialList bool) {
+	curObj := obj.(runtime.Object)
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
+	c.worker.EnqueueWithOpts(util.AddOpts{Priority: priority}, curObj)
+}
+
+func (c *WorkStatusController) onUpdate(old, cur any) {
+	curObj := cur.(runtime.Object)
+	if !reflect.DeepEqual(old, cur) {
+		c.worker.Enqueue(curObj)
+	}
+}
+
+func (c *WorkStatusController) onDelete(old any) {
+	if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
+		// This object might be stale but ok for our current usage.
+		old = deleted.Obj
+		if old == nil {
+			return
+		}
+	}
+	oldObj := old.(runtime.Object)
+	c.worker.Enqueue(oldObj)
+}
diff --git a/pkg/controllers/status/work_status_controller_test.go b/pkg/controllers/status/work_status_controller_test.go
index 25f354fc2fcd..cbd5cf4209f5 100644
--- a/pkg/controllers/status/work_status_controller_test.go
+++ b/pkg/controllers/status/work_status_controller_test.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/uuid"
 	dynamicfake "k8s.io/client-go/dynamic/fake"
 	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/ptr"
 	controllerruntime "sigs.k8s.io/controller-runtime"
@@ -1096,3 +1097,113 @@ func TestWorkStatusController_interpretHealth(t *testing.T) {
 		})
 	}
 }
+
+type TestObject struct {
+	metav1.TypeMeta
+	metav1.ObjectMeta
+	Spec string
+}
+
+func (in *TestObject) DeepCopyObject() runtime.Object {
+	return &TestObject{
+		TypeMeta:   in.TypeMeta,
+		ObjectMeta: in.ObjectMeta,
+		Spec:       in.Spec,
+	}
+}
+
+// mockAsyncWorker implements util.AsyncPriorityWorker for testing onAdd/onUpdate/onDelete behavior.
+type mockAsyncWorker struct {
+	receivedObjs       []any  // objects passed via EnqueueWithOpts (onAdd)
+	receivedPriorities []*int // priorities captured from EnqueueWithOpts
+	enqueuedObjs       []any  // objects passed via Enqueue (onUpdate/onDelete)
+}
+
+func (m *mockAsyncWorker) EnqueueWithOpts(opts util.AddOpts, obj any) { // capture inputs for onAdd
+	m.receivedObjs = append(m.receivedObjs, obj)
+	m.receivedPriorities = append(m.receivedPriorities, opts.Priority)
+}
+func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, _ ...any)    {}
+func (m *mockAsyncWorker) Add(_ interface{})                       {}
+func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
+func (m *mockAsyncWorker) Enqueue(obj interface{}) { // capture inputs for onUpdate/onDelete
+	m.enqueuedObjs = append(m.enqueuedObjs, obj)
+}
+func (m *mockAsyncWorker) Run(_ context.Context, _ int) {}
+
+func TestWorkStatusController_onAdd(t *testing.T) {
+	cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
+	c := newWorkStatusController(cluster)
+	mockWorker := &mockAsyncWorker{}
+	c.worker = mockWorker
+
+	obj := &TestObject{ObjectMeta: metav1.ObjectMeta{Name: "test-obj"}}
+
+	t.Run("in initial list -> low priority", func(t *testing.T) {
+		c.onAdd(obj, true)
+		assert.Equal(t, 1, len(mockWorker.receivedObjs))
+		assert.Equal(t, obj, mockWorker.receivedObjs[0])
+		if assert.NotNil(t, mockWorker.receivedPriorities[0]) {
+			assert.Equal(t, util.LowPriority, *mockWorker.receivedPriorities[0])
+		}
+	})
+
+	t.Run("not in initial list -> nil priority", func(t *testing.T) {
+		c.onAdd(obj, false)
+		assert.Equal(t, 2, len(mockWorker.receivedObjs))
+		assert.Equal(t, obj, mockWorker.receivedObjs[1])
+		assert.Nil(t, mockWorker.receivedPriorities[1])
+	})
+}
+
+func TestWorkStatusController_onUpdate(t *testing.T) {
+	cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
+	c := newWorkStatusController(cluster)
+	mockWorker := &mockAsyncWorker{}
+	c.worker = mockWorker
+
+	oldObj := &TestObject{ObjectMeta: metav1.ObjectMeta{Name: "test-obj"}, Spec: "same"}
+	// Deep copy same content
+	curSame := oldObj.DeepCopyObject().(*TestObject)
+	// Different spec
+	curDiff := &TestObject{ObjectMeta: metav1.ObjectMeta{Name: "test-obj"}, Spec: "different"}
+
+	t.Run("objects equal -> no enqueue", func(t *testing.T) {
+		c.onUpdate(oldObj, curSame)
+		assert.Equal(t, 0, len(mockWorker.enqueuedObjs))
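Note: the onAdd/onUpdate/onDelete trio above (and the genHandlerAddFunc changes earlier) all follow client-go's `cache.ResourceEventHandlerDetailedFuncs` pattern: `AddFunc` receives an extra `isInInitialList` flag, true for adds replayed from the informer's initial listing and false for live events. Below is a condensed, hedged sketch of that shared wiring, assuming only the `util` types used throughout this patch.

```go
// Sketch only — condenses the handler wiring repeated across this patch;
// "worker" stands in for any util.AsyncPriorityWorker.
package example

import (
	"k8s.io/client-go/tools/cache"

	"github.com/karmada-io/karmada/pkg/util"
)

func newDetailedHandler(worker util.AsyncPriorityWorker) cache.ResourceEventHandler {
	return &cache.ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			// Adds replayed from the informer's first list are demoted,
			// so live cluster events are dequeued first after a restart.
			priority := util.ItemPriorityIfInInitialList(isInInitialList)
			worker.EnqueueWithOpts(util.AddOpts{Priority: priority}, obj)
		},
		UpdateFunc: func(_, newObj interface{}) { worker.Enqueue(newObj) },
		DeleteFunc: func(obj interface{}) { worker.Enqueue(obj) },
	}
}
```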
+	})
+
+	t.Run("objects differ -> enqueue", func(t *testing.T) {
+		c.onUpdate(oldObj, curDiff)
+		assert.Equal(t, 1, len(mockWorker.enqueuedObjs))
+		assert.Equal(t, curDiff, mockWorker.enqueuedObjs[0])
+	})
+}
+
+func TestWorkStatusController_onDelete(t *testing.T) {
+	cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
+	c := newWorkStatusController(cluster)
+	mockWorker := &mockAsyncWorker{}
+	c.worker = mockWorker
+
+	obj := &TestObject{ObjectMeta: metav1.ObjectMeta{Name: "to-delete"}}
+
+	t.Run("direct object -> enqueue", func(t *testing.T) {
+		c.onDelete(obj)
+		assert.Equal(t, 1, len(mockWorker.enqueuedObjs))
+		assert.Equal(t, obj, mockWorker.enqueuedObjs[0])
+	})
+
+	t.Run("DeletedFinalStateUnknown wrapper -> enqueue", func(t *testing.T) {
+		wrapped := cache.DeletedFinalStateUnknown{Obj: &TestObject{ObjectMeta: metav1.ObjectMeta{Name: "wrapped"}}}
+		c.onDelete(wrapped)
+		assert.Equal(t, 2, len(mockWorker.enqueuedObjs))
+		assert.Equal(t, "wrapped", mockWorker.enqueuedObjs[1].(*TestObject).Name)
+	})
+
+	t.Run("DeletedFinalStateUnknown with nil Obj -> no additional enqueue", func(t *testing.T) {
+		wrappedNil := cache.DeletedFinalStateUnknown{Obj: nil}
+		c.onDelete(wrappedNil)
+		assert.Equal(t, 2, len(mockWorker.enqueuedObjs)) // unchanged
+	})
+}
diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go
index c4240fe7ed37..4014fe68fb8f 100644
--- a/pkg/dependenciesdistributor/dependencies_distributor.go
+++ b/pkg/dependenciesdistributor/dependencies_distributor.go
@@ -51,6 +51,7 @@ import (
 	configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
 	"github.com/karmada-io/karmada/pkg/events"
+	"github.com/karmada-io/karmada/pkg/features"
 	"github.com/karmada-io/karmada/pkg/resourceinterpreter"
 	"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
 	"github.com/karmada-io/karmada/pkg/util"
@@ -102,7 +103,7 @@ type DependenciesDistributor struct {
 	RateLimiterOptions ratelimiterflag.Options
 
 	eventHandler      cache.ResourceEventHandler
-	resourceProcessor util.AsyncWorker
+	resourceProcessor util.AsyncPriorityWorker
 	genericEvent      chan event.TypedGenericEvent[*workv1alpha2.ResourceBinding]
 	// ConcurrentDependentResourceSyncs is the number of dependent resource that are allowed to sync concurrently.
 	ConcurrentDependentResourceSyncs int
@@ -119,12 +120,13 @@ func (d *DependenciesDistributor) NeedLeaderElection() bool {
 }
 
 // OnAdd handles object add event and push the object to queue.
-func (d *DependenciesDistributor) OnAdd(obj interface{}) {
+func (d *DependenciesDistributor) OnAdd(obj interface{}, isInInitialList bool) {
 	runtimeObj, ok := obj.(runtime.Object)
 	if !ok {
 		return
 	}
-	d.resourceProcessor.Enqueue(runtimeObj)
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
+	d.resourceProcessor.EnqueueWithOpts(util.AddOpts{Priority: priority}, runtimeObj)
 }
 
 // OnUpdate handles object update event and push the object to queue.
@@ -146,9 +148,9 @@ func (d *DependenciesDistributor) OnUpdate(oldObj, newObj interface{}) {
 		return
 	}
 	if !equality.Semantic.DeepEqual(unstructuredOldObj.GetLabels(), unstructuredNewObj.GetLabels()) {
-		d.OnAdd(oldObj)
+		d.OnAdd(oldObj, false)
 	}
-	d.OnAdd(newObj)
+	d.OnAdd(newObj, false)
 }
 
 // OnDelete handles object delete event and push the object to queue.
@@ -160,7 +162,7 @@ func (d *DependenciesDistributor) OnDelete(obj interface{}) {
 			return
 		}
 	}
-	d.OnAdd(obj)
+	d.OnAdd(obj, false)
 }
 
 // reconcileResourceTemplate coordinates resources that may need to be distributed, such as Configmap, Service, etc.
@@ -663,6 +665,7 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
 		},
 		ReconcileFunc:      d.reconcileResourceTemplate,
 		RateLimiterOptions: d.RateLimiterOptions,
+		UsePriorityQueue:   features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	d.eventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
 	d.resourceProcessor = util.NewAsyncWorker(resourceWorkerOptions)
diff --git a/pkg/dependenciesdistributor/dependencies_distributor_test.go b/pkg/dependenciesdistributor/dependencies_distributor_test.go
index 6cfde4af8b32..0af9d210d87f 100644
--- a/pkg/dependenciesdistributor/dependencies_distributor_test.go
+++ b/pkg/dependenciesdistributor/dependencies_distributor_test.go
@@ -73,6 +73,18 @@ func (m *MockAsyncWorker) Enqueue(obj interface{}) {
 	m.queue = append(m.queue, obj)
 }
 
+// Note: This is a dummy implementation of AddWithOpts for testing purposes.
+func (m *MockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) {
+	for _, item := range items {
+		m.Add(item)
+	}
+}
+
+// Note: This is a dummy implementation of EnqueueWithOpts for testing purposes.
+func (m *MockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
+	m.Enqueue(item)
+}
+
 // Note: This is a dummy implementation of Run for testing purposes.
 func (m *MockAsyncWorker) Run(ctx context.Context, workerNumber int) {
 	// No actual work is done in the mock; we just simulate running
diff --git a/pkg/detector/detector.go b/pkg/detector/detector.go
index fe6db79eba87..22a8ea9f8718 100644
--- a/pkg/detector/detector.go
+++ b/pkg/detector/detector.go
@@ -83,11 +83,11 @@ type ResourceDetector struct {
 	EventRecorder record.EventRecorder
 
 	// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
 	// a reconcile function to consume the items in queue.
-	policyReconcileWorker util.AsyncWorker
+	policyReconcileWorker util.AsyncPriorityWorker
 
 	// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
 	// a reconcile function to consume the items in queue.
-	clusterPolicyReconcileWorker util.AsyncWorker
+	clusterPolicyReconcileWorker util.AsyncPriorityWorker
 
 	RESTMapper meta.RESTMapper
@@ -119,6 +119,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
 		KeyFunc:            NamespacedKeyFunc,
 		ReconcileFunc:      d.ReconcilePropagationPolicy,
 		RateLimiterOptions: d.RateLimiterOptions,
+		UsePriorityQueue:   features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	d.policyReconcileWorker = util.NewAsyncWorker(policyWorkerOptions)
 	d.policyReconcileWorker.Run(ctx, d.ConcurrentPropagationPolicySyncs)
@@ -127,6 +128,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
 		KeyFunc:            NamespacedKeyFunc,
 		ReconcileFunc:      d.ReconcileClusterPropagationPolicy,
 		RateLimiterOptions: d.RateLimiterOptions,
+		UsePriorityQueue:   features.FeatureGate.Enabled(features.ControllerPriorityQueue),
 	}
 	d.clusterPolicyReconcileWorker = util.NewAsyncWorker(clusterPolicyWorkerOptions)
 	d.clusterPolicyReconcileWorker.Run(ctx, d.ConcurrentClusterPropagationPolicySyncs)
@@ -311,16 +313,12 @@ func (d *ResourceDetector) EventFilter(obj interface{}) bool {
 }
 
 // OnAdd handles object add event and push the object to queue.
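Note: the OnAdd hunk below replaces detector-local priority plumbing with the shared helper introduced at the end of this patch in pkg/util/worker.go. The two forms are equivalent; here is a minimal side-by-side sketch (the wrapper function names are illustrative only):

```go
// Sketch only — both functions return nil for live events and a pointer to
// util.LowPriority for initial-list (startup) events.
package example

import (
	"k8s.io/utils/ptr"

	"github.com/karmada-io/karmada/pkg/util"
)

// priorityInlined is the form ResourceDetector.OnAdd used before this patch.
func priorityInlined(isInInitialList bool) *int {
	var priority *int
	if isInInitialList {
		priority = ptr.To(util.LowPriority)
	}
	return priority
}

// priorityViaHelper is the replacement using the shared helper.
func priorityViaHelper(isInInitialList bool) *int {
	return util.ItemPriorityIfInInitialList(isInInitialList)
}
```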
-func (d *ResourceDetector) OnAdd(obj interface{}, isInitialList bool) {
+func (d *ResourceDetector) OnAdd(obj interface{}, isInInitialList bool) {
 	runtimeObj, ok := obj.(runtime.Object)
 	if !ok {
 		return
 	}
-
-	var priority *int
-	if isInitialList {
-		priority = ptr.To(util.LowPriority)
-	}
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
 	d.Processor.EnqueueWithOpts(util.AddOpts{Priority: priority}, ResourceItem{Obj: runtimeObj})
 }
@@ -958,8 +956,9 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour
 }
 
 // OnPropagationPolicyAdd handles object add event and push the object to queue.
-func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
-	d.policyReconcileWorker.Enqueue(obj)
+func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}, isInInitialList bool) {
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
+	d.policyReconcileWorker.EnqueueWithOpts(util.AddOpts{Priority: priority}, obj)
 }
 
 // OnPropagationPolicyUpdate handles object update event and push the object to queue.
@@ -1029,8 +1028,9 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
 }
 
 // OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
-func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
-	d.clusterPolicyReconcileWorker.Enqueue(obj)
+func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}, isInInitialList bool) {
+	priority := util.ItemPriorityIfInInitialList(isInInitialList)
+	d.clusterPolicyReconcileWorker.EnqueueWithOpts(util.AddOpts{Priority: priority}, obj)
 }
 
 // OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
diff --git a/pkg/detector/detector_test.go b/pkg/detector/detector_test.go
index f82d50613842..bb540b62b076 100644
--- a/pkg/detector/detector_test.go
+++ b/pkg/detector/detector_test.go
@@ -366,7 +366,7 @@ func TestOnAdd(t *testing.T) {
 		name            string
 		obj             interface{}
 		expectedEnqueue bool
-		isInitialList   bool
+		isInInitialList bool
 	}{
 		{
 			name: "valid unstructured object",
@@ -395,7 +395,7 @@ func TestOnAdd(t *testing.T) {
 				},
 			},
 			expectedEnqueue: true,
-			isInitialList:   true,
+			isInInitialList: true,
 		},
 		{
 			name: "invalid unstructured object",
@@ -426,7 +426,7 @@ func TestOnAdd(t *testing.T) {
 			d := &ResourceDetector{
 				Processor: mockProcessor,
 			}
-			d.OnAdd(tt.obj, tt.isInitialList)
+			d.OnAdd(tt.obj, tt.isInInitialList)
 			if tt.expectedEnqueue {
 				assert.Equal(t, 1, mockProcessor.enqueueCount, "Object should be enqueued")
 				assert.IsType(t, ResourceItem{}, mockProcessor.lastEnqueued, "Enqueued item should be of type ResourceItem")
@@ -1072,7 +1072,11 @@ func (m *mockAsyncWorker) Add(_ interface{}) {
 	m.enqueueCount++
 }
 func (m *mockAsyncWorker) AddAfter(_ interface{}, _ time.Duration) {}
-func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, _ ...any) {}
+func (m *mockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) {
+	for _, item := range items {
+		m.Add(item)
+	}
+}
 func (m *mockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) {
 	m.Enqueue(item)
 }
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager" ) @@ -60,6 +61,18 @@ func (m *MockAsyncWorker) Enqueue(obj interface{}) { m.queue = append(m.queue, obj) } +// Note: This is a dummy implementation of AddWithOpts for testing purposes. +func (m *MockAsyncWorker) AddWithOpts(_ util.AddOpts, items ...any) { + for _, item := range items { + m.Add(item) + } +} + +// Note: This is a dummy implementation of EnqueueWithOpts for testing purposes. +func (m *MockAsyncWorker) EnqueueWithOpts(_ util.AddOpts, item any) { + m.Enqueue(item) +} + // Note: This is a dummy implementation of Run for testing purposes. func (m *MockAsyncWorker) Run(ctx context.Context, workerNumber int) { // No actual work is done in the mock; we just simulate running diff --git a/pkg/resourceinterpreter/customized/declarative/configmanager/manager.go b/pkg/resourceinterpreter/customized/declarative/configmanager/manager.go index bbaca0f9e3f9..642c8c798cf8 100644 --- a/pkg/resourceinterpreter/customized/declarative/configmanager/manager.go +++ b/pkg/resourceinterpreter/customized/declarative/configmanager/manager.go @@ -83,7 +83,7 @@ func NewInterpreterConfigManager(informer genericmanager.SingleClusterInformerMa manager.informer = informer manager.lister = informer.Lister(util.ResourceInterpreterCustomizationsGVR) configHandlers := fedinformer.NewHandlerOnEvents( - func(_ interface{}) { _ = manager.updateConfiguration() }, + func(_ interface{}, _ bool) { _ = manager.updateConfiguration() }, func(_, _ interface{}) { _ = manager.updateConfiguration() }, func(_ interface{}) { _ = manager.updateConfiguration() }) informer.ForResource(util.ResourceInterpreterCustomizationsGVR, configHandlers) diff --git a/pkg/resourceinterpreter/customized/webhook/configmanager/manager.go b/pkg/resourceinterpreter/customized/webhook/configmanager/manager.go index 0c67e99ee7a1..a9caf51799ce 100644 --- a/pkg/resourceinterpreter/customized/webhook/configmanager/manager.go +++ b/pkg/resourceinterpreter/customized/webhook/configmanager/manager.go @@ -82,7 +82,7 @@ func NewExploreConfigManager(inform genericmanager.SingleClusterInformerManager) manager.informer = inform configHandlers := fedinformer.NewHandlerOnEvents( - func(_ interface{}) { _ = manager.updateConfiguration() }, + func(_ interface{}, _ bool) { _ = manager.updateConfiguration() }, func(_, _ interface{}) { _ = manager.updateConfiguration() }, func(_ interface{}) { _ = manager.updateConfiguration() }) inform.ForResource(util.ResourceInterpreterWebhookConfigurationsGVR, configHandlers) diff --git a/pkg/util/fedinformer/handlers.go b/pkg/util/fedinformer/handlers.go index b1d915b6de55..7d30908a4678 100644 --- a/pkg/util/fedinformer/handlers.go +++ b/pkg/util/fedinformer/handlers.go @@ -17,42 +17,12 @@ limitations under the License. package fedinformer import ( - "reflect" - - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" ) -// NewHandlerOnAllEvents builds a ResourceEventHandler that the function 'fn' will be called on all events(add/update/delete). -func NewHandlerOnAllEvents(fn func(interface{})) cache.ResourceEventHandler { - return &cache.ResourceEventHandlerFuncs{ - AddFunc: func(cur interface{}) { - curObj := cur.(runtime.Object) - fn(curObj) - }, - UpdateFunc: func(old, cur interface{}) { - curObj := cur.(runtime.Object) - if !reflect.DeepEqual(old, cur) { - fn(curObj) - } - }, - DeleteFunc: func(old interface{}) { - if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok { - // This object might be stale but ok for our current usage. 
diff --git a/pkg/util/fedinformer/handlers.go b/pkg/util/fedinformer/handlers.go
index b1d915b6de55..7d30908a4678 100644
--- a/pkg/util/fedinformer/handlers.go
+++ b/pkg/util/fedinformer/handlers.go
@@ -17,42 +17,12 @@ limitations under the License.
 package fedinformer
 
 import (
-	"reflect"
-
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/cache"
 )
 
-// NewHandlerOnAllEvents builds a ResourceEventHandler that the function 'fn' will be called on all events(add/update/delete).
-func NewHandlerOnAllEvents(fn func(interface{})) cache.ResourceEventHandler {
-	return &cache.ResourceEventHandlerFuncs{
-		AddFunc: func(cur interface{}) {
-			curObj := cur.(runtime.Object)
-			fn(curObj)
-		},
-		UpdateFunc: func(old, cur interface{}) {
-			curObj := cur.(runtime.Object)
-			if !reflect.DeepEqual(old, cur) {
-				fn(curObj)
-			}
-		},
-		DeleteFunc: func(old interface{}) {
-			if deleted, ok := old.(cache.DeletedFinalStateUnknown); ok {
-				// This object might be stale but ok for our current usage.
-				old = deleted.Obj
-				if old == nil {
-					return
-				}
-			}
-			oldObj := old.(runtime.Object)
-			fn(oldObj)
-		},
-	}
-}
-
 // NewHandlerOnEvents builds a ResourceEventHandler.
-func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
-	return &cache.ResourceEventHandlerFuncs{
+func NewHandlerOnEvents(addFunc func(obj interface{}, isInInitialList bool), updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
+	return &cache.ResourceEventHandlerDetailedFuncs{
 		AddFunc:    addFunc,
 		UpdateFunc: updateFunc,
 		DeleteFunc: deleteFunc,
@@ -65,7 +35,7 @@ func NewHandlerOnEvents(addFunc func(obj interface{}), updateFunc func(oldObj, n
 // Note: An object that starts passing the filter after an update is considered an add, and
 // an object that stops passing the filter after an update is considered a deletion.
 // Like the handlers, the filter MUST NOT modify the objects it is given.
-func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}, isInitialList bool),
+func NewFilteringHandlerOnAllEvents(filterFunc func(obj interface{}) bool, addFunc func(obj interface{}, isInInitialList bool),
 	updateFunc func(oldObj, newObj interface{}), deleteFunc func(obj interface{})) cache.ResourceEventHandler {
 	return &cache.FilteringResourceEventHandler{
 		FilterFunc: filterFunc,
Spec: "old"} - handler.OnUpdate(oldObj, tc.input) - case "delete": - handler.OnDelete(tc.input) - } - - if !reflect.DeepEqual(calledWith, tc.expected) { - t.Errorf("expected %v, got %v", tc.expected, calledWith) - } - }) - } -} - func TestNewHandlerOnEvents(t *testing.T) { testCases := []struct { name string @@ -131,7 +71,7 @@ func TestNewHandlerOnEvents(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { var addCalled, updateCalled, deleteCalled bool - addFunc := func(_ interface{}) { addCalled = true } + addFunc := func(_ interface{}, _ bool) { addCalled = true } updateFunc := func(_, _ interface{}) { updateCalled = true } deleteFunc := func(_ interface{}) { deleteCalled = true } diff --git a/pkg/util/worker.go b/pkg/util/worker.go index d7a2e0da0a0b..b7520cafa75b 100644 --- a/pkg/util/worker.go +++ b/pkg/util/worker.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" @@ -192,10 +193,10 @@ func (w *asyncWorker) AddWithOpts(opts AddOpts, item ...any) { klog.Warningf("queue is not priority queue, fallback to normal queue, queueName: %s", w.name) for _, it := range item { switch { - case opts.After > 0: - w.queue.AddAfter(it, opts.After) case opts.RateLimited: w.queue.AddRateLimited(it) + case opts.After > 0: + w.queue.AddAfter(it, opts.After) default: w.queue.Add(it) } @@ -242,3 +243,11 @@ func MetaNamespaceKeyFunc(obj interface{}) (QueueKey, error) { } return key, nil } + +// ItemPriorityIfInInitialList returns the item's priority if it belongs to the initial list; otherwise, it returns nil. +func ItemPriorityIfInInitialList(isInInitialList bool) *int { + if !isInInitialList { + return nil + } + return ptr.To(LowPriority) +}