@@ -32,7 +32,6 @@ import (
32
32
"sigs.k8s.io/controller-runtime/pkg/cluster"
33
33
"sigs.k8s.io/controller-runtime/pkg/handler"
34
34
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
35
- internal "sigs.k8s.io/controller-runtime/pkg/internal/source"
36
35
logf "sigs.k8s.io/controller-runtime/pkg/log"
37
36
"sigs.k8s.io/controller-runtime/pkg/predicate"
38
37
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -82,7 +81,7 @@ type Controller struct {
82
81
startWatches []* watchDescription
83
82
84
83
// clusterAwareWatches maintains a list of cluster aware sources, handlers, and predicates to start when the controller is started.
85
- clusterAwareWatches []* watchDescription
84
+ clusterAwareWatches []* deepcopyableWatchDescription
86
85
87
86
// clustersByName is used to manage the fleet of clusters.
88
87
clustersByName map [string ]* clusterDescription
@@ -124,37 +123,13 @@ type watchDescription struct {
124
123
predicates []predicate.Predicate
125
124
}
126
125
127
- func (w * watchDescription ) IsClusterAware () bool {
128
- if _ , ok := w .src .(cluster.AwareDeepCopy [* internal.Kind ]); ! ok {
129
- if _ , ok := w .src .(cluster.AwareDeepCopy [source.Source ]); ! ok {
130
- return false
131
- }
132
- }
133
- if _ , ok := w .handler .(cluster.AwareDeepCopy [handler.EventHandler ]); ! ok {
134
- return false
135
- }
136
- return true
137
- }
138
-
139
- func (w * watchDescription ) DeepCopyFor (c cluster.Cluster ) * watchDescription {
140
- copy := & watchDescription {
141
- predicates : w .predicates ,
142
- }
143
- if clusterAwareSource , ok := w .src .(cluster.AwareDeepCopy [* internal.Kind ]); ok {
144
- copy .src = clusterAwareSource .DeepCopyFor (c )
145
- } else if clusterAwareSource , ok := w .src .(cluster.AwareDeepCopy [source.Source ]); ok {
146
- copy .src = clusterAwareSource .DeepCopyFor (c )
147
- } else {
148
- return nil
149
- }
150
-
151
- if clusterAwareHandler , ok := w .handler .(cluster.AwareDeepCopy [handler.EventHandler ]); ok {
152
- copy .handler = clusterAwareHandler .DeepCopyFor (c )
153
- } else {
154
- return nil
155
- }
156
-
157
- return copy
126
+ // deepcopyableWatchDescription contains all the information necessary to start
127
+ // a watch. In addition to watchDescription it also contains the DeepCopyFor
128
+ // method to adapt it to a different cluster.
129
+ type deepcopyableWatchDescription struct {
130
+ src source.DeepCopyableSyncingSource
131
+ handler handler.DeepCopyableEventHandler
132
+ predicates []predicate.Predicate
158
133
}
159
134
160
135
// Reconcile implements reconcile.Reconciler.
@@ -182,14 +157,22 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
182
157
c .mu .Lock ()
183
158
defer c .mu .Unlock ()
184
159
185
- watchDesc := & watchDescription {src : src , handler : evthdler , predicates : prct }
186
-
187
160
// If the source is cluster aware, store it in a separate list.
188
- _ , forceDefaultClsuter := src .(ClusterAwareSource )
189
- if c .WatchProviderClusters && ! forceDefaultClsuter {
190
- if ! watchDesc .IsClusterAware () {
191
- return fmt .Errorf ("source %s is not cluster aware, but WatchProviderClusters is true" , src )
161
+ var forceDefaultCluster bool
162
+ if src , ok := src .(ClusterAwareSource ); ok {
163
+ forceDefaultCluster = src .ForceDefaultCluster ()
164
+ }
165
+ if c .WatchProviderClusters && ! forceDefaultCluster {
166
+ src , ok := src .(source.DeepCopyableSyncingSource )
167
+ if ! ok {
168
+ return fmt .Errorf ("source %T is not cluster aware, but WatchProviderClusters is true" , src )
192
169
}
170
+ evthdler , ok := evthdler .(handler.DeepCopyableEventHandler )
171
+ if ! ok {
172
+ return fmt .Errorf ("handler %T is not cluster aware, but WatchProviderClusters is true" , evthdler )
173
+ }
174
+
175
+ watchDesc := & deepcopyableWatchDescription {src : src , handler : evthdler , predicates : prct }
193
176
c .clusterAwareWatches = append (c .clusterAwareWatches , watchDesc )
194
177
195
178
// If the watch is cluster aware, start it for all the clusters
@@ -208,7 +191,7 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
208
191
//
209
192
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
210
193
if ! c .Started {
211
- c .startWatches = append (c .startWatches , watchDesc )
194
+ c .startWatches = append (c .startWatches , & watchDescription { src : src , handler : evthdler , predicates : prct } )
212
195
return nil
213
196
}
214
197
@@ -260,8 +243,8 @@ func (c *Controller) Disengage(ctx context.Context, cluster cluster.Cluster) err
260
243
return nil
261
244
}
262
245
263
- func (c * Controller ) startClusterAwareWatchLocked (cldesc * clusterDescription , watchDesc * watchDescription ) error {
264
- watch := watchDesc .DeepCopyFor (cldesc )
246
+ func (c * Controller ) startClusterAwareWatchLocked (cldesc * clusterDescription , watchDesc * deepcopyableWatchDescription ) error {
247
+ watch := & deepcopyableWatchDescription { src : watchDesc .src . DeepCopyFor (cldesc . Cluster ), handler : watchDesc . handler . DeepCopyFor ( cldesc . Cluster ), predicates : watchDesc . predicates }
265
248
if watch == nil {
266
249
return nil
267
250
}
0 commit comments