Skip to content

Commit 0a9f077

Browse files
committed
Extend interface with runner
1 parent 00d670c commit 0a9f077

File tree

17 files changed

+108
-90
lines changed

17 files changed

+108
-90
lines changed

examples/cluster-api/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func main() {
129129
return ignoreCanceled(localMgr.Start(ctx))
130130
})
131131
g.Go(func() error {
132-
return ignoreCanceled(provider.Run(ctx, mcMgr))
132+
return ignoreCanceled(provider.Start(ctx, mcMgr))
133133
})
134134
g.Go(func() error {
135135
return ignoreCanceled(mcMgr.Start(ctx))

examples/file/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func main() {
114114
return ignoreCanceled(mgr.Start(ctx))
115115
})
116116
g.Go(func() error {
117-
return ignoreCanceled(provider.Run(ctx, mgr))
117+
return ignoreCanceled(provider.Start(ctx, mgr))
118118
})
119119
if err := g.Wait(); err != nil {
120120
entryLog.Info("error in errgroup: %w", err)

examples/kind/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func main() {
9292
// Starting everything.
9393
g, ctx := errgroup.WithContext(ctx)
9494
g.Go(func() error {
95-
return ignoreCanceled(provider.Run(ctx, mgr))
95+
return ignoreCanceled(provider.Start(ctx, mgr))
9696
})
9797
g.Go(func() error {
9898
return ignoreCanceled(mgr.Start(ctx))

examples/namespace/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func run(ctx context.Context, log logr.Logger, kubeconfig string) error {
154154
// Starting everything.
155155
g, ctx := errgroup.WithContext(ctx)
156156
g.Go(func() error {
157-
return ignoreCanceled(provider.Run(ctx, mgr))
157+
return ignoreCanceled(provider.Start(ctx, mgr))
158158
})
159159
g.Go(func() error {
160160
return ignoreCanceled(cl.Start(ctx))

pkg/multicluster/multicluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,9 @@ type Provider interface {
6060
// IndexField indexes the given object by the given field on all engaged
6161
// clusters, current and future.
6262
IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error
63+
64+
// Start runs the provider. Implementation of this method should block.
65+
// If you need to pass in manager, it is recommended to implement SetupWithManager(mgr mcmanager.Manager) error method on individual providers.
66+
// Even if a provider gets a manager through e.g. `SetupWithManager` the `Aware` passed to this method must be used to engage clusters.
67+
Start(context.Context, Aware) error
6368
}

providers/cluster-api/provider.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"sigs.k8s.io/controller-runtime/pkg/manager"
4040
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4141

42-
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
4342
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
4443
)
4544

@@ -109,7 +108,7 @@ type Provider struct {
109108
client client.Client
110109

111110
lock sync.Mutex
112-
mcMgr mcmanager.Manager
111+
mcAware multicluster.Aware
113112
clusters map[string]cluster.Cluster
114113
cancelFns map[string]context.CancelFunc
115114
indexers []index
@@ -126,13 +125,14 @@ func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster,
126125
return nil, multicluster.ErrClusterNotFound
127126
}
128127

129-
// Run starts the provider and blocks.
130-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
131-
p.log.Info("Starting Cluster-API cluster provider")
132-
128+
// Start starts the provider and blocks.
129+
func (p *Provider) Start(ctx context.Context, mcAware multicluster.Aware) error {
133130
p.lock.Lock()
134-
p.mcMgr = mgr
135-
p.lock.Unlock()
131+
defer p.lock.Unlock()
132+
133+
p.mcAware = mcAware
134+
135+
p.log.Info("Starting Cluster-API cluster provider")
136136

137137
<-ctx.Done()
138138

@@ -171,7 +171,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
171171
// TODO(sttts): do tighter logging.
172172

173173
// provider already started?
174-
if p.mcMgr == nil {
174+
if p.mcAware == nil {
175175
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
176176
}
177177

@@ -222,7 +222,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
222222
p.log.Info("Added new cluster")
223223

224224
// engage manager.
225-
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
225+
if err := p.mcAware.Engage(clusterCtx, key, cl); err != nil {
226226
log.Error(err, "failed to engage manager")
227227
delete(p.clusters, key)
228228
delete(p.cancelFns, key)

providers/cluster-inventory-api/provider.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ type Provider struct {
7878
strategy kubeconfigstrategy.Interface
7979

8080
lock sync.RWMutex
81-
mcMgr mcmanager.Manager
81+
manager mcmanager.Manager
8282
clusters map[string]cluster.Cluster
8383
cancelFns map[string]context.CancelFunc
8484
kubeconfig map[string]*rest.Config
@@ -125,7 +125,7 @@ func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
125125
if mgr == nil {
126126
return fmt.Errorf("manager is nil")
127127
}
128-
p.mcMgr = mgr
128+
p.manager = mgr
129129

130130
// Get the local manager from the multi-cluster manager.
131131
localMgr := mgr.GetLocalManager()
@@ -199,7 +199,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
199199
defer p.lock.Unlock()
200200

201201
// provider already started?
202-
if p.mcMgr == nil {
202+
if p.manager == nil {
203203
log.V(3).Info("Provider not started yet, requeuing")
204204
return reconcile.Result{RequeueAfter: time.Second * 2}, nil
205205
}
@@ -267,7 +267,7 @@ func (p *Provider) Reconcile(ctx context.Context, req reconcile.Request) (reconc
267267
log.Info("Added new cluster for ClusterProfile")
268268

269269
// engage manager.
270-
if err := p.mcMgr.Engage(clusterCtx, key, cl); err != nil {
270+
if err := p.manager.Engage(clusterCtx, key, cl); err != nil {
271271
log.Error(err, "failed to engage manager for ClusterProfile")
272272
delete(p.clusters, key)
273273
delete(p.cancelFns, key)
@@ -301,3 +301,9 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri
301301

302302
return nil
303303
}
304+
305+
// Start runs the provider and blocks.
306+
func (p *Provider) Start(ctx context.Context, _ multicluster.Aware) error {
307+
<-ctx.Done()
308+
return ctx.Err()
309+
}

providers/file/provider.go

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"sigs.k8s.io/controller-runtime/pkg/cluster"
3434
"sigs.k8s.io/controller-runtime/pkg/log"
3535

36-
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
3736
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
3837
)
3938

@@ -137,14 +136,19 @@ type Provider struct {
137136

138137
log logr.Logger
139138

140-
clustersLock sync.RWMutex
139+
lock sync.RWMutex
140+
mcAware multicluster.Aware
141141
clusters map[string]cluster.Cluster
142142
clusterCancel map[string]func()
143143
}
144144

145-
// Run starts the provider and updates the clusters and is blocking.
146-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
147-
if err := p.run(ctx, mgr); err != nil {
145+
// Start starts the provider and updates the clusters and is blocking.
146+
func (p *Provider) Start(ctx context.Context, mcAware multicluster.Aware) error {
147+
p.lock.Lock()
148+
p.mcAware = mcAware
149+
p.lock.Unlock()
150+
151+
if err := p.run(ctx); err != nil {
148152
return fmt.Errorf("initial update failed: %w", err)
149153
}
150154

@@ -181,7 +185,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
181185
// would also require to track which cluster belongs to
182186
// which file.
183187
// Instead clusters are just updated from all files.
184-
if err := p.run(ctx, mgr); err != nil {
188+
if err := p.run(ctx); err != nil {
185189
p.log.Error(err, "failed to update clusters after file change")
186190
}
187191
case err, ok := <-watcher.Errors:
@@ -194,17 +198,17 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
194198
}
195199

196200
// RunOnce performs a single update of the clusters.
197-
func (p *Provider) RunOnce(ctx context.Context, mgr mcmanager.Manager) error {
198-
return p.run(ctx, mgr)
201+
func (p *Provider) RunOnce(ctx context.Context) error {
202+
return p.run(ctx)
199203
}
200204

201-
func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name string, cl cluster.Cluster) {
205+
func (p *Provider) addCluster(ctx context.Context, name string, cl cluster.Cluster) {
202206
ctx, cancel := context.WithCancel(ctx)
203207

204-
p.clustersLock.Lock()
208+
p.lock.Lock()
205209
p.clusters[name] = cl
206210
p.clusterCancel[name] = cancel
207-
p.clustersLock.Unlock()
211+
p.lock.Unlock()
208212

209213
go func() {
210214
if err := cl.Start(ctx); err != nil {
@@ -213,17 +217,17 @@ func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name s
213217
p.removeCluster(name)
214218
}()
215219

216-
if mgr != nil {
217-
if err := mgr.Engage(ctx, name, cl); err != nil {
220+
if p.mcAware != nil {
221+
if err := p.mcAware.Engage(ctx, name, cl); err != nil {
218222
cancel()
219223
p.log.Error(err, "failed to engage cluster", "name", name)
220224
}
221225
}
222226
}
223227

224228
func (p *Provider) removeCluster(name string) {
225-
p.clustersLock.Lock()
226-
defer p.clustersLock.Unlock()
229+
p.lock.Lock()
230+
defer p.lock.Unlock()
227231

228232
if cancel, ok := p.clusterCancel[name]; ok {
229233
cancel()
@@ -232,7 +236,7 @@ func (p *Provider) removeCluster(name string) {
232236
}
233237
}
234238

235-
func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
239+
func (p *Provider) run(ctx context.Context) error {
236240
loadedClusters, err := p.loadClusters()
237241
if err != nil {
238242
return fmt.Errorf("failed to load clusters: %w", err)
@@ -247,12 +251,12 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
247251
if !cmp.Equal(existingCluster.GetConfig(), cl.GetConfig()) {
248252
p.log.Info("updating cluster", "name", name)
249253
p.removeCluster(name)
250-
p.addCluster(ctx, mgr, name, cl)
254+
p.addCluster(ctx, name, cl)
251255
}
252256
continue
253257
}
254258
p.log.Info("adding cluster", "name", name)
255-
p.addCluster(ctx, mgr, name, cl)
259+
p.addCluster(ctx, name, cl)
256260
}
257261

258262
// delete clusters that are no longer present
@@ -271,8 +275,8 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
271275
// If the cluster name is empty (""), it returns the first cluster
272276
// found.
273277
func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster, error) {
274-
p.clustersLock.RLock()
275-
defer p.clustersLock.RUnlock()
278+
p.lock.RLock()
279+
defer p.lock.RUnlock()
276280

277281
if clusterName == "" {
278282
for _, cl := range p.clusters {
@@ -289,8 +293,8 @@ func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster,
289293

290294
// IndexField indexes a field on all clusters.
291295
func (p *Provider) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
292-
p.clustersLock.RLock()
293-
defer p.clustersLock.RUnlock()
296+
p.lock.RLock()
297+
defer p.lock.RUnlock()
294298

295299
for name, cl := range p.clusters {
296300
if err := cl.GetCache().IndexField(ctx, obj, field, extractValue); err != nil {
@@ -302,7 +306,7 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri
302306

303307
// ClusterNames returns the names of all clusters known to the provider.
304308
func (p *Provider) ClusterNames() []string {
305-
p.clustersLock.RLock()
306-
defer p.clustersLock.RUnlock()
309+
p.lock.RLock()
310+
defer p.lock.RUnlock()
307311
return slices.Sorted(maps.Keys(p.clusters))
308312
}

providers/file/provider_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ var _ = Describe("Provider File", Ordered, func() {
5454
KubeconfigDirs: []string{discoverDir},
5555
})
5656
Expect(err).NotTo(HaveOccurred())
57+
Expect(err).NotTo(HaveOccurred())
5758
})
5859

5960
By("Starting the provider", func() {
6061
g.Go(func() error {
61-
return ignoreCanceled(provider.Run(ctx, nil))
62+
return ignoreCanceled(provider.Start(ctx, nil)) // Pass nil for aware since we are not using a manager in this test
6263
})
6364
})
6465

providers/kind/go.mod

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@ require (
1616
require (
1717
github.com/BurntSushi/toml v1.4.0 // indirect
1818
github.com/alessio/shellescape v1.4.2 // indirect
19-
github.com/beorn7/perks v1.0.1 // indirect
20-
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2119
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
2220
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
2321
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
24-
github.com/fsnotify/fsnotify v1.7.0 // indirect
2522
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
2623
github.com/go-openapi/jsonpointer v0.21.0 // indirect
2724
github.com/go-openapi/jsonreference v0.20.2 // indirect
@@ -41,10 +38,6 @@ require (
4138
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4239
github.com/pelletier/go-toml v1.9.5 // indirect
4340
github.com/pkg/errors v0.9.1 // indirect
44-
github.com/prometheus/client_golang v1.22.0 // indirect
45-
github.com/prometheus/client_model v0.6.1 // indirect
46-
github.com/prometheus/common v0.62.0 // indirect
47-
github.com/prometheus/procfs v0.15.1 // indirect
4841
github.com/spf13/cobra v1.8.1 // indirect
4942
github.com/spf13/pflag v1.0.5 // indirect
5043
github.com/x448/float16 v0.8.4 // indirect
@@ -54,7 +47,6 @@ require (
5447
golang.org/x/term v0.30.0 // indirect
5548
golang.org/x/text v0.23.0 // indirect
5649
golang.org/x/time v0.9.0 // indirect
57-
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
5850
google.golang.org/protobuf v1.36.5 // indirect
5951
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
6052
gopkg.in/inf.v0 v0.9.1 // indirect

0 commit comments

Comments
 (0)