diff --git a/providers/multi/doc.go b/providers/multi/doc.go
new file mode 100644
index 0000000..2b9eabe
--- /dev/null
+++ b/providers/multi/doc.go
@@ -0,0 +1,23 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package multi provides a multicluster.Provider that makes it possible
+// to use multiple providers in a single multicluster.Manager without
+// conflicting cluster names.
+//
+// Each provider must be added with a unique prefix, which is used to
+// identify clusters generated by that provider.
+package multi
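As a reading aid for this patch, a minimal wiring sketch of the API the package introduces. It mirrors the test setup further down; localCfg, ctx, and cloud1provider are stand-ins for values the caller already has, and "cloud1" is an arbitrary prefix:

	// Sketch, not part of the patch: compose two providers under one manager.
	provider := multi.New(multi.Options{}) // Separator defaults to "#"

	mgr, err := mcmanager.New(localCfg, provider, manager.Options{})
	if err != nil {
		// handle error
	}
	provider.SetManager(mgr)

	// startFunc (here the namespace provider's Run) blocks while the
	// embedded provider is running; AddProvider itself returns immediately.
	if err := provider.AddProvider(ctx, "cloud1", cloud1provider, cloud1provider.Run); err != nil {
		// handle error
	}

	// Clusters engaged by that provider are addressed as "<prefix><separator><name>".
	cl, err := mgr.GetCluster(ctx, "cloud1#zoo")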
diff --git a/providers/multi/multi_suite_test.go b/providers/multi/multi_suite_test.go
new file mode 100644
index 0000000..b77c723
--- /dev/null
+++ b/providers/multi/multi_suite_test.go
@@ -0,0 +1,87 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package multi
+
+import (
+	"testing"
+
+	"k8s.io/client-go/rest"
+
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestMultiProvider(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Multi Provider Suite")
+}
+
+// The tests simulate an operator that runs in a local cluster and embeds
+// two other providers, one per cloud. The clouds are simulated by running
+// the namespace provider against two additional clusters.
+
+var localEnv *envtest.Environment
+var localCfg *rest.Config
+
+var cloud1 *envtest.Environment
+var cloud1cfg *rest.Config
+
+var cloud2 *envtest.Environment
+var cloud2cfg *rest.Config
+
+var _ = BeforeSuite(func() {
+	logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
+
+	var err error
+
+	localEnv = &envtest.Environment{}
+	localCfg, err = localEnv.Start()
+	Expect(err).NotTo(HaveOccurred())
+
+	cloud1 = &envtest.Environment{}
+	cloud1cfg, err = cloud1.Start()
+	Expect(err).NotTo(HaveOccurred())
+
+	cloud2 = &envtest.Environment{}
+	cloud2cfg, err = cloud2.Start()
+	Expect(err).NotTo(HaveOccurred())
+
+	// Prevent the metrics listener from being created
+	metricsserver.DefaultBindAddress = "0"
+})
+
+var _ = AfterSuite(func() {
+	if localEnv != nil {
+		Expect(localEnv.Stop()).To(Succeed())
+	}
+
+	if cloud1 != nil {
+		Expect(cloud1.Stop()).To(Succeed())
+	}
+
+	if cloud2 != nil {
+		Expect(cloud2.Stop()).To(Succeed())
+	}
+
+	// Put the DefaultBindAddress back
+	metricsserver.DefaultBindAddress = ":8080"
})
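For orientation: each simulated cloud is an envtest cluster wrapped by the namespace provider, which surfaces every namespace of its backing cluster as a logical cluster (the behaviour the tests below rely on). A sketch of one such cloud, assuming the cloud1cfg started in BeforeSuite above:

	// Sketch: one simulated cloud.
	cl, err := cluster.New(cloud1cfg)
	if err != nil {
		// handle error
	}
	go cl.Start(ctx) // the informer cache must run before the provider is added

	cloud1provider := nsprovider.New(cl)
	// Namespace "zoo" in the backing cluster becomes logical cluster "zoo";
	// through the multi provider it is addressed as "cloud1#zoo".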
diff --git a/providers/multi/provider.go b/providers/multi/provider.go
new file mode 100644
index 0000000..9d9c155
--- /dev/null
+++ b/providers/multi/provider.go
@@ -0,0 +1,179 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package multi
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+
+	"github.com/go-logr/logr"
+
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	mctrl "sigs.k8s.io/multicluster-runtime"
+	"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
+)
+
+var _ multicluster.Provider = &Provider{}
+
+// Options defines the options for the provider.
+type Options struct {
+	Separator string
+}
+
+// Provider is a multicluster.Provider that manages multiple providers.
+type Provider struct {
+	opts Options
+
+	log logr.Logger
+	mgr mctrl.Manager
+
+	providerLock   sync.RWMutex
+	providers      map[string]multicluster.Provider
+	providerCancel map[string]context.CancelFunc
+}
+
+// New returns a new instance of the provider with the given options.
+func New(opts Options) *Provider {
+	p := new(Provider)
+
+	p.opts = opts
+	if p.opts.Separator == "" {
+		p.opts.Separator = "#"
+	}
+
+	p.log = log.Log.WithName("multi-provider")
+
+	p.providers = make(map[string]multicluster.Provider)
+	p.providerCancel = make(map[string]context.CancelFunc)
+
+	return p
+}
+
+// SetManager sets the manager for the provider.
+func (p *Provider) SetManager(mgr mctrl.Manager) {
+	if p.mgr != nil {
+		p.log.Error(nil, "manager already set, overwriting")
+	}
+	p.mgr = mgr
+}
+
+func (p *Provider) splitClusterName(clusterName string) (string, string) {
+	parts := strings.SplitN(clusterName, p.opts.Separator, 2)
+	if len(parts) < 2 {
+		return "", clusterName
+	}
+	return parts[0], parts[1]
+}
+
+// AddProvider adds a new provider with the given prefix.
+//
+// The startFunc is called to start the provider; starting the provider
+// outside of startFunc is an error and will result in undefined
+// behaviour.
+// startFunc should block for as long as the provider is running. If
+// startFunc returns an error, the provider is removed and the error
+// is logged.
+func (p *Provider) AddProvider(ctx context.Context, prefix string, provider multicluster.Provider, startFunc func(context.Context, mctrl.Manager) error) error {
+	ctx, cancel := context.WithCancel(ctx)
+
+	var wrappedMgr mctrl.Manager
+	if p.mgr == nil {
+		p.log.Info("manager is nil, wrapped manager passed to start will be nil as well", "prefix", prefix)
+	} else {
+		wrappedMgr = &wrappedManager{
+			Manager: p.mgr,
+			prefix:  prefix,
+			sep:     p.opts.Separator,
+		}
+	}
+
+	// Check and register under a single lock so that concurrent
+	// AddProvider calls for the same prefix cannot race.
+	p.providerLock.Lock()
+	if _, ok := p.providers[prefix]; ok {
+		p.providerLock.Unlock()
+		cancel()
+		return fmt.Errorf("provider already exists for prefix %q", prefix)
+	}
+	p.providers[prefix] = provider
+	p.providerCancel[prefix] = cancel
+	p.providerLock.Unlock()
+
+	go func() {
+		defer p.RemoveProvider(prefix)
+		if err := startFunc(ctx, wrappedMgr); err != nil {
+			cancel()
+			p.log.Error(err, "error in provider", "prefix", prefix)
+		}
+	}()
+
+	return nil
+}
+
+// RemoveProvider removes a provider from the manager and cancels its
+// context.
+//
+// Warning: This can lead to dangling clusters if the provider is not
+// using the context it is started with to engage the clusters it
+// manages.
+func (p *Provider) RemoveProvider(prefix string) {
+	p.providerLock.Lock()
+	defer p.providerLock.Unlock()
+	if cancel, ok := p.providerCancel[prefix]; ok {
+		cancel()
+		delete(p.providers, prefix)
+		delete(p.providerCancel, prefix)
+	}
+}
+
+// Get returns a cluster by name.
+func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
+	prefix, clusterName := p.splitClusterName(clusterName)
+
+	p.providerLock.RLock()
+	provider, ok := p.providers[prefix]
+	p.providerLock.RUnlock()
+
+	if !ok {
+		return nil, fmt.Errorf("provider not found %q: %w", prefix, multicluster.ErrClusterNotFound)
+	}
+
+	return provider.Get(ctx, clusterName)
+}
+
+// IndexField indexes a field on all providers and clusters and returns
+// the aggregated errors.
+func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
+	p.providerLock.RLock()
+	defer p.providerLock.RUnlock()
+	var errs error
+	for prefix, provider := range p.providers {
+		if err := provider.IndexField(ctx, obj, field, extractValue); err != nil {
+			errs = errors.Join(
+				errs,
+				fmt.Errorf("failed to index field %q on provider %q: %w", field, prefix, err),
+			)
+		}
+	}
+	return errs
+}
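To illustrate the AddProvider contract documented above, a hedged sketch; myProvider and its run loop are hypothetical stand-ins:

	// startFunc must block for as long as the provider runs and must be
	// the only place the provider is started.
	err := provider.AddProvider(ctx, "my-prefix", myProvider, func(ctx context.Context, mgr mctrl.Manager) error {
		// Engage clusters against mgr here; the wrapped manager rewrites
		// each name to "my-prefix#<name>" before the real manager sees it.
		<-ctx.Done() // block until the provider is removed or ctx ends
		return nil
	})
	if err != nil {
		// a provider is already registered under "my-prefix"
	}

	// Cancels the context passed to startFunc and forgets the provider.
	// Clusters the provider engaged with an unrelated context would dangle.
	provider.RemoveProvider("my-prefix")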
diff --git a/providers/multi/provider_test.go b/providers/multi/provider_test.go
new file mode 100644
index 0000000..c72fb39
--- /dev/null
+++ b/providers/multi/provider_test.go
@@ -0,0 +1,286 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package multi
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strconv"
+
+	"golang.org/x/sync/errgroup"
+
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/client-go/util/retry"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+	"sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
+	mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
+	mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
+	nsprovider "sigs.k8s.io/multicluster-runtime/providers/namespace"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("Provider Multi", Ordered, func() {
+	ctx, cancel := context.WithCancel(context.Background())
+	g, ctx := errgroup.WithContext(ctx)
+
+	var provider *Provider
+	var mgr mcmanager.Manager
+	var cloud1client, cloud2client client.Client
+	var cloud1cluster, cloud2cluster cluster.Cluster
+	var cloud1provider, cloud2provider *nsprovider.Provider
+
+	BeforeAll(func() {
+		By("Setting up the first namespace provider", func() {
+			var err error
+			cloud1client, err = client.New(cloud1cfg, client.Options{})
+			Expect(err).NotTo(HaveOccurred())
+
+			cloud1cluster, err = cluster.New(cloud1cfg)
+			Expect(err).NotTo(HaveOccurred())
+			g.Go(func() error {
+				return ignoreCanceled(cloud1cluster.Start(ctx))
+			})
+
+			cloud1provider = nsprovider.New(cloud1cluster)
+		})
+
+		By("Setting up the second namespace provider", func() {
+			var err error
+			cloud2client, err = client.New(cloud2cfg, client.Options{})
+			Expect(err).NotTo(HaveOccurred())
+
+			cloud2cluster, err = cluster.New(cloud2cfg)
+			Expect(err).NotTo(HaveOccurred())
+			g.Go(func() error {
+				return ignoreCanceled(cloud2cluster.Start(ctx))
+			})
+
+			cloud2provider = nsprovider.New(cloud2cluster)
+		})
+
+		By("Setting up the provider and manager", func() {
+			provider = New(Options{})
+
+			var err error
+			mgr, err = mcmanager.New(localCfg, provider, manager.Options{})
+			Expect(err).NotTo(HaveOccurred())
+
+			provider.SetManager(mgr)
+		})
+
+		By("Adding the namespace providers to the multi provider", func() {
+			// Without waiting for the cache to sync, adding the provider
+			// would fail because the cache informer is not ready yet.
+			cloud1cluster.GetCache().WaitForCacheSync(ctx)
+			err := provider.AddProvider(ctx, "cloud1", cloud1provider, cloud1provider.Run)
+			Expect(err).NotTo(HaveOccurred())
+
+			cloud2cluster.GetCache().WaitForCacheSync(ctx)
+			err = provider.AddProvider(ctx, "cloud2", cloud2provider, cloud2provider.Run)
+			Expect(err).NotTo(HaveOccurred())
+		})
+
+		By("Setting up the controller feeding the animals", func() {
+			err := mcbuilder.ControllerManagedBy(mgr).
+				Named("fleet-ns-configmap-controller").
+				For(&corev1.ConfigMap{}).
+				Complete(mcreconcile.Func(
+					func(ctx context.Context, req mcreconcile.Request) (ctrl.Result, error) {
+						log := log.FromContext(ctx).WithValues("request", req.String())
+						log.Info("Reconciling ConfigMap")
+
+						cl, err := mgr.GetCluster(ctx, req.ClusterName)
+						if err != nil {
+							return reconcile.Result{}, fmt.Errorf("failed to get cluster: %w", err)
+						}
+
+						// Feed the animal.
+						cm := &corev1.ConfigMap{}
+						if err := cl.GetClient().Get(ctx, req.NamespacedName, cm); err != nil {
+							if apierrors.IsNotFound(err) {
+								return reconcile.Result{}, nil
+							}
+							return reconcile.Result{}, fmt.Errorf("failed to get configmap: %w", err)
+						}
+						if cm.GetLabels()["type"] != "animal" {
+							return reconcile.Result{}, nil
+						}
+
+						cm.Data = map[string]string{"stomach": "food"}
+						if err := cl.GetClient().Update(ctx, cm); err != nil {
+							return reconcile.Result{}, fmt.Errorf("failed to update configmap: %w", err)
+						}
+
+						return ctrl.Result{}, nil
+					},
+				))
+			Expect(err).NotTo(HaveOccurred())
+
+			By("Adding an index to the provider clusters", func() {
+				err := mgr.GetFieldIndexer().IndexField(ctx, &corev1.ConfigMap{}, "type", func(obj client.Object) []string {
+					return []string{obj.GetLabels()["type"]}
+				})
+				Expect(err).NotTo(HaveOccurred())
+			})
+		})
+
+		By("Starting the manager", func() {
+			g.Go(func() error {
+				return ignoreCanceled(mgr.Start(ctx))
+			})
+		})
+	})
+
+	BeforeAll(func() {
+		// cluster zoo exists in cloud1
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "zoo"}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "zoo", Name: "elephant", Labels: map[string]string{"type": "animal"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "zoo", Name: "lion", Labels: map[string]string{"type": "animal"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "zoo", Name: "keeper", Labels: map[string]string{"type": "human"}}})))
+
+		// cluster jungle exists in cloud2
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "jungle"}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "jungle", Name: "monkey", Labels: map[string]string{"type": "animal"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "jungle", Name: "tree", Labels: map[string]string{"type": "thing"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "jungle", Name: "tarzan", Labels: map[string]string{"type": "human"}}})))
+
+		// cluster island exists in both clouds
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "island"}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "bird", Labels: map[string]string{"type": "animal"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "stone", Labels: map[string]string{"type": "thing"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "crusoe", Labels: map[string]string{"type": "human"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "island"}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "bird", Labels: map[string]string{"type": "animal"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "stone", Labels: map[string]string{"type": "thing"}}})))
+		runtime.Must(client.IgnoreAlreadyExists(cloud2client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "island", Name: "selkirk", Labels: map[string]string{"type": "human"}}})))
+	})
+
+	It("runs the reconciler for existing objects", func(ctx context.Context) {
+		Eventually(func() string {
+			cl, err := mgr.GetCluster(ctx, "cloud1#zoo")
+			Expect(err).NotTo(HaveOccurred())
+			lion := &corev1.ConfigMap{}
+			err = cl.GetClient().Get(ctx, client.ObjectKey{Namespace: "default", Name: "lion"}, lion)
+			Expect(err).NotTo(HaveOccurred())
+			return lion.Data["stomach"]
+		}, "10s").Should(Equal("food"))
+	})
+
+	It("runs the reconciler for new objects", func(ctx context.Context) {
+		By("Creating a new configmap", func() {
+			err := cloud1client.Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "zoo", Name: "tiger", Labels: map[string]string{"type": "animal"}}})
+			Expect(err).NotTo(HaveOccurred())
+		})
+
+		Eventually(func() string {
+			cl, err := mgr.GetCluster(ctx, "cloud1#zoo")
+			Expect(err).NotTo(HaveOccurred())
+			tiger := &corev1.ConfigMap{}
+			err = cl.GetClient().Get(ctx, client.ObjectKey{Namespace: "default", Name: "tiger"}, tiger)
+			Expect(err).NotTo(HaveOccurred())
+			return tiger.Data["stomach"]
+		}, "10s").Should(Equal("food"))
+	})
+
+	It("runs the reconciler for updated objects", func(ctx context.Context) {
+		updated := &corev1.ConfigMap{}
+		By("Emptying the elephant's stomach", func() {
+			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+				if err := cloud1client.Get(ctx, client.ObjectKey{Namespace: "zoo", Name: "elephant"}, updated); err != nil {
+					return err
+				}
+				updated.Data = map[string]string{}
+				return cloud1client.Update(ctx, updated)
+			})
+			Expect(err).NotTo(HaveOccurred())
+		})
+		rv, err := strconv.ParseInt(updated.ResourceVersion, 10, 64)
+		Expect(err).NotTo(HaveOccurred())
+
+		Eventually(func() int64 {
+			cl, err := mgr.GetCluster(ctx, "cloud1#zoo")
+			Expect(err).NotTo(HaveOccurred())
+			elephant := &corev1.ConfigMap{}
+			err = cl.GetClient().Get(ctx, client.ObjectKey{Namespace: "default", Name: "elephant"}, elephant)
+			Expect(err).NotTo(HaveOccurred())
+			rv, err := strconv.ParseInt(elephant.ResourceVersion, 10, 64)
+			Expect(err).NotTo(HaveOccurred())
+			return rv
+		}, "10s").Should(BeNumerically(">=", rv))
+
+		Eventually(func() string {
+			cl, err := mgr.GetCluster(ctx, "cloud1#zoo")
+			Expect(err).NotTo(HaveOccurred())
+			elephant := &corev1.ConfigMap{}
+			err = cl.GetClient().Get(ctx, client.ObjectKey{Namespace: "default", Name: "elephant"}, elephant)
+			Expect(err).NotTo(HaveOccurred())
+			return elephant.Data["stomach"]
+		}, "10s").Should(Equal("food"))
+	})
+
+	It("queries island on cloud1 via a multi-cluster index", func() {
+		island, err := mgr.GetCluster(ctx, "cloud1#island")
+		Expect(err).NotTo(HaveOccurred())
+		cms := &corev1.ConfigMapList{}
+		err = island.GetCache().List(ctx, cms, client.MatchingFields{"type": "human"})
+		Expect(err).NotTo(HaveOccurred())
+		Expect(cms.Items).To(HaveLen(1))
+		Expect(cms.Items[0].Name).To(Equal("crusoe"))
+		Expect(cms.Items[0].Namespace).To(Equal("default"))
+	})
+
+	It("queries island on cloud2 via a multi-cluster index", func() {
+		island, err := mgr.GetCluster(ctx, "cloud2#island")
+		Expect(err).NotTo(HaveOccurred())
+		cms := &corev1.ConfigMapList{}
+		err = island.GetCache().List(ctx, cms, client.MatchingFields{"type": "human"})
+		Expect(err).NotTo(HaveOccurred())
+		Expect(cms.Items).To(HaveLen(1))
+		Expect(cms.Items[0].Name).To(Equal("selkirk"))
+		Expect(cms.Items[0].Namespace).To(Equal("default"))
+	})
+
+	AfterAll(func() {
+		By("Stopping the provider, cluster, manager, and controller", func() {
+			cancel()
+		})
+		By("Waiting for the error group to finish", func() {
+			err := g.Wait()
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+})
+
+func ignoreCanceled(err error) error {
+	if errors.Is(err, context.Canceled) {
+		return nil
+	}
+	return err
+}
diff --git a/providers/multi/wrapper_manager.go b/providers/multi/wrapper_manager.go
new file mode 100644
index 0000000..34dc8f8
--- /dev/null
+++ b/providers/multi/wrapper_manager.go
@@ -0,0 +1,36 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package multi
+
+import (
+	"context"
+
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
+
+	mctrl "sigs.k8s.io/multicluster-runtime"
+)
+
+var _ mctrl.Manager = &wrappedManager{}
+
+type wrappedManager struct {
+	mctrl.Manager
+	prefix, sep string
+}
+
+func (w *wrappedManager) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
+	return w.Manager.Engage(ctx, w.prefix+w.sep+name, cl)
+}
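The wrapper above is the entire prefixing mechanism: only Engage is overridden, so everything else behaves like the shared manager. A short illustration of the resulting names, assuming the default "#" separator:

	// Inside a provider added with prefix "cloud1":
	_ = wrapped.Engage(ctx, "zoo", cl)
	// The shared manager sees the cluster as "cloud1#zoo". That is the
	// name controllers receive in mcreconcile.Request.ClusterName, and
	// the key accepted by mgr.GetCluster and Provider.Get, which strips
	// the prefix again via splitClusterName before delegating.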