Skip to content

Commit fcd2b53

Browse files
committed
Add multi provider
1 parent 5a71e2a commit fcd2b53

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

providers/multi/provider.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package multi
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"strings"
24+
"sync"
25+
26+
"github.com/go-logr/logr"
27+
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/cluster"
30+
"sigs.k8s.io/controller-runtime/pkg/log"
31+
32+
mctrl "sigs.k8s.io/multicluster-runtime"
33+
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
34+
)
35+
36+
var _ multicluster.Provider = &Provider{}
37+
38+
// Options defines the options for the provider.
39+
type Options struct {
40+
Separator string
41+
}
42+
43+
// Provider is a multicluster.Provider that manages multiple providers.
44+
type Provider struct {
45+
opts Options
46+
47+
log logr.Logger
48+
mgr mctrl.Manager
49+
50+
providerLock sync.RWMutex
51+
providers map[string]multicluster.Provider
52+
providerCancel map[string]context.CancelFunc
53+
}
54+
55+
// New returns a new instance of the provider with the given options.
56+
func New(mgr mctrl.Manager, opts Options) *Provider {
57+
p := new(Provider)
58+
59+
p.opts = opts
60+
if p.opts.Separator == "" {
61+
p.opts.Separator = "#"
62+
}
63+
64+
p.log = log.Log.WithName("namespaced-cluster-provider")
65+
p.mgr = mgr
66+
67+
p.providers = make(map[string]multicluster.Provider)
68+
p.providerCancel = make(map[string]context.CancelFunc)
69+
70+
return p
71+
}
72+
73+
// Run starts the provider and blocks until the context is done. This is a noop.
74+
func (p *Provider) Run(ctx context.Context, mgr mctrl.Manager) error {
75+
<-ctx.Done()
76+
return nil
77+
}
78+
79+
func (p *Provider) splitClusterName(clusterName string) (string, string) {
80+
parts := strings.SplitN(clusterName, p.opts.Separator, 2)
81+
if len(parts) < 2 {
82+
return "", clusterName
83+
}
84+
return parts[0], parts[1]
85+
}
86+
87+
// AddProvider adds a new provider to the manager. The startFunc is
88+
// called to start the provider.
89+
// Starting the provider outside of this function is an error and will
90+
// result in undefined behaviour.
91+
// startFunc should not block.
92+
func (p *Provider) AddProvider(ctx context.Context, prefix string, provider multicluster.Provider, startFunc func(context.Context, mctrl.Manager) error) error {
93+
ctx, cancel := context.WithCancel(ctx)
94+
95+
wrappedMgr := &wrappedManager{
96+
Manager: p.mgr,
97+
prefix: prefix,
98+
sep: p.opts.Separator,
99+
}
100+
101+
if err := startFunc(ctx, wrappedMgr); err != nil {
102+
cancel()
103+
return fmt.Errorf("failed to start provider %q: %w", prefix, err)
104+
}
105+
106+
p.providerLock.Lock()
107+
p.providers[prefix] = provider
108+
p.providerCancel[prefix] = cancel
109+
p.providerLock.Unlock()
110+
111+
return nil
112+
}
113+
114+
// RemoveProvider removes a provider from the manager and cancels its
115+
// context.
116+
func (p *Provider) RemoveProvider(prefix string) {
117+
p.providerLock.Lock()
118+
defer p.providerLock.Unlock()
119+
if cancel, ok := p.providerCancel[prefix]; ok {
120+
cancel()
121+
delete(p.providers, prefix)
122+
delete(p.providerCancel, prefix)
123+
}
124+
}
125+
126+
// Get returns a cluster for the given identifying cluster name.
127+
func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster, error) {
128+
prefix, clusterName := p.splitClusterName(clusterName)
129+
130+
p.providerLock.RLock()
131+
provider, ok := p.providers[prefix]
132+
p.providerLock.RUnlock()
133+
134+
if !ok {
135+
return nil, fmt.Errorf("provider not found %q: %w", prefix, multicluster.ErrClusterNotFound)
136+
}
137+
138+
return provider.Get(ctx, clusterName)
139+
}
140+
141+
// IndexField indexes the given object by the given field on all
142+
// providers and returns the aggregated errors.
143+
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
144+
p.providerLock.RLock()
145+
defer p.providerLock.RUnlock()
146+
var errs error
147+
for prefix, provider := range p.providers {
148+
if err := provider.IndexField(ctx, obj, field, extractValue); err != nil {
149+
errs = errors.Join(
150+
errs,
151+
fmt.Errorf("failed to index field %q on cluster %q: %w", field, prefix, err),
152+
)
153+
}
154+
}
155+
return errs
156+
}

providers/multi/wrappedManager.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package multi
2+
3+
import (
4+
"context"
5+
6+
"sigs.k8s.io/controller-runtime/pkg/cluster"
7+
8+
mctrl "sigs.k8s.io/multicluster-runtime"
9+
)
10+
11+
var _ mctrl.Manager = &wrappedManager{}
12+
13+
type wrappedManager struct {
14+
mctrl.Manager
15+
prefix, sep string
16+
}
17+
18+
func (w *wrappedManager) Engage(ctx context.Context, name string, cl cluster.Cluster) error {
19+
return w.Manager.Engage(ctx, w.prefix+w.sep+name, cl)
20+
}

0 commit comments

Comments
 (0)