Skip to content

Commit 37a46ec

Browse files
committed
Add multi provider
Signed-off-by: Nelo-T. Wallus <[email protected]>
1 parent 00d670c commit 37a46ec

File tree

5 files changed

+612
-0
lines changed

5 files changed

+612
-0
lines changed

providers/multi/doc.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package multi provides a multicluster.Provider that allows to utilize
18+
// multiple providers in a single multicluster.Manager without
19+
// conflicting cluster names.
20+
//
21+
// Each provider must be added with a unique prefix, which is used to
22+
// identify clusters generated by that provider.
23+
package multi

providers/multi/multi_suite_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multi
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/client-go/rest"
23+
24+
"sigs.k8s.io/controller-runtime/pkg/envtest"
25+
logf "sigs.k8s.io/controller-runtime/pkg/log"
26+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
27+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
28+
29+
. "github.com/onsi/ginkgo/v2"
30+
. "github.com/onsi/gomega"
31+
)
32+
33+
func TestBuilder(t *testing.T) {
34+
RegisterFailHandler(Fail)
35+
RunSpecs(t, "Namespace Provider Suite")
36+
}
37+
38+
// The operator runs in a local cluster and embeds two other providers
39+
// for cloud providers. The cloud providers are simulated by using the
40+
// namespace provider with two other clusters.
41+
42+
var localEnv *envtest.Environment
43+
var localCfg *rest.Config
44+
45+
var cloud1 *envtest.Environment
46+
var cloud1cfg *rest.Config
47+
48+
var cloud2 *envtest.Environment
49+
var cloud2cfg *rest.Config
50+
51+
var _ = BeforeSuite(func() {
52+
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
53+
54+
var err error
55+
56+
localEnv = &envtest.Environment{}
57+
localCfg, err = localEnv.Start()
58+
Expect(err).NotTo(HaveOccurred())
59+
60+
cloud1 = &envtest.Environment{}
61+
cloud1cfg, err = cloud1.Start()
62+
Expect(err).NotTo(HaveOccurred())
63+
64+
cloud2 = &envtest.Environment{}
65+
cloud2cfg, err = cloud2.Start()
66+
Expect(err).NotTo(HaveOccurred())
67+
68+
// Prevent the metrics listener being created
69+
metricsserver.DefaultBindAddress = "0"
70+
})
71+
72+
var _ = AfterSuite(func() {
73+
if localEnv != nil {
74+
Expect(localEnv.Stop()).To(Succeed())
75+
}
76+
77+
if cloud1 != nil {
78+
Expect(cloud1.Stop()).To(Succeed())
79+
}
80+
81+
if cloud2 != nil {
82+
Expect(cloud2.Stop()).To(Succeed())
83+
}
84+
85+
// Put the DefaultBindAddress back
86+
metricsserver.DefaultBindAddress = ":8080"
87+
})

