Skip to content

Commit 650df41

Browse files
committed
Extend interface with runner
1 parent a41032c commit 650df41

File tree

14 files changed

+135
-39
lines changed

14 files changed

+135
-39
lines changed

examples/cluster-api/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ func main() {
9292
os.Exit(1)
9393
}
9494

95+
if err := provider.SetupWithManager(mcMgr); err != nil {
96+
entryLog.Error(err, "unable to set up provider")
97+
os.Exit(1)
98+
}
99+
95100
// Create a configmap controller in the multi-cluster manager.
96101
if err := mcbuilder.ControllerManagedBy(mcMgr).
97102
Named("multicluster-configmaps").
@@ -129,7 +134,7 @@ func main() {
129134
return ignoreCanceled(localMgr.Start(ctx))
130135
})
131136
g.Go(func() error {
132-
return ignoreCanceled(provider.Run(ctx, mcMgr))
137+
return ignoreCanceled(provider.Run(ctx))
133138
})
134139
g.Go(func() error {
135140
return ignoreCanceled(mcMgr.Start(ctx))

examples/kind/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ func main() {
5151
os.Exit(1)
5252
}
5353

54+
if err := provider.SetupWithManager(mgr); err != nil {
55+
entryLog.Error(err, "unable to set up provider")
56+
os.Exit(1)
57+
}
58+
5459
err = mcbuilder.ControllerManagedBy(mgr).
5560
Named("multicluster-configmaps").
5661
For(&corev1.ConfigMap{}).
@@ -92,7 +97,7 @@ func main() {
9297
// Starting everything.
9398
g, ctx := errgroup.WithContext(ctx)
9499
g.Go(func() error {
95-
return ignoreCanceled(provider.Run(ctx, mgr))
100+
return ignoreCanceled(provider.Run(ctx))
96101
})
97102
g.Go(func() error {
98103
return ignoreCanceled(mgr.Start(ctx))

examples/namespace/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@ func run(ctx context.Context, log logr.Logger, kubeconfig string) error {
122122
return fmt.Errorf("unable to set up overall controller manager: %w", err)
123123
}
124124

125+
if err := provider.SetupWithManager(mgr); err != nil {
126+
return fmt.Errorf("unable to set up provider: %w", err)
127+
}
128+
125129
if err := mcbuilder.ControllerManagedBy(mgr).
126130
Named("multicluster-configmaps").
127131
For(&corev1.ConfigMap{}).
@@ -154,7 +158,7 @@ func run(ctx context.Context, log logr.Logger, kubeconfig string) error {
154158
// Starting everything.
155159
g, ctx := errgroup.WithContext(ctx)
156160
g.Go(func() error {
157-
return ignoreCanceled(provider.Run(ctx, mgr))
161+
return ignoreCanceled(provider.Run(ctx))
158162
})
159163
g.Go(func() error {
160164
return ignoreCanceled(cl.Start(ctx))

pkg/multicluster/multicluster.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,9 @@ type Provider interface {
6060
// IndexField indexes the given object by the given field on all engaged
6161
// clusters, current and future.
6262
IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error
63+
64+
// Run runs the provider. Implenetation of this method should block.
65+
// If you need to pass in manager, it is recommended to implement SetupWithManager(mgr mcmanager.Manager) error method on individual providers.
66+
// It is not part of the provider interface because it is not required for all providers.
67+
Run(ctx context.Context) error
6368
}

providers/cluster-api/provider.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,21 @@ func (p *Provider) Get(_ context.Context, clusterName string) (cluster.Cluster,
126126
return nil, multicluster.ErrClusterNotFound
127127
}
128128

129-
// Run starts the provider and blocks.
130-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
131-
p.log.Info("Starting Cluster-API cluster provider")
132-
129+
func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
133130
p.lock.Lock()
131+
defer p.lock.Unlock()
132+
134133
p.mcMgr = mgr
135-
p.lock.Unlock()
134+
return nil
135+
}
136+
137+
// Run starts the provider and blocks.
138+
func (p *Provider) Run(ctx context.Context) error {
139+
if p.mcMgr == nil {
140+
return fmt.Errorf("manager is not set")
141+
}
142+
143+
p.log.Info("Starting Cluster-API cluster provider")
136144

137145
<-ctx.Done()
138146

providers/cluster-inventory-api/provider.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,14 @@ func (p *Provider) IndexField(ctx context.Context, obj client.Object, field stri
291291

292292
return nil
293293
}
294+
295+
// Run runs the provider and blocks.
296+
func (p *Provider) Run(ctx context.Context) error {
297+
if p.mcMgr == nil {
298+
return fmt.Errorf("manager is not set")
299+
}
300+
301+
<-ctx.Done()
302+
303+
return ctx.Err()
304+
}

providers/file/provider.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,22 @@ type Provider struct {
138138
log logr.Logger
139139

140140
clustersLock sync.RWMutex
141+
mgr mcmanager.Manager
141142
clusters map[string]cluster.Cluster
142143
clusterCancel map[string]func()
143144
}
144145

146+
func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
147+
p.clustersLock.Lock()
148+
defer p.clustersLock.Unlock()
149+
150+
p.mgr = mgr
151+
return nil
152+
}
153+
145154
// Run starts the provider and updates the clusters and is blocking.
146-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
147-
if err := p.run(ctx, mgr); err != nil {
155+
func (p *Provider) Run(ctx context.Context) error {
156+
if err := p.run(ctx); err != nil {
148157
return fmt.Errorf("initial update failed: %w", err)
149158
}
150159

@@ -181,7 +190,7 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
181190
// would also require to track which cluster belongs to
182191
// which file.
183192
// Instead clusters are just updated from all files.
184-
if err := p.run(ctx, mgr); err != nil {
193+
if err := p.run(ctx); err != nil {
185194
p.log.Error(err, "failed to update clusters after file change")
186195
}
187196
case err, ok := <-watcher.Errors:
@@ -194,11 +203,11 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
194203
}
195204

196205
// RunOnce performs a single update of the clusters.
197-
func (p *Provider) RunOnce(ctx context.Context, mgr mcmanager.Manager) error {
198-
return p.run(ctx, mgr)
206+
func (p *Provider) RunOnce(ctx context.Context) error {
207+
return p.run(ctx)
199208
}
200209

201-
func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name string, cl cluster.Cluster) {
210+
func (p *Provider) addCluster(ctx context.Context, name string, cl cluster.Cluster) {
202211
ctx, cancel := context.WithCancel(ctx)
203212

204213
p.clustersLock.Lock()
@@ -213,8 +222,8 @@ func (p *Provider) addCluster(ctx context.Context, mgr mcmanager.Manager, name s
213222
p.removeCluster(name)
214223
}()
215224

216-
if mgr != nil {
217-
if err := mgr.Engage(ctx, name, cl); err != nil {
225+
if p.mgr != nil {
226+
if err := p.mgr.Engage(ctx, name, cl); err != nil {
218227
cancel()
219228
p.log.Error(err, "failed to engage cluster", "name", name)
220229
}
@@ -232,7 +241,11 @@ func (p *Provider) removeCluster(name string) {
232241
}
233242
}
234243

235-
func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
244+
func (p *Provider) run(ctx context.Context) error {
245+
if p.mgr == nil {
246+
return fmt.Errorf("manager is not set")
247+
}
248+
236249
loadedClusters, err := p.loadClusters()
237250
if err != nil {
238251
return fmt.Errorf("failed to load clusters: %w", err)
@@ -247,12 +260,12 @@ func (p *Provider) run(ctx context.Context, mgr mcmanager.Manager) error {
247260
if !cmp.Equal(existingCluster.GetConfig(), cl.GetConfig()) {
248261
p.log.Info("updating cluster", "name", name)
249262
p.removeCluster(name)
250-
p.addCluster(ctx, mgr, name, cl)
263+
p.addCluster(ctx, name, cl)
251264
}
252265
continue
253266
}
254267
p.log.Info("adding cluster", "name", name)
255-
p.addCluster(ctx, mgr, name, cl)
268+
p.addCluster(ctx, name, cl)
256269
}
257270

258271
// delete clusters that are no longer present

providers/file/provider_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@ var _ = Describe("Provider File", Ordered, func() {
5454
KubeconfigDirs: []string{discoverDir},
5555
})
5656
Expect(err).NotTo(HaveOccurred())
57+
err = provider.SetupWithManager(nil)
58+
Expect(err).NotTo(HaveOccurred())
5759
})
5860

5961
By("Starting the provider", func() {
6062
g.Go(func() error {
61-
return ignoreCanceled(provider.Run(ctx, nil))
63+
return ignoreCanceled(provider.Run(ctx))
6264
})
6365
})
6466

providers/kind/provider.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type index struct {
5757

5858
// Provider is a cluster Provider that works with a local Kind instance.
5959
type Provider struct {
60+
mcMgr mcmanager.Manager
61+
6062
opts []cluster.Option
6163
log logr.Logger
6264
lock sync.RWMutex
@@ -76,8 +78,20 @@ func (p *Provider) Get(ctx context.Context, clusterName string) (cluster.Cluster
7678
return nil, multicluster.ErrClusterNotFound
7779
}
7880

81+
func (p *Provider) SetupWithManager(mgr mcmanager.Manager) error {
82+
p.lock.Lock()
83+
defer p.lock.Unlock()
84+
85+
p.mcMgr = mgr
86+
return nil
87+
}
88+
7989
// Run starts the provider and blocks.
80-
func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
90+
func (p *Provider) Run(ctx context.Context) error {
91+
if p.mcMgr == nil {
92+
return fmt.Errorf("manager is not set")
93+
}
94+
8195
p.log.Info("Starting kind cluster provider")
8296

8397
provider := kind.NewProvider()
@@ -152,15 +166,13 @@ func (p *Provider) Run(ctx context.Context, mgr mcmanager.Manager) error {
152166
p.log.Info("Added new cluster", "cluster", clusterName)
153167

154168
// engage manager
155-
if mgr != nil {
156-
if err := mgr.Engage(clusterCtx, clusterName, cl); err != nil {
157-
log.Error(err, "failed to engage manager")
158-
p.lock.Lock()
159-
delete(p.clusters, clusterName)
160-
delete(p.cancelFns, clusterName)
161-
p.lock.Unlock()
162-
return false, nil
163-
}
169+
if err := p.mcMgr.Engage(clusterCtx, clusterName, cl); err != nil {
170+
log.Error(err, "failed to engage manager")
171+
p.lock.Lock()
172+
delete(p.clusters, clusterName)
173+
delete(p.cancelFns, clusterName)
174+
p.lock.Unlock()
175+
return false, nil
164176
}
165177
}
166178

providers/kubeconfig/provider.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,13 @@ func (p *Provider) getCluster(clusterName string) (activeCluster, bool) {
117117
return ac, exists
118118
}
119119

120+
// Run runs the provider and blocks.
121+
func (p *Provider) Run(ctx context.Context) error {
122+
<-ctx.Done()
123+
124+
return nil
125+
}
126+
120127
// setCluster adds a cluster with write lock
121128
func (p *Provider) setCluster(clusterName string, ac activeCluster) {
122129
p.lock.Lock()

0 commit comments

Comments
 (0)