@@ -59,6 +59,8 @@ const (
5959)
6060
6161var _ Runnable = & controllerManager {}
62+ var _ cluster.Aware = & controllerManager {}
63+ var _ Manager = & controllerManager {}
6264
6365type controllerManager struct {
6466 sync.Mutex
@@ -68,8 +70,14 @@ type controllerManager struct {
6870 errChan chan error
6971 runnables * runnables
7072
71- // cluster holds a variety of methods to interact with a cluster. Required.
72- cluster cluster.Cluster
73+ // defaultCluster holds a variety of methods to interact with a defaultCluster. Required.
74+ defaultCluster cluster.Cluster
75+ defaultClusterOptions cluster.Option
76+
77+ // engagedCluster is a map of engaged clusters. The can come and go as the manager is running.
78+ engagedClustersLock sync.RWMutex
79+ engagedClusters map [string ]cluster.Cluster
80+ clusterAwareRunnables []cluster.Aware
7381
7482 // recorderProvider is used to generate event recorders that will be injected into Controllers
7583 // (and EventHandlers, Sources and Predicates).
@@ -161,6 +169,9 @@ type controllerManager struct {
161169 // internalProceduresStop channel is used internally to the manager when coordinating
162170 // the proper shutdown of servers. This channel is also used for dependency injection.
163171 internalProceduresStop chan struct {}
172+
173+ // clusterProvider is used to get clusters by name, beyond the default cluster.
174+ clusterProvider cluster.Provider
164175}
165176
166177type hasCache interface {
@@ -176,7 +187,40 @@ func (cm *controllerManager) Add(r Runnable) error {
176187}
177188
178189func (cm * controllerManager ) add (r Runnable ) error {
179- return cm .runnables .Add (r )
190+ var engaged []cluster.Aware
191+ var errs []error
192+ disengage := func () {
193+ for _ , aware := range engaged {
194+ if err := aware .Disengage (cm .internalCtx , cm .defaultCluster ); err != nil {
195+ errs = append (errs , err )
196+ }
197+ }
198+ }
199+
200+ // engage with existing clusters (this is reversible)
201+ if aware , ok := r .(cluster.Aware ); ok {
202+ cm .engagedClustersLock .RLock ()
203+ defer cm .engagedClustersLock .RUnlock ()
204+ for _ , cl := range cm .engagedClusters {
205+ if err := aware .Engage (cm .internalCtx , cl ); err != nil {
206+ errs = append (errs , err )
207+ break
208+ }
209+ engaged = append (engaged , aware )
210+ }
211+ if len (errs ) > 0 {
212+ disengage ()
213+ return kerrors .NewAggregate (errs )
214+ }
215+ cm .clusterAwareRunnables = append (cm .clusterAwareRunnables , aware )
216+ } else {
217+ if err := cm .runnables .Add (r ); err != nil {
218+ disengage ()
219+ return err
220+ }
221+ }
222+
223+ return nil
180224}
181225
182226// AddMetricsServerExtraHandler adds extra handler served on path to the http server that serves metrics.
@@ -231,40 +275,58 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
231275 return nil
232276}
233277
278+ func (cm * controllerManager ) Name () string {
279+ return cm .defaultCluster .Name ()
280+ }
281+
282+ func (cm * controllerManager ) GetCluster (ctx context.Context , clusterName string ) (cluster.Cluster , error ) {
283+ if clusterName == "" || clusterName == cm .defaultCluster .Name () {
284+ return cm .defaultCluster , nil
285+ }
286+
287+ if cm .clusterProvider == nil {
288+ return nil , fmt .Errorf ("cluster %q not found, cluster provider is not set" , clusterName )
289+ }
290+
291+ // intentionally not returning from engaged clusters. This can be used
292+ // without engaging clusters.
293+ return cm .clusterProvider .Get (ctx , clusterName )
294+ }
295+
234296func (cm * controllerManager ) GetHTTPClient () * http.Client {
235- return cm .cluster .GetHTTPClient ()
297+ return cm .defaultCluster .GetHTTPClient ()
236298}
237299
238300func (cm * controllerManager ) GetConfig () * rest.Config {
239- return cm .cluster .GetConfig ()
301+ return cm .defaultCluster .GetConfig ()
240302}
241303
242304func (cm * controllerManager ) GetClient () client.Client {
243- return cm .cluster .GetClient ()
305+ return cm .defaultCluster .GetClient ()
244306}
245307
246308func (cm * controllerManager ) GetScheme () * runtime.Scheme {
247- return cm .cluster .GetScheme ()
309+ return cm .defaultCluster .GetScheme ()
248310}
249311
250312func (cm * controllerManager ) GetFieldIndexer () client.FieldIndexer {
251- return cm .cluster .GetFieldIndexer ()
313+ return cm .defaultCluster .GetFieldIndexer ()
252314}
253315
254316func (cm * controllerManager ) GetCache () cache.Cache {
255- return cm .cluster .GetCache ()
317+ return cm .defaultCluster .GetCache ()
256318}
257319
258320func (cm * controllerManager ) GetEventRecorderFor (name string ) record.EventRecorder {
259- return cm .cluster .GetEventRecorderFor (name )
321+ return cm .defaultCluster .GetEventRecorderFor (name )
260322}
261323
262324func (cm * controllerManager ) GetRESTMapper () meta.RESTMapper {
263- return cm .cluster .GetRESTMapper ()
325+ return cm .defaultCluster .GetRESTMapper ()
264326}
265327
266328func (cm * controllerManager ) GetAPIReader () client.Reader {
267- return cm .cluster .GetAPIReader ()
329+ return cm .defaultCluster .GetAPIReader ()
268330}
269331
270332func (cm * controllerManager ) GetWebhookServer () webhook.Server {
@@ -381,7 +443,7 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
381443 }()
382444
383445 // Add the cluster runnable.
384- if err := cm .add (cm .cluster ); err != nil {
446+ if err := cm .add (cm .defaultCluster ); err != nil {
385447 return fmt .Errorf ("failed to add cluster to runnables: %w" , err )
386448 }
387449
@@ -614,6 +676,70 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector,
614676 return leaderElector , nil
615677}
616678
679+ func (cm * controllerManager ) Engage (ctx context.Context , cl cluster.Cluster ) error {
680+ cm .Lock ()
681+ defer cm .Unlock ()
682+
683+ // be reentrant via noop
684+ cm .engagedClustersLock .RLock ()
685+ if _ , ok := cm .engagedClusters [cl .Name ()]; ok {
686+ cm .engagedClustersLock .RUnlock ()
687+ return nil
688+ }
689+ cm .engagedClustersLock .RUnlock ()
690+
691+ // add early because any engaged runnable could access it
692+ cm .engagedClustersLock .Lock ()
693+ cm .engagedClusters [cl .Name ()] = cl
694+ cm .engagedClustersLock .Unlock ()
695+
696+ // engage known runnables
697+ var errs []error
698+ engaged := []cluster.Aware {}
699+ for _ , r := range cm .clusterAwareRunnables {
700+ if err := r .Engage (ctx , cl ); err != nil {
701+ errs = append (errs , err )
702+ break
703+ }
704+ engaged = append (engaged , r )
705+ }
706+
707+ // clean-up
708+ if len (errs ) > 0 {
709+ for _ , aware := range engaged {
710+ if err := aware .Disengage (ctx , cl ); err != nil {
711+ errs = append (errs , err )
712+ }
713+ }
714+
715+ cm .engagedClustersLock .Lock ()
716+ delete (cm .engagedClusters , cl .Name ())
717+ cm .engagedClustersLock .Unlock ()
718+
719+ return kerrors .NewAggregate (errs )
720+ }
721+
722+ return nil
723+ }
724+
725+ func (cm * controllerManager ) Disengage (ctx context.Context , cl cluster.Cluster ) error {
726+ cm .Lock ()
727+ defer cm .Unlock ()
728+
729+ var errs []error
730+ for _ , r := range cm .clusterAwareRunnables {
731+ if err := r .Disengage (ctx , cl ); err != nil {
732+ errs = append (errs , err )
733+ }
734+ }
735+
736+ cm .engagedClustersLock .Lock ()
737+ delete (cm .engagedClusters , cl .Name ())
738+ cm .engagedClustersLock .Unlock ()
739+
740+ return kerrors .NewAggregate (errs )
741+ }
742+
617743func (cm * controllerManager ) startLeaderElectionRunnables () error {
618744 return cm .runnables .LeaderElection .Start (cm .internalCtx )
619745}
0 commit comments