providers/multi/provider.go

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multi
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"strings"
24+
"sync"
25+
26+
"github.com/go-logr/logr"
27+
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/cluster"
30+
"sigs.k8s.io/controller-runtime/pkg/log"
31+
32+
mctrl "sigs.k8s.io/multicluster-runtime"
33+
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
34+
)
35+
36+
var _ multicluster.Provider = &Provider{}
37+
38+
// Options defines the options for the provider.
39+
type Options struct {
40+
Separator string
41+
}
42+
43+
// Provider is a multicluster.Provider that manages multiple providers.
44+
type Provider struct {
45+
opts Options
46+
47+
log logr.Logger
48+
mgr mctrl.Manager
49+
50+
providerLock sync.RWMutex
51+
providers map[string]multicluster.Provider
52+
providerCancel map[string]context.CancelFunc
53+
}
54+
55+
// New returns a new instance of the provider with the given options.
56+
func New(opts Options) *Provider {
57+
p := new(Provider)
58+
59+
p.opts = opts
60+
if p.opts.Separator == "" {
61+
p.opts.Separator = "#"
62+
}
63+
64+
p.log = log.Log.WithName("multi-provider")
65+
66+
p.providers = make(map[string]multicluster.Provider)
67+
p.providerCancel = make(map[string]context.CancelFunc)
68+
69+
return p
70+
}
71+
72+
// SetManager sets the manager for the provider.
73+
func (p *Provider) SetManager(mgr mctrl.Manager) {
74+
if p.mgr != nil {
75+
p.log.Error(nil, "manager already set, overwriting")
76+
}
77+
p.mgr = mgr
78+
}
79+
80+
func (p *Provider) splitClusterName(clusterName string) (string, string) {
81+
parts := strings.SplitN(clusterName, p.opts.Separator, 2)
82+
if len(parts) < 2 {
83+
return "", clusterName
84+
}
85+
return parts[0], parts[1]
86+
}
87+
88+
// AddProvider adds a new provider with the given prefix.
89+
//
90+
// The startFunc is called to start the provider - starting the provider
91+
// outside of startFunc is an error and will result in undefined
92+
// behaviour.
93+
// startFunc should block for as long as the provider is running,
94+
// If startFunc returns an error the provider is removed and the error
95+
// is returned.
96+
func (p *Provider) AddProvider(ctx context.Context, prefix string, provider multicluster.Provider, startFunc func(context.Context, mctrl.Manager) error) error {
97+
ctx, cancel := context.WithCancel(ctx)
98+
99+
p.providerLock.Lock()
100+
_, ok := p.providers[prefix]
101+
p.providerLock.Unlock()
102+
if ok {
103+
cancel()
104+
return fmt.Errorf("provider already exists for prefix %q", prefix)
105+
}
106+
107+
var wrappedMgr mctrl.Manager
108+
if p.mgr == nil {
109+
p.log.Info("manager is nil, wrapped manager passed to start will be nil as well", "prefix", prefix)
110+
} else {
111+
wrappedMgr = &wrappedManager{
112+
Manager: p.mgr,
113+
prefix: prefix,
114+
sep: p.opts.Separator,
115+
}
116+
}
117+
118+
p.providerLock.Lock()
119+
p.providers[prefix] = provider
120+
p.providerCancel[prefix] = cancel
121+
p.providerLock.Unlock()
122+
123+
go func() {
124+
defer p.RemoveProvider(prefix)
125+
if err := startFunc(ctx, wrappedMgr); err != nil {
126+
cancel()
127+
p.log.Error(err, "error in provider", "prefix", prefix)
128+
}
129+
}()
130+
131+
return nil
132+
}
133+
134+
// RemoveProvider removes a provider from the manager and cancels its
135+
// context.
136+
//
137+
// Warning: This can lead to dangling clusters if the provider is not
138+
// using the context it is started with to engage the clusters it
139+
// manages.
140+
func (p *Provider) RemoveProvider(prefix string) {
141+
p.providerLock.Lock()
142+
defer p.providerLock.Unlock()
143+
if cancel, ok := p.providerCancel[prefix]; ok {
144+
cancel()
145+
delete(p.providers, prefix)
146+
delete(p.providerCancel, prefix)
147+
}
148+
}
149+
150+
// Get returns a cluster by name.
151+
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
152+
prefix, clusterName := p.splitClusterName(clusterName)
153+
154+
p.providerLock.RLock()
155+
provider, ok := p.providers[prefix]
156+
p.providerLock.RUnlock()
157+
158+
if !ok {
159+
return nil, fmt.Errorf("provider not found %q: %w", prefix, multicluster.ErrClusterNotFound)
160+
}
161+
162+
return provider.Get(ctx, clusterName)
163+
}
164+
165+
// IndexField indexes a field on all providers and clusters and returns
166+
// the aggregated errors.
167+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
168+
p.providerLock.RLock()
169+
defer p.providerLock.RUnlock()
170+
var errs error
171+
for prefix, provider := range p.providers {
172+
if err := provider.IndexField(ctx, obj, field, extractValue); err != nil {
173+
errs = errors.Join(
174+
errs,
175+
fmt.Errorf("failed to index field %q on cluster %q: %w", field, prefix, err),
176+
)
177+
}
178+
}
179+
return errs
180+
}

0 commit comments

Comments
 (0)