diff --git a/pkg/builder/forked_controller.go b/pkg/builder/forked_controller.go index b3ff227..53cd5a3 100644 --- a/pkg/builder/forked_controller.go +++ b/pkg/builder/forked_controller.go @@ -149,6 +149,7 @@ type untypedWatchesInput interface { setObjectProjection(objectProjection) setEngageWithLocalCluster(engage bool) setEngageWithProviderClusters(engage bool) + setClusterFilter(ClusterFilterFunc) } // WatchesInput represents the information set by Watches method. @@ -353,9 +354,10 @@ func (blder *TypedBuilder[request]) doWatch() error { allPredicates = append(allPredicates, blder.forInput.predicates...) src := mcsource.TypedKind[client.Object, request](blder.forInput.object, hdler, allPredicates...). - WithProjection(blder.project(blder.forInput.objectProjection)) + WithProjection(blder.project(blder.forInput.objectProjection)). + WithClusterFilter(blder.forInput.getClusterFilter()) if ptr.Deref(blder.forInput.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + src, _, err := src.ForCluster("", blder.mgr.GetLocalManager()) if err != nil { return err } @@ -391,10 +393,12 @@ func (blder *TypedBuilder[request]) doWatch() error { } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) + src := mcsource.TypedKind[client.Object, request](own.object, hdler, allPredicates...). - WithProjection(blder.project(own.objectProjection)) + WithProjection(blder.project(own.objectProjection)). + WithClusterFilter(own.getClusterFilter()) if ptr.Deref(own.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + src, _, err := src.ForCluster("", blder.mgr.GetLocalManager()) if err != nil { return err } @@ -416,9 +420,12 @@ func (blder *TypedBuilder[request]) doWatch() error { for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) - src := mcsource.TypedKind[client.Object, request](w.obj, w.handler, allPredicates...).WithProjection(blder.project(w.objectProjection)) + + src := mcsource.TypedKind[client.Object, request](w.obj, w.handler, allPredicates...). + WithProjection(blder.project(w.objectProjection)). + WithClusterFilter(w.getClusterFilter()) if ptr.Deref(w.engageWithLocalCluster, blder.mgr.GetProvider() == nil) { - src, err := src.ForCluster("", blder.mgr.GetLocalManager()) + src, _, err := src.ForCluster("", blder.mgr.GetLocalManager()) if err != nil { return err } diff --git a/pkg/builder/forked_controller_test.go b/pkg/builder/forked_controller_test.go index 823e140..7ffce9e 100644 --- a/pkg/builder/forked_controller_test.go +++ b/pkg/builder/forked_controller_test.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -49,6 +50,7 @@ import ( mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" mcsource "sigs.k8s.io/multicluster-runtime/pkg/source" + "sigs.k8s.io/multicluster-runtime/providers/clusters" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -604,8 +606,157 @@ var _ = Describe("application", func() { }).Should(BeTrue()) }) }) + + Describe("filter engaged clusters when a filter is set", func() { + var mgr mcmanager.Manager + var provider *clusters.Provider + filterFunctionCalled := &atomic.Bool{} + reconciled := &atomic.Bool{} + BeforeEach(func() { + filterFunctionCalled.Store(false) + reconciled.Store(false) + provider = clusters.New() + var err error + mgr, err = mcmanager.New(cfg, provider, mcmanager.Options{}) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should call the filter when local cluster is engaged but not prevent reconciling", func(ctx SpecContext) { + By("adding a controller to the manager", func() { + err := ControllerManagedBy(mgr). + For(&appsv1.Deployment{}, + WithClusterFilter(func(clusterName string, cluster cluster.Cluster) bool { + // this filter should be applied + filterFunctionCalled.Store(true) + return false + }), + WithEngageWithLocalCluster(true), + ). + Named("cluster-filter-local"). + Complete(mcreconcile.Func(func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) { + reconciled.Store(true) + return reconcile.Result{}, nil + })) + Expect(err).NotTo(HaveOccurred()) + }) + By("starting the manager", func() { + go func() { + defer GinkgoRecover() + Expect(mgr.Start(ctx)).NotTo(HaveOccurred()) + }() + }) + By("Creating a Deployment", func() { + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "deploy-name-local", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + err := mgr.GetLocalManager().GetClient().Create(ctx, dep) + Expect(err).NotTo(HaveOccurred()) + }) + + Eventually(func() bool { + return reconciled.Load() + }, "10s").Should(BeTrue()) + + Expect(filterFunctionCalled.Load()).To(BeTrue()) + }) + + It("should respect the filter when provider clusters are engaged", func(ctx SpecContext) { + By("adding the local manager as cluster to the provider", func() { + noopCluster := &noopStartCluster{ + Cluster: mgr.GetLocalManager(), + } + + Expect(provider.Add(ctx, "local", noopCluster)).NotTo(HaveOccurred()) + }) + By("adding a controller to the manager", func() { + err := ControllerManagedBy(mgr). + For(&appsv1.Deployment{}, + WithClusterFilter(func(clusterName string, cluster cluster.Cluster) bool { + // this filter should be applied + filterFunctionCalled.Store(true) + return true + }), + ). + Named("cluster-filter-non-local"). + Complete(mcreconcile.Func(func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) { + reconciled.Store(true) + return reconcile.Result{}, nil + })) + Expect(err).NotTo(HaveOccurred()) + }) + By("starting the manager", func() { + go func() { + defer GinkgoRecover() + Expect(mgr.Start(ctx)).NotTo(HaveOccurred()) + }() + }) + By("Creating a Deployment", func() { + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "deploy-name-non-local", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + err := mgr.GetLocalManager().GetClient().Create(ctx, dep) + Expect(err).NotTo(HaveOccurred()) + }) + + Eventually(func() bool { + return reconciled.Load() + }, "10s").Should(BeTrue()) + + Expect(filterFunctionCalled.Load()).To(BeTrue()) + }) + }) }) +// noopStartCluster is a cluster.Cluster that does nothing on Start. +// Used to wrap the local manager when adding to a provider to avoid +// starting it twice. +type noopStartCluster struct { + cluster.Cluster +} + +func (n *noopStartCluster) Start(ctx context.Context) error { + <-ctx.Done() + return nil +} + // newNonTypedOnlyCache returns a new cache that wraps the normal cache, // returning an error if normal, typed objects have informers requested. func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) { @@ -744,9 +895,8 @@ type fakeType struct { func (*fakeType) GetObjectKind() schema.ObjectKind { return nil } func (*fakeType) DeepCopyObject() runtime.Object { return nil } -func must[T any](x T, err error) T { - if err != nil { - Expect(err).NotTo(HaveOccurred()) - } +func must[T any](x T, b bool, err error) T { + Expect(err).NotTo(HaveOccurred()) + Expect(b).To(BeTrue()) return x } diff --git a/pkg/builder/multicluster_options.go b/pkg/builder/multicluster_options.go index cc30083..1de2c80 100644 --- a/pkg/builder/multicluster_options.go +++ b/pkg/builder/multicluster_options.go @@ -16,11 +16,31 @@ limitations under the License. package builder +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/cluster" + + "sigs.k8s.io/multicluster-runtime/pkg/multicluster" + mcsource "sigs.k8s.io/multicluster-runtime/pkg/source" +) + +// ClusterFilterFunc is a function that filters clusters. +type ClusterFilterFunc = mcsource.ClusterFilterFunc + // EngageOptions configures how the controller should engage with clusters // when a provider is configured. type EngageOptions struct { engageWithLocalCluster *bool engageWithProviderClusters *bool + clusterFilter *ClusterFilterFunc +} + +func (w EngageOptions) getClusterFilter() ClusterFilterFunc { + if w.clusterFilter != nil { + return *w.clusterFilter + } + return nil } // WithEngageWithLocalCluster configures whether the controller should engage @@ -41,6 +61,32 @@ func WithEngageWithProviderClusters(engage bool) EngageOptions { } } +// WithClusterFilter configures a filter function that determines +// which clusters the controller should engage with. +// The option applies only if WithEngageWithProviderClusters is true. +func WithClusterFilter(filter ClusterFilterFunc) EngageOptions { + return EngageOptions{ + clusterFilter: &filter, + } +} + +// WithClustersFromProvider configures the controller to only engage with +// the clusters provided by the given provider. +// If is a helper function that wraps WithClusterFilter and has the +// same constraints and mutually exclusive. +func WithClustersFromProvider(ctx context.Context, provider multicluster.Provider) EngageOptions { + var fn ClusterFilterFunc = func(clusterName string, cluster cluster.Cluster) bool { + cl, err := provider.Get(ctx, clusterName) + if err != nil { + return false + } + return cl == cluster + } + return EngageOptions{ + clusterFilter: &fn, + } +} + // ApplyToFor applies this configuration to the given ForInput options. func (w EngageOptions) ApplyToFor(opts *ForInput) { if w.engageWithLocalCluster != nil { @@ -51,6 +97,10 @@ func (w EngageOptions) ApplyToFor(opts *ForInput) { val := *w.engageWithProviderClusters opts.engageWithProviderClusters = &val } + if w.clusterFilter != nil { + val := *w.clusterFilter + opts.clusterFilter = &val + } } // ApplyToOwns applies this configuration to the given OwnsInput options. @@ -63,6 +113,10 @@ func (w EngageOptions) ApplyToOwns(opts *OwnsInput) { val := *w.engageWithProviderClusters opts.engageWithProviderClusters = &val } + if w.clusterFilter != nil { + val := *w.clusterFilter + opts.clusterFilter = &val + } } // ApplyToWatches applies this configuration to the given WatchesInput options. @@ -73,6 +127,9 @@ func (w EngageOptions) ApplyToWatches(opts untypedWatchesInput) { if w.engageWithProviderClusters != nil { opts.setEngageWithProviderClusters(*w.engageWithProviderClusters) } + if w.clusterFilter != nil { + opts.setClusterFilter(*w.clusterFilter) + } } func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) { @@ -82,3 +139,7 @@ func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) { func (w *WatchesInput[request]) setEngageWithProviderClusters(engage bool) { w.engageWithProviderClusters = &engage } + +func (w *WatchesInput[request]) setClusterFilter(clusterFilter ClusterFilterFunc) { + w.clusterFilter = &clusterFilter +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b24f768..0ed535c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -141,11 +141,14 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus // engage cluster aware instances for _, aware := range c.sources { - src, err := aware.ForCluster(name, cl) + src, shouldEngage, err := aware.ForCluster(name, cl) if err != nil { cancel() return fmt.Errorf("failed to engage for cluster %q: %w", name, err) } + if !shouldEngage { + continue + } if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil { cancel() return fmt.Errorf("failed to watch for cluster %q: %w", name, err) @@ -177,10 +180,13 @@ func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[clien defer c.lock.Unlock() for name, eng := range c.clusters { - src, err := src.ForCluster(name, eng.cluster) + src, shouldEngage, err := src.ForCluster(name, eng.cluster) if err != nil { return fmt.Errorf("failed to engage for cluster %q: %w", name, err) } + if !shouldEngage { + continue + } if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil { return fmt.Errorf("failed to watch for cluster %q: %w", name, err) } diff --git a/pkg/source/kind.go b/pkg/source/kind.go index 6a5d55e..5bf8d9e 100644 --- a/pkg/source/kind.go +++ b/pkg/source/kind.go @@ -38,6 +38,9 @@ import ( mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" ) +// ClusterFilterFunc is a function that filters clusters. +type ClusterFilterFunc func(clusterName string, cluster cluster.Cluster) bool + // Kind creates a KindSource with the given cache provider. func Kind[object client.Object]( obj object, @@ -63,11 +66,12 @@ func TypedKind[object client.Object, request mcreconcile.ClusterAware[request]]( } type kind[object client.Object, request mcreconcile.ClusterAware[request]] struct { - obj object - handler mchandler.TypedEventHandlerFunc[object, request] - predicates []predicate.TypedPredicate[object] - project func(cluster.Cluster, object) (object, error) - resync time.Duration + obj object + handler mchandler.TypedEventHandlerFunc[object, request] + predicates []predicate.TypedPredicate[object] + project func(cluster.Cluster, object) (object, error) + resync time.Duration + clusterFilter ClusterFilterFunc } type clusterKind[object client.Object, request mcreconcile.ClusterAware[request]] struct { @@ -89,10 +93,22 @@ func (k *kind[object, request]) WithProjection(project func(cluster.Cluster, obj return k } -func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[request], error) { +func (k *kind[object, request]) WithClusterFilter(filter ClusterFilterFunc) TypedSyncingSource[object, request] { + k.clusterFilter = filter + return k +} + +func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (crsource.TypedSource[request], bool, error) { obj, err := k.project(cl, k.obj) if err != nil { - return nil, err + return nil, false, err + } + // A valid TypedSource must always be returned, even if it shouldn't + // engage based on the filter to allow engaging with the local + // cluster. + shouldEngage := true + if k.clusterFilter != nil { + shouldEngage = k.clusterFilter(name, cl) } return &clusterKind[object, request]{ clusterName: name, @@ -101,15 +117,15 @@ func (k *kind[object, request]) ForCluster(name string, cl cluster.Cluster) (crs h: k.handler(name, cl), preds: k.predicates, resync: k.resync, - }, nil + }, shouldEngage, nil } -func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (crsource.TypedSyncingSource[request], error) { - src, err := k.ForCluster(name, cl) +func (k *kind[object, request]) SyncingForCluster(name string, cl cluster.Cluster) (crsource.TypedSyncingSource[request], bool, error) { + src, shouldEngage, err := k.ForCluster(name, cl) if err != nil { - return nil, err + return nil, shouldEngage, err } - return src.(crsource.TypedSyncingSource[request]), nil + return src.(crsource.TypedSyncingSource[request]), shouldEngage, nil } // WaitForSync satisfies TypedSyncingSource. diff --git a/pkg/source/source.go b/pkg/source/source.go index 0a801e2..83ae8c7 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -43,7 +43,7 @@ type Source = TypedSource[client.Object, mcreconcile.Request] // // Users may build their own Source implementations. type TypedSource[object client.Object, request mcreconcile.ClusterAware[request]] interface { - ForCluster(string, cluster.Cluster) (source.TypedSource[request], error) + ForCluster(string, cluster.Cluster) (source.TypedSource[request], bool, error) } // SyncingSource is a source that needs syncing prior to being usable. The controller @@ -54,6 +54,7 @@ type SyncingSource[object client.Object] TypedSyncingSource[object, mcreconcile. // will call its WaitForSync prior to starting workers. type TypedSyncingSource[object client.Object, request mcreconcile.ClusterAware[request]] interface { TypedSource[object, request] - SyncingForCluster(string, cluster.Cluster) (source.TypedSyncingSource[request], error) + SyncingForCluster(string, cluster.Cluster) (source.TypedSyncingSource[request], bool, error) WithProjection(func(cluster.Cluster, object) (object, error)) TypedSyncingSource[object, request] + WithClusterFilter(ClusterFilterFunc) TypedSyncingSource[object, request] }