Skip to content

Commit f99a591

Browse files
committed
Add multi provider
1 parent 79d0407 commit f99a591

File tree

4 files changed

+585
-0
lines changed

4 files changed

+585
-0
lines changed

providers/multi/multi_suite_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multi
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/client-go/rest"
23+
24+
"sigs.k8s.io/controller-runtime/pkg/envtest"
25+
logf "sigs.k8s.io/controller-runtime/pkg/log"
26+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
27+
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
28+
29+
. "github.com/onsi/ginkgo/v2"
30+
. "github.com/onsi/gomega"
31+
)
32+
33+
func TestBuilder(t *testing.T) {
34+
RegisterFailHandler(Fail)
35+
RunSpecs(t, "Namespace Provider Suite")
36+
}
37+
38+
// The operator runs in a local cluster and embeds two other providers
39+
// for cloud providers. The cloud providers are simulated by using the
40+
// namespace provider with two other clusters.
41+
42+
var localEnv *envtest.Environment
43+
var localCfg *rest.Config
44+
45+
var cloud1 *envtest.Environment
46+
var cloud1cfg *rest.Config
47+
48+
var cloud2 *envtest.Environment
49+
var cloud2cfg *rest.Config
50+
51+
var _ = BeforeSuite(func() {
52+
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
53+
54+
var err error
55+
56+
localEnv = &envtest.Environment{}
57+
localCfg, err = localEnv.Start()
58+
Expect(err).NotTo(HaveOccurred())
59+
60+
cloud1 = &envtest.Environment{}
61+
cloud1cfg, err = cloud1.Start()
62+
Expect(err).NotTo(HaveOccurred())
63+
64+
cloud2 = &envtest.Environment{}
65+
cloud2cfg, err = cloud2.Start()
66+
Expect(err).NotTo(HaveOccurred())
67+
68+
// Prevent the metrics listener being created
69+
metricsserver.DefaultBindAddress = "0"
70+
})
71+
72+
var _ = AfterSuite(func() {
73+
if localEnv != nil {
74+
Expect(localEnv.Stop()).To(Succeed())
75+
}
76+
77+
if cloud1 != nil {
78+
Expect(cloud1.Stop()).To(Succeed())
79+
}
80+
81+
if cloud2 != nil {
82+
Expect(cloud2.Stop()).To(Succeed())
83+
}
84+
85+
// Put the DefaultBindAddress back
86+
metricsserver.DefaultBindAddress = ":8080"
87+
})

providers/multi/provider.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multi
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"strings"
24+
"sync"
25+
26+
"github.com/go-logr/logr"
27+
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/cluster"
30+
"sigs.k8s.io/controller-runtime/pkg/log"
31+
32+
mctrl "sigs.k8s.io/multicluster-runtime"
33+
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
34+
)
35+
36+
var _ multicluster.Provider = &Provider{}
37+
38+
// Options defines the options for the provider.
39+
type Options struct {
40+
Separator string
41+
}
42+
43+
// Provider is a multicluster.Provider that manages multiple providers.
44+
type Provider struct {
45+
opts Options
46+
47+
log logr.Logger
48+
mgr mctrl.Manager
49+
50+
providerLock sync.RWMutex
51+
providers map[string]multicluster.Provider
52+
providerCancel map[string]context.CancelFunc
53+
}
54+
55+
// New returns a new instance of the provider with the given options.
56+
func New(opts Options) *Provider {
57+
p := new(Provider)
58+
59+
p.opts = opts
60+
if p.opts.Separator == "" {
61+
p.opts.Separator = "#"
62+
}
63+
64+
p.log = log.Log.WithName("multi-provider")
65+
66+
p.providers = make(map[string]multicluster.Provider)
67+
p.providerCancel = make(map[string]context.CancelFunc)
68+
69+
return p
70+
}
71+
72+
// SetManager sets the manager for the provider.
73+
func (p *Provider) SetManager(mgr mctrl.Manager) {
74+
if p.mgr != nil {
75+
p.log.Error(nil, "manager already set, overwriting")
76+
}
77+
p.mgr = mgr
78+
}
79+
80+
func (p *Provider) splitClusterName(clusterName string) (string, string) {
81+
parts := strings.SplitN(clusterName, p.opts.Separator, 2)
82+
if len(parts) < 2 {
83+
return "", clusterName
84+
}
85+
return parts[0], parts[1]
86+
}
87+
88+
// AddProvider adds a new provider with the given prefix.
89+
//
90+
// The startFunc is called to start the provider - starting the provider
91+
// outside of startFunc is an error and will result in undefined
92+
// behaviour.
93+
// startFunc should block for as long as the provider is running,
94+
// If startFunc returns an error the provider is removed and the error
95+
// is returned.
96+
func (p *Provider) AddProvider(ctx context.Context, prefix string, provider multicluster.Provider, startFunc func(context.Context, mctrl.Manager) error) error {
97+
ctx, cancel := context.WithCancel(ctx)
98+
99+
p.providerLock.Lock()
100+
_, ok := p.providers[prefix]
101+
p.providerLock.Unlock()
102+
if ok {
103+
cancel()
104+
return fmt.Errorf("provider already exists for prefix %q", prefix)
105+
}
106+
107+
var wrappedMgr mctrl.Manager
108+
if p.mgr == nil {
109+
p.log.Info("manager is nil, wrapped manager passed to start will be nil as well", "prefix", prefix)
110+
} else {
111+
wrappedMgr = &wrappedManager{
112+
Manager: p.mgr,
113+
prefix: prefix,
114+
sep: p.opts.Separator,
115+
}
116+
}
117+
118+
p.providerLock.Lock()
119+
p.providers[prefix] = provider
120+
p.providerCancel[prefix] = cancel
121+
p.providerLock.Unlock()
122+
123+
go func() {
124+
defer p.RemoveProvider(prefix)
125+
if err := startFunc(ctx, wrappedMgr); err != nil {
126+
cancel()
127+
p.log.Error(err, "error in provider", "prefix", prefix)
128+
}
129+
}()
130+
131+
return nil
132+
}
133+
134+
// RemoveProvider removes a provider from the manager and cancels its
135+
// context.
136+
func (p *Provider) RemoveProvider(prefix string) {
137+
p.providerLock.Lock()
138+
defer p.providerLock.Unlock()
139+
if cancel, ok := p.providerCancel[prefix]; ok {
140+
cancel()
141+
delete(p.providers, prefix)
142+
delete(p.providerCancel, prefix)
143+
}
144+
}
145+
146+
// Get returns a cluster by name.
147+
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
148+
prefix, clusterName := p.splitClusterName(clusterName)
149+
150+
p.providerLock.RLock()
151+
provider, ok := p.providers[prefix]
152+
p.providerLock.RUnlock()
153+
154+
if !ok {
155+
return nil, fmt.Errorf("provider not found %q: %w", prefix, multicluster.ErrClusterNotFound)
156+
}
157+
158+
return provider.Get(ctx, clusterName)
159+
}
160+
161+
// IndexField indexes a field on all providers and clusters and returns
162+
// the aggregated errors.
163+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
164+
p.providerLock.RLock()
165+
defer p.providerLock.RUnlock()
166+
var errs error
167+
for prefix, provider := range p.providers {
168+
if err := provider.IndexField(ctx, obj, field, extractValue); err != nil {
169+
errs = errors.Join(
170+
errs,
171+
fmt.Errorf("failed to index field %q on cluster %q: %w", field, prefix, err),
172+
)
173+
}
174+
}
175+
return errs
176+
}

0 commit comments

Comments
 (0)