Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions pkg/builder/forked_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type untypedWatchesInput interface {
setObjectProjection(objectProjection)
setEngageWithLocalCluster(engage bool)
setEngageWithProviderClusters(engage bool)
setClusterFilter(ClusterFilterFunc)
}

// WatchesInput represents the information set by Watches method.
Expand Down Expand Up @@ -353,9 +354,10 @@ func (blder *TypedBuilder[request]) doWatch() error {
allPredicates = append(allPredicates, blder.forInput.predicates...)

src := mcsource.TypedKind[client.Object, request](blder.forInput.object, hdler, allPredicates...).
WithProjection(blder.project(blder.forInput.objectProjection))
WithProjection(blder.project(blder.forInput.objectProjection)).
WithClusterFilter(blder.forInput.getClusterFilter())
if ptr.Deref(blder.forInput.engageWithLocalCluster, blder.mgr.GetProvider() == nil) {
src, err := src.ForCluster("", blder.mgr.GetLocalManager())
src, _, err := src.ForCluster("", blder.mgr.GetLocalManager())
if err != nil {
return err
}
Expand Down Expand Up @@ -391,10 +393,12 @@ func (blder *TypedBuilder[request]) doWatch() error {
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)

src := mcsource.TypedKind[client.Object, request](own.object, hdler, allPredicates...).
WithProjection(blder.project(own.objectProjection))
WithProjection(blder.project(own.objectProjection)).
WithClusterFilter(own.getClusterFilter())
if ptr.Deref(own.engageWithLocalCluster, blder.mgr.GetProvider() == nil) {
src, err := src.ForCluster("", blder.mgr.GetLocalManager())
src, _, err := src.ForCluster("", blder.mgr.GetLocalManager())
if err != nil {
return err
}
Expand All @@ -416,9 +420,12 @@ func (blder *TypedBuilder[request]) doWatch() error {
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
src := mcsource.TypedKind[client.Object, request](w.obj, w.handler, allPredicates...).WithProjection(blder.project(w.objectProjection))

src := mcsource.TypedKind[client.Object, request](w.obj, w.handler, allPredicates...).
WithProjection(blder.project(w.objectProjection)).
WithClusterFilter(w.getClusterFilter())
if ptr.Deref(w.engageWithLocalCluster, blder.mgr.GetProvider() == nil) {
src, err := src.ForCluster("", blder.mgr.GetLocalManager())
src, _, err := src.ForCluster("", blder.mgr.GetLocalManager())
if err != nil {
return err
}
Expand Down
158 changes: 154 additions & 4 deletions pkg/builder/forked_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -49,6 +50,7 @@ import (
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
mcsource "sigs.k8s.io/multicluster-runtime/pkg/source"
"sigs.k8s.io/multicluster-runtime/providers/clusters"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -604,8 +606,157 @@ var _ = Describe("application", func() {
}).Should(BeTrue())
})
})

Describe("filter engaged clusters when a filter is set", func() {
var mgr mcmanager.Manager
var provider *clusters.Provider
filterFunctionCalled := &atomic.Bool{}
reconciled := &atomic.Bool{}
BeforeEach(func() {
filterFunctionCalled.Store(false)
reconciled.Store(false)
provider = clusters.New()
var err error
mgr, err = mcmanager.New(cfg, provider, mcmanager.Options{})
Expect(err).NotTo(HaveOccurred())
})

It("should call the filter when local cluster is engaged but not prevent reconciling", func(ctx SpecContext) {
By("adding a controller to the manager", func() {
err := ControllerManagedBy(mgr).
For(&appsv1.Deployment{},
WithClusterFilter(func(clusterName string, cluster cluster.Cluster) bool {
// this filter should be applied
filterFunctionCalled.Store(true)
return false
}),
WithEngageWithLocalCluster(true),
).
Named("cluster-filter-local").
Complete(mcreconcile.Func(func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) {
reconciled.Store(true)
return reconcile.Result{}, nil
}))
Expect(err).NotTo(HaveOccurred())
})
By("starting the manager", func() {
go func() {
defer GinkgoRecover()
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
}()
})
By("Creating a Deployment", func() {
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "deploy-name-local",
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetLocalManager().GetClient().Create(ctx, dep)
Expect(err).NotTo(HaveOccurred())
})

Eventually(func() bool {
return reconciled.Load()
}, "10s").Should(BeTrue())

Expect(filterFunctionCalled.Load()).To(BeTrue())
})

It("should respect the filter when provider clusters are engaged", func(ctx SpecContext) {
By("adding the local manager as cluster to the provider", func() {
noopCluster := &noopStartCluster{
Cluster: mgr.GetLocalManager(),
}

Expect(provider.Add(ctx, "local", noopCluster)).NotTo(HaveOccurred())
})
By("adding a controller to the manager", func() {
err := ControllerManagedBy(mgr).
For(&appsv1.Deployment{},
WithClusterFilter(func(clusterName string, cluster cluster.Cluster) bool {
// this filter should be applied
filterFunctionCalled.Store(true)
return true
}),
).
Named("cluster-filter-non-local").
Complete(mcreconcile.Func(func(ctx context.Context, req mcreconcile.Request) (reconcile.Result, error) {
reconciled.Store(true)
return reconcile.Result{}, nil
}))
Expect(err).NotTo(HaveOccurred())
})
By("starting the manager", func() {
go func() {
defer GinkgoRecover()
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
}()
})
By("Creating a Deployment", func() {
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "deploy-name-non-local",
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
},
},
}
err := mgr.GetLocalManager().GetClient().Create(ctx, dep)
Expect(err).NotTo(HaveOccurred())
})

Eventually(func() bool {
return reconciled.Load()
}, "10s").Should(BeTrue())

Expect(filterFunctionCalled.Load()).To(BeTrue())
})
})
})

