Skip to content

✨ Add multi provider #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions providers/multi/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package multi provides a multicluster.Provider that allows to utilize
// multiple providers in a single multicluster.Manager without
// conflicting cluster names.
//
// Each provider must be added with a unique prefix, which is used to
// identify clusters generated by that provider.
package multi
87 changes: 87 additions & 0 deletions providers/multi/multi_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multi

import (
"testing"

"k8s.io/client-go/rest"

"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestBuilder(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Namespace Provider Suite")
}

// The operator runs in a local cluster and embeds two other providers
// for cloud providers. The cloud providers are simulated by using the
// namespace provider with two other clusters.

var localEnv *envtest.Environment
var localCfg *rest.Config

var cloud1 *envtest.Environment
var cloud1cfg *rest.Config

var cloud2 *envtest.Environment
var cloud2cfg *rest.Config

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

var err error

localEnv = &envtest.Environment{}
localCfg, err = localEnv.Start()
Expect(err).NotTo(HaveOccurred())

cloud1 = &envtest.Environment{}
cloud1cfg, err = cloud1.Start()
Expect(err).NotTo(HaveOccurred())

cloud2 = &envtest.Environment{}
cloud2cfg, err = cloud2.Start()
Expect(err).NotTo(HaveOccurred())

// Prevent the metrics listener being created
metricsserver.DefaultBindAddress = "0"
})

var _ = AfterSuite(func() {
if localEnv != nil {
Expect(localEnv.Stop()).To(Succeed())
}

if cloud1 != nil {
Expect(cloud1.Stop()).To(Succeed())
}

if cloud2 != nil {
Expect(cloud2.Stop()).To(Succeed())
}

// Put the DefaultBindAddress back
metricsserver.DefaultBindAddress = ":8080"
})
180 changes: 180 additions & 0 deletions providers/multi/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multi

import (
"context"
"errors"
"fmt"
"strings"
"sync"

"github.com/go-logr/logr"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/log"

mctrl "sigs.k8s.io/multicluster-runtime"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
)

var _ multicluster.Provider = &Provider{}

// Options defines the options for the provider.
type Options struct {
Separator string
}

// Provider is a multicluster.Provider that manages multiple providers.
type Provider struct {
opts Options

log logr.Logger
mgr mctrl.Manager

providerLock sync.RWMutex
providers map[string]multicluster.Provider
providerCancel map[string]context.CancelFunc
}

// New returns a new instance of the provider with the given options.
func New(opts Options) *Provider {
p := new(Provider)

p.opts = opts
if p.opts.Separator == "" {
p.opts.Separator = "#"
}

p.log = log.Log.WithName("multi-provider")

p.providers = make(map[string]multicluster.Provider)
p.providerCancel = make(map[string]context.CancelFunc)

return p
}

// SetManager sets the manager for the provider.
func (p *Provider) SetManager(mgr mctrl.Manager) {
if p.mgr != nil {
p.log.Error(nil, "manager already set, overwriting")
}
p.mgr = mgr
}

func (p *Provider) splitClusterName(clusterName string) (string, string) {
parts := strings.SplitN(clusterName, p.opts.Separator, 2)
if len(parts) < 2 {
return "", clusterName
}
return parts[0], parts[1]
}

// AddProvider adds a new provider with the given prefix.
//
// The startFunc is called to start the provider - starting the provider
// outside of startFunc is an error and will result in undefined
// behaviour.
// startFunc should block for as long as the provider is running,
// If startFunc returns an error the provider is removed and the error
// is returned.
func (p *Provider) AddProvider(ctx context.Context, prefix string, provider multicluster.Provider, startFunc func(context.Context, mctrl.Manager) error) error {
ctx, cancel := context.WithCancel(ctx)

p.providerLock.Lock()
_, ok := p.providers[prefix]
p.providerLock.Unlock()
if ok {
cancel()
return fmt.Errorf("provider already exists for prefix %q", prefix)
}

var wrappedMgr mctrl.Manager
if p.mgr == nil {
p.log.Info("manager is nil, wrapped manager passed to start will be nil as well", "prefix", prefix)
} else {
wrappedMgr = &wrappedManager{
Manager: p.mgr,
prefix: prefix,
sep: p.opts.Separator,
}
}

p.providerLock.Lock()
p.providers[prefix] = provider
p.providerCancel[prefix] = cancel
p.providerLock.Unlock()

go func() {
defer p.RemoveProvider(prefix)
if err := startFunc(ctx, wrappedMgr); err != nil {
cancel()
p.log.Error(err, "error in provider", "prefix", prefix)
}
}()

return nil
}

// RemoveProvider removes a provider from the manager and cancels its
// context.
//
// Warning: This can lead to dangling clusters if the provider is not
// using the context it is started with to engage the clusters it
// manages.
func (p *Provider) RemoveProvider(prefix string) {
p.providerLock.Lock()
defer p.providerLock.Unlock()
if cancel, ok := p.providerCancel[prefix]; ok {
cancel()
delete(p.providers, prefix)
delete(p.providerCancel, prefix)
}
}

// Get returns a cluster by name.
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
prefix, clusterName := p.splitClusterName(clusterName)

p.providerLock.RLock()
provider, ok := p.providers[prefix]
p.providerLock.RUnlock()

if !ok {
return nil, fmt.Errorf("provider not found %q: %w", prefix, multicluster.ErrClusterNotFound)
}

return provider.Get(ctx, clusterName)
}

// IndexField indexes a field on all providers and clusters and returns
// the aggregated errors.
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
p.providerLock.RLock()
defer p.providerLock.RUnlock()
var errs error
for prefix, provider := range p.providers {
if err := provider.IndexField(ctx, obj, field, extractValue); err != nil {
errs = errors.Join(
errs,
fmt.Errorf("failed to index field %q on cluster %q: %w", field, prefix, err),
)
}
}
return errs
}
Loading