From cc75a1b65d90cd5b749a61d98800e619ac1df7e4 Mon Sep 17 00:00:00 2001 From: changzhen Date: Thu, 9 Oct 2025 14:31:15 +0800 Subject: [PATCH] reduce unnecessary watch events in the work-status-controller Signed-off-by: changzhen --- cmd/agent/app/agent.go | 1 + .../app/controllermanager.go | 2 +- .../status/work_status_controller.go | 15 +- pkg/util/helper/predicate.go | 64 ++- pkg/util/helper/predicate_test.go | 427 ++++++++++++++++++ 5 files changed, 494 insertions(+), 15 deletions(-) diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 57ebc349f1eb..7e5071e4e8b0 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -366,6 +366,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (bool, error) { InformerManager: genericmanager.GetInstance(), Context: ctx.Context, ObjectWatcher: ctx.ObjectWatcher, + WorkPredicateFunc: helper.NewWorkStatusPredicateOnAgent(ctx.Opts.ClusterName), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent, ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout, ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs, diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 223c8ef76c22..518e53c4fcfd 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -475,7 +475,7 @@ func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, er InformerManager: genericmanager.GetInstance(), Context: ctx.Context, ObjectWatcher: ctx.ObjectWatcher, - WorkPredicateFunc: helper.WorkWithinPushClusterPredicate(ctx.Mgr), + WorkPredicateFunc: helper.NewWorkStatusPredicate(ctx.Mgr), ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, ClusterClientOption: ctx.ClusterClientOption, ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout, diff --git a/pkg/controllers/status/work_status_controller.go b/pkg/controllers/status/work_status_controller.go index f7174cbc5506..36e3b70d1460 100644 --- a/pkg/controllers/status/work_status_controller.go +++ b/pkg/controllers/status/work_status_controller.go @@ -543,18 +543,13 @@ func (c *WorkStatusController) getSingleClusterManager(cluster *clusterv1alpha1. // SetupWithManager creates a controller and register to controller manager. func (c *WorkStatusController) SetupWithManager(mgr controllerruntime.Manager) error { - ctrlBuilder := controllerruntime.NewControllerManagedBy(mgr).Named(WorkStatusControllerName). + return controllerruntime.NewControllerManagedBy(mgr). + Named(WorkStatusControllerName). + For(&workv1alpha1.Work{}, builder.WithPredicates(c.WorkPredicateFunc)). WithOptions(controller.Options{ RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions), - }) - - if c.WorkPredicateFunc != nil { - ctrlBuilder.For(&workv1alpha1.Work{}, builder.WithPredicates(c.WorkPredicateFunc)) - } else { - ctrlBuilder.For(&workv1alpha1.Work{}) - } - - return ctrlBuilder.Complete(c) + }). + Complete(c) } func (c *WorkStatusController) eventf(object *unstructured.Unstructured, eventType, reason, messageFmt string, args ...interface{}) { diff --git a/pkg/util/helper/predicate.go b/pkg/util/helper/predicate.go index 50c7f688caa7..e07cf1599311 100644 --- a/pkg/util/helper/predicate.go +++ b/pkg/util/helper/predicate.go @@ -29,10 +29,7 @@ import ( "github.com/karmada-io/karmada/pkg/util/names" ) -// WorkWithinPushClusterPredicate generates the event filter function to skip events that the controllers are uninterested. -// Used by controllers: -// - execution controller working in karmada-controller-manager -// - work status controller working in karmada-controller-manager +// WorkWithinPushClusterPredicate generates the event filter function for execution-controller in karmada-controller-manager func WorkWithinPushClusterPredicate(mgr controllerruntime.Manager) predicate.Funcs { predFunc := func(object client.Object) bool { obj := object.(*workv1alpha1.Work) @@ -68,6 +65,65 @@ func WorkWithinPushClusterPredicate(mgr controllerruntime.Manager) predicate.Fun } } +func newWorkStatusPredicate(predFunc func(object client.Object) bool) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + return predFunc(createEvent.Object) + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + workOld := updateEvent.ObjectOld.(*workv1alpha1.Work) + workNew := updateEvent.ObjectNew.(*workv1alpha1.Work) + workOldApplied := IsResourceApplied(&workOld.Status) + workNewApplied := IsResourceApplied(&workNew.Status) + if !workOldApplied && workNewApplied { + return predFunc(updateEvent.ObjectNew) + } + return false + }, + DeleteFunc: func(event.DeleteEvent) bool { + return false + }, + GenericFunc: func(event.GenericEvent) bool { + return false + }, + } +} + +// NewWorkStatusPredicate generates the event filter function for work-status-controller in karmada-controller-manager +func NewWorkStatusPredicate(mgr controllerruntime.Manager) predicate.Funcs { + predFunc := func(object client.Object) bool { + obj := object.(*workv1alpha1.Work) + + clusterName, err := names.GetClusterName(obj.Namespace) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", obj.Namespace, obj.Name) + return false + } + + clusterObj, err := util.GetCluster(mgr.GetClient(), clusterName) + if err != nil { + klog.Errorf("Failed to get the given member cluster %s", clusterName) + return false + } + + return clusterObj.Spec.SyncMode == clusterv1alpha1.Push + } + return newWorkStatusPredicate(predFunc) +} + +// NewWorkStatusPredicateOnAgent generates the event filter function for work-status-controller in karmada-agent +func NewWorkStatusPredicateOnAgent(curClusterName string) predicate.Funcs { + predFunc := func(object client.Object) bool { + clusterName, err := names.GetClusterName(object.GetNamespace()) + if err != nil { + klog.Errorf("Failed to get member cluster name for work %s/%s", object.GetNamespace(), object.GetName()) + return false + } + return clusterName == curClusterName + } + return newWorkStatusPredicate(predFunc) +} + // NewPredicateForServiceExportController generates an event filter function for ServiceExport controller running by karmada-controller-manager. func NewPredicateForServiceExportController(mgr controllerruntime.Manager) predicate.Funcs { predFunc := func(eventType string, object client.Object) bool { diff --git a/pkg/util/helper/predicate_test.go b/pkg/util/helper/predicate_test.go index a20b49f5dc80..f970991f9827 100644 --- a/pkg/util/helper/predicate_test.go +++ b/pkg/util/helper/predicate_test.go @@ -633,3 +633,430 @@ type fakeManager struct { func (f *fakeManager) GetClient() client.Client { return f.client } + +func TestNewWorkStatusPredicate(t *testing.T) { + type args struct { + mgr controllerruntime.Manager + obj client.Object + } + type want struct { + create, update, delete, generic bool + } + tests := []struct { + name string + args args + want want + }{ + { + name: "get cluster name error", + args: args{ + mgr: &fakeManager{client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{SyncMode: clusterv1alpha1.Push}, + }, + ).Build()}, + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: "cluster"}, + }, + }, + want: want{ + create: false, + update: false, + delete: false, + generic: false, + }, + }, + { + name: "cluster not found", + args: args{ + mgr: &fakeManager{client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects().Build()}, + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster"}, + }, + }, + want: want{ + create: false, + update: false, + delete: false, + generic: false, + }, + }, + { + name: "cluster is pull mode", + args: args{ + mgr: &fakeManager{client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{SyncMode: clusterv1alpha1.Pull}, + }, + ).Build()}, + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster"}, + }, + }, + want: want{ + create: false, + update: false, + delete: false, + generic: false, + }, + }, + { + name: "matched push cluster", + args: args{ + mgr: &fakeManager{client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{SyncMode: clusterv1alpha1.Push}, + }, + ).Build()}, + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster"}, + }, + }, + want: want{ + create: true, + update: false, // Update only returns true when resource becomes applied + delete: false, + generic: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pred := NewWorkStatusPredicate(tt.args.mgr) + if got := pred.Create(event.CreateEvent{Object: tt.args.obj}); got != tt.want.create { + t.Errorf("Create() got = %v, want %v", got, tt.want.create) + return + } + if got := pred.Update(event.UpdateEvent{ObjectOld: tt.args.obj, ObjectNew: tt.args.obj}); got != tt.want.update { + t.Errorf("Update() got = %v, want %v", got, tt.want.update) + return + } + if got := pred.Delete(event.DeleteEvent{Object: tt.args.obj}); got != tt.want.delete { + t.Errorf("Delete() got = %v, want %v", got, tt.want.delete) + return + } + if got := pred.Generic(event.GenericEvent{Object: tt.args.obj}); got != tt.want.generic { + t.Errorf("Generic() got = %v, want %v", got, tt.want.generic) + return + } + }) + } +} + +func TestNewWorkStatusPredicate_Update(t *testing.T) { + mgr := &fakeManager{client: fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects( + &clusterv1alpha1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: clusterv1alpha1.ClusterSpec{SyncMode: clusterv1alpha1.Push}, + }, + ).Build()} + + // Create work objects with different status conditions + workNotApplied := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkProgressing, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + workApplied := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + // Work in pull cluster (should not match) + workInPullCluster := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + "pull-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + type args struct { + event event.UpdateEvent + } + + tests := []struct { + name string + args args + want bool + }{ + { + name: "both old and new are not applied", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workNotApplied}, + }, + want: false, + }, + { + name: "old is not applied, new is applied - should trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workApplied}, + }, + want: true, + }, + { + name: "old is applied, new is not applied - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workApplied, ObjectNew: workNotApplied}, + }, + want: false, + }, + { + name: "both old and new are applied - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workApplied, ObjectNew: workApplied}, + }, + want: false, + }, + { + name: "old is not applied, new is applied but in pull cluster - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workInPullCluster}, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pred := NewWorkStatusPredicate(mgr) + if got := pred.Update(tt.args.event); got != tt.want { + t.Errorf("Update() got = %v, want %v", got, tt.want) + return + } + }) + } +} + +func TestNewWorkStatusPredicateOnAgent(t *testing.T) { + type args struct { + curClusterName string + obj client.Object + } + type want struct { + create, update, delete, generic bool + } + tests := []struct { + name string + args args + want want + }{ + { + name: "get cluster name error", + args: args{ + curClusterName: "cluster", + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: "cluster"}, + }, + }, + want: want{ + create: false, + update: false, + delete: false, + generic: false, + }, + }, + { + name: "cluster name unmatched", + args: args{ + curClusterName: "cluster", + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: names.ExecutionSpacePrefix + "unmatched"}, + }, + }, + want: want{ + create: false, + update: false, + delete: false, + generic: false, + }, + }, + { + name: "matched cluster", + args: args{ + curClusterName: "cluster", + obj: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{Name: "work", Namespace: names.ExecutionSpacePrefix + "cluster"}, + }, + }, + want: want{ + create: true, + update: false, // Update only returns true when resource becomes applied + delete: false, + generic: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pred := NewWorkStatusPredicateOnAgent(tt.args.curClusterName) + if got := pred.Create(event.CreateEvent{Object: tt.args.obj}); got != tt.want.create { + t.Errorf("Create() got = %v, want %v", got, tt.want.create) + return + } + if got := pred.Update(event.UpdateEvent{ObjectOld: tt.args.obj, ObjectNew: tt.args.obj}); got != tt.want.update { + t.Errorf("Update() got = %v, want %v", got, tt.want.update) + return + } + if got := pred.Delete(event.DeleteEvent{Object: tt.args.obj}); got != tt.want.delete { + t.Errorf("Delete() got = %v, want %v", got, tt.want.delete) + return + } + if got := pred.Generic(event.GenericEvent{Object: tt.args.obj}); got != tt.want.generic { + t.Errorf("Generic() got = %v, want %v", got, tt.want.generic) + return + } + }) + } +} + +func TestNewWorkStatusPredicateOnAgent_Update(t *testing.T) { + curClusterName := "cluster" + + // Create work objects with different status conditions + workNotApplied := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + curClusterName, + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkProgressing, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + workApplied := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + curClusterName, + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + // Work in different cluster (should not match) + workInDifferentCluster := &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: names.ExecutionSpacePrefix + "different-cluster", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + type args struct { + event event.UpdateEvent + } + + tests := []struct { + name string + args args + want bool + }{ + { + name: "both old and new are not applied", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workNotApplied}, + }, + want: false, + }, + { + name: "old is not applied, new is applied - should trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workApplied}, + }, + want: true, + }, + { + name: "old is applied, new is not applied - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workApplied, ObjectNew: workNotApplied}, + }, + want: false, + }, + { + name: "both old and new are applied - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workApplied, ObjectNew: workApplied}, + }, + want: false, + }, + { + name: "old is not applied, new is applied but in different cluster - should not trigger", + args: args{ + event: event.UpdateEvent{ObjectOld: workNotApplied, ObjectNew: workInDifferentCluster}, + }, + want: false, + }, + { + name: "old is not applied, new is applied but cluster name parse error - should not trigger", + args: args{ + event: event.UpdateEvent{ + ObjectOld: workNotApplied, + ObjectNew: &workv1alpha1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: "work", Namespace: "invalid-namespace", + }, + Status: workv1alpha1.WorkStatus{ + Conditions: []metav1.Condition{ + { + Type: workv1alpha1.WorkApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + }, + }, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pred := NewWorkStatusPredicateOnAgent(curClusterName) + if got := pred.Update(tt.args.event); got != tt.want { + t.Errorf("Update() got = %v, want %v", got, tt.want) + return + } + }) + } +}