// noopStartCluster is a cluster.Cluster that does nothing on Start.
// Used to wrap the local manager when adding to a provider to avoid
// starting it twice.
type noopStartCluster struct {
cluster.Cluster
}

func (n *noopStartCluster) Start(ctx context.Context) error {
<-ctx.Done()
return nil
}

// newNonTypedOnlyCache returns a new cache that wraps the normal cache,
// returning an error if normal, typed objects have informers requested.
func newNonTypedOnlyCache(config *rest.Config, opts cache.Options) (cache.Cache, error) {
Expand Down Expand Up @@ -744,9 +895,8 @@ type fakeType struct {
func (*fakeType) GetObjectKind() schema.ObjectKind { return nil }
func (*fakeType) DeepCopyObject() runtime.Object { return nil }

func must[T any](x T, err error) T {
if err != nil {
Expect(err).NotTo(HaveOccurred())
}
func must[T any](x T, b bool, err error) T {
Expect(err).NotTo(HaveOccurred())
Expect(b).To(BeTrue())
return x
}
61 changes: 61 additions & 0 deletions pkg/builder/multicluster_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,31 @@ limitations under the License.

package builder

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/cluster"

"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
mcsource "sigs.k8s.io/multicluster-runtime/pkg/source"
)

// ClusterFilterFunc is a function that filters clusters.
type ClusterFilterFunc = mcsource.ClusterFilterFunc

// EngageOptions configures how the controller should engage with clusters
// when a provider is configured.
type EngageOptions struct {
engageWithLocalCluster *bool
engageWithProviderClusters *bool
clusterFilter *ClusterFilterFunc
}

func (w EngageOptions) getClusterFilter() ClusterFilterFunc {
if w.clusterFilter != nil {
return *w.clusterFilter
}
return nil
}

// WithEngageWithLocalCluster configures whether the controller should engage
Expand All @@ -41,6 +61,32 @@ func WithEngageWithProviderClusters(engage bool) EngageOptions {
}
}

// WithClusterFilter configures a filter function that determines
// which clusters the controller should engage with.
// The option applies only if WithEngageWithProviderClusters is true.
func WithClusterFilter(filter ClusterFilterFunc) EngageOptions {
return EngageOptions{
clusterFilter: &filter,
}
}

// WithClustersFromProvider configures the controller to only engage with
// the clusters provided by the given provider.
// If is a helper function that wraps WithClusterFilter and has the
// same constraints and mutually exclusive.
func WithClustersFromProvider(ctx context.Context, provider multicluster.Provider) EngageOptions {
var fn ClusterFilterFunc = func(clusterName string, cluster cluster.Cluster) bool {
cl, err := provider.Get(ctx, clusterName)
if err != nil {
return false
}
return cl == cluster
}
return EngageOptions{
clusterFilter: &fn,
}
}

// ApplyToFor applies this configuration to the given ForInput options.
func (w EngageOptions) ApplyToFor(opts *ForInput) {
if w.engageWithLocalCluster != nil {
Expand All @@ -51,6 +97,10 @@ func (w EngageOptions) ApplyToFor(opts *ForInput) {
val := *w.engageWithProviderClusters
opts.engageWithProviderClusters = &val
}
if w.clusterFilter != nil {
val := *w.clusterFilter
opts.clusterFilter = &val
}
}

// ApplyToOwns applies this configuration to the given OwnsInput options.
Expand All @@ -63,6 +113,10 @@ func (w EngageOptions) ApplyToOwns(opts *OwnsInput) {
val := *w.engageWithProviderClusters
opts.engageWithProviderClusters = &val
}
if w.clusterFilter != nil {
val := *w.clusterFilter
opts.clusterFilter = &val
}
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
Expand All @@ -73,6 +127,9 @@ func (w EngageOptions) ApplyToWatches(opts untypedWatchesInput) {
if w.engageWithProviderClusters != nil {
opts.setEngageWithProviderClusters(*w.engageWithProviderClusters)
}
if w.clusterFilter != nil {
opts.setClusterFilter(*w.clusterFilter)
}
}

func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) {
Expand All @@ -82,3 +139,7 @@ func (w *WatchesInput[request]) setEngageWithLocalCluster(engage bool) {
func (w *WatchesInput[request]) setEngageWithProviderClusters(engage bool) {
w.engageWithProviderClusters = &engage
}

func (w *WatchesInput[request]) setClusterFilter(clusterFilter ClusterFilterFunc) {
w.clusterFilter = &clusterFilter
}
10 changes: 8 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,14 @@ func (c *mcController[request]) Engage(ctx context.Context, name string, cl clus

// engage cluster aware instances
for _, aware := range c.sources {
src, err := aware.ForCluster(name, cl)
src, shouldEngage, err := aware.ForCluster(name, cl)
if err != nil {
cancel()
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if !shouldEngage {
continue
}
if err := c.TypedController.Watch(startWithinContext[request](engCtx, src)); err != nil {
cancel()
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
Expand Down Expand Up @@ -177,10 +180,13 @@ func (c *mcController[request]) MultiClusterWatch(src mcsource.TypedSource[clien
defer c.lock.Unlock()

for name, eng := range c.clusters {
src, err := src.ForCluster(name, eng.cluster)
src, shouldEngage, err := src.ForCluster(name, eng.cluster)
if err != nil {
return fmt.Errorf("failed to engage for cluster %q: %w", name, err)
}
if !shouldEngage {
continue
}
if err := c.TypedController.Watch(startWithinContext[request](eng.ctx, src)); err != nil {
return fmt.Errorf("failed to watch for cluster %q: %w", name, err)
}
Expand Down
Loading