@@ -50,10 +50,10 @@ const (
50
50
51
51
type Operator struct {
52
52
* queueinformer.Operator
53
- csvQueue workqueue.RateLimitingInterface
54
- client versioned.Interface
55
- resolver install.StrategyResolverInterface
56
- lister operatorlister.OperatorLister
53
+ csvQueues map [ string ] workqueue.RateLimitingInterface
54
+ client versioned.Interface
55
+ resolver install.StrategyResolverInterface
56
+ lister operatorlister.OperatorLister
57
57
recorder record.EventRecorder
58
58
}
59
59
@@ -75,60 +75,75 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
75
75
}
76
76
77
77
op := & Operator {
78
- Operator : queueOperator ,
79
- client : crClient ,
80
- lister : operatorlister .NewLister (),
81
- resolver : resolver ,
82
- recorder : eventRecorder ,
78
+ Operator : queueOperator ,
79
+ csvQueues : make (map [string ]workqueue.RateLimitingInterface ),
80
+ client : crClient ,
81
+ lister : operatorlister .NewLister (),
82
+ resolver : resolver ,
83
+ recorder : eventRecorder ,
83
84
}
84
85
85
86
// Set up RBAC informers
86
- informerFactory := informers .NewSharedInformerFactory (opClient .KubernetesInterface (), wakeupInterval )
87
- roleInformer := informerFactory .Rbac ().V1 ().Roles ()
88
- roleBindingInformer := informerFactory .Rbac ().V1 ().RoleBindings ()
89
- clusterRoleInformer := informerFactory .Rbac ().V1 ().ClusterRoles ()
90
- clusterRoleBindingInformer := informerFactory .Rbac ().V1 ().ClusterRoleBindings ()
91
- namespaceInformer := informerFactory .Core ().V1 ().Namespaces ()
92
-
93
- // register namespace queueinformer
94
- queueInformer := queueinformer .NewInformer (
95
- workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "namespaces" ),
96
- namespaceInformer .Informer (),
87
+ roleInformer := informers .NewSharedInformerFactory (opClient .KubernetesInterface (), wakeupInterval ).Rbac ().V1 ().Roles ()
88
+ roleQueueInformer := queueinformer .NewInformer (
89
+ workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "roles" ),
90
+ roleInformer .Informer (),
97
91
op .syncObject ,
98
92
nil ,
99
- "namespace " ,
93
+ "roles " ,
100
94
metrics .NewMetricsNil (),
101
95
)
102
- op .RegisterQueueInformer (queueInformer )
103
- op .lister .CoreV1 ().RegisterNamespaceLister ( namespaceInformer .Lister ())
96
+ op .RegisterQueueInformer (roleQueueInformer )
97
+ op .lister .RbacV1 ().RegisterRoleLister ( metav1 . NamespaceAll , roleInformer .Lister ())
104
98
105
- // Register RBAC QueueInformers
106
- rbacInformers := []cache. SharedIndexInformer {
107
- roleInformer . Informer ( ),
99
+ roleBindingInformer := informers . NewSharedInformerFactory ( opClient . KubernetesInterface (), wakeupInterval ). Rbac (). V1 (). RoleBindings ()
100
+ roleBindingQueueInformer := queueinformer . NewInformer (
101
+ workqueue . NewNamedRateLimitingQueue ( workqueue . DefaultControllerRateLimiter (), "rolebindings" ),
108
102
roleBindingInformer .Informer (),
109
- clusterRoleInformer .Informer (),
110
- clusterRoleBindingInformer .Informer (),
111
- }
112
-
113
- rbacQueueInformers := queueinformer .New (
114
- workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "rbac" ),
115
- rbacInformers ,
116
103
op .syncObject ,
117
- & cache.ResourceEventHandlerFuncs {
118
- DeleteFunc : op .handleDeletion ,
119
- },
120
- "namespace" ,
104
+ nil ,
105
+ "rolebindings" ,
121
106
metrics .NewMetricsNil (),
122
107
)
123
- for _ , informer := range rbacQueueInformers {
124
- op .RegisterQueueInformer (informer )
125
- }
126
-
127
- // Set listers (for RBAC CSV requirement checking)
128
- op .lister .RbacV1 ().RegisterRoleLister (metav1 .NamespaceAll , roleInformer .Lister ())
108
+ op .RegisterQueueInformer (roleBindingQueueInformer )
129
109
op .lister .RbacV1 ().RegisterRoleBindingLister (metav1 .NamespaceAll , roleBindingInformer .Lister ())
110
+
111
+ clusterRoleInformer := informers .NewSharedInformerFactory (opClient .KubernetesInterface (), wakeupInterval ).Rbac ().V1 ().ClusterRoles ()
112
+ clusterRoleQueueInformer := queueinformer .NewInformer (
113
+ workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "clusterroles" ),
114
+ clusterRoleInformer .Informer (),
115
+ op .syncObject ,
116
+ nil ,
117
+ "clusterroles" ,
118
+ metrics .NewMetricsNil (),
119
+ )
120
+ op .RegisterQueueInformer (clusterRoleQueueInformer )
130
121
op .lister .RbacV1 ().RegisterClusterRoleLister (clusterRoleInformer .Lister ())
122
+
123
+ clusterRoleBindingInformer := informers .NewSharedInformerFactory (opClient .KubernetesInterface (), wakeupInterval ).Rbac ().V1 ().ClusterRoleBindings ()
124
+ clusterRoleBindingQueueInformer := queueinformer .NewInformer (
125
+ workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "clusterrolebindings" ),
126
+ clusterRoleBindingInformer .Informer (),
127
+ op .syncObject ,
128
+ nil ,
129
+ "clusterrolebindings" ,
130
+ metrics .NewMetricsNil (),
131
+ )
131
132
op .lister .RbacV1 ().RegisterClusterRoleBindingLister (clusterRoleBindingInformer .Lister ())
133
+ op .RegisterQueueInformer (clusterRoleBindingQueueInformer )
134
+
135
+ // register namespace queueinformer
136
+ namespaceInformer := informers .NewSharedInformerFactory (opClient .KubernetesInterface (), wakeupInterval ).Core ().V1 ().Namespaces ()
137
+ namespaceQueueInformer := queueinformer .NewInformer (
138
+ workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "namespaces" ),
139
+ namespaceInformer .Informer (),
140
+ op .syncObject ,
141
+ nil ,
142
+ "namespaces" ,
143
+ metrics .NewMetricsNil (),
144
+ )
145
+ op .RegisterQueueInformer (namespaceQueueInformer )
146
+ op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
132
147
133
148
// Register APIService QueueInformers
134
149
apiServiceInformer := kagg .NewSharedInformerFactory (opClient .ApiregistrationV1Interface (), wakeupInterval ).Apiregistration ().V1 ().APIServices ()
@@ -181,95 +196,74 @@ func NewOperator(crClient versioned.Interface, opClient operatorclient.ClientInt
181
196
& cache.ResourceEventHandlerFuncs {
182
197
DeleteFunc : op .handleDeletion ,
183
198
},
184
- "services " ,
199
+ "serviceaccounts " ,
185
200
metrics .NewMetricsNil (),
186
201
))
187
202
op .lister .CoreV1 ().RegisterServiceAccountLister (metav1 .NamespaceAll , serviceAccountInformer .Lister ())
188
203
189
- // Set up watch on CSVs
190
- csvInformers := []cache.SharedIndexInformer {}
204
+ // csvInformers for each namespace all use the same backing queue keys are namespaced
205
+ csvHandlers := & cache.ResourceEventHandlerFuncs {
206
+ DeleteFunc : op .deleteClusterServiceVersion ,
207
+ }
191
208
for _ , namespace := range namespaces {
192
- log .Debugf ( "watching for CSVs in namespace %s " , namespace )
209
+ log .WithField ( " namespace" , namespace ). Infof ( "watching CSVs" )
193
210
sharedInformerFactory := externalversions .NewSharedInformerFactoryWithOptions (crClient , wakeupInterval , externalversions .WithNamespace (namespace ))
194
- informer := sharedInformerFactory .Operators ().V1alpha1 ().ClusterServiceVersions ()
195
- csvInformers = append (csvInformers , informer .Informer ())
196
- op .lister .OperatorsV1alpha1 ().RegisterClusterServiceVersionLister (namespace , informer .Lister ())
197
- }
211
+ csvInformer := sharedInformerFactory .Operators ().V1alpha1 ().ClusterServiceVersions ()
212
+ op .lister .OperatorsV1alpha1 ().RegisterClusterServiceVersionLister (namespace , csvInformer .Lister ())
198
213
199
- // csvInformers for each namespace all use the same backing queue
200
- // queue keys are namespaced
201
- csvQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "clusterserviceversions" )
202
- csvHandlers := cache.ResourceEventHandlerFuncs {
203
- DeleteFunc : op .deleteClusterServiceVersion ,
214
+ // Register queue and QueueInformer
215
+ queueName := fmt .Sprintf ("%s/clusterserviceversions" , namespace )
216
+ csvQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), queueName )
217
+ csvQueueInformer := queueinformer .NewInformer (csvQueue , csvInformer .Informer (), op .syncClusterServiceVersion , csvHandlers , queueName , metrics .NewMetricsCSV (op .client ))
218
+ op .RegisterQueueInformer (csvQueueInformer )
219
+ op .csvQueues [namespace ] = csvQueue
204
220
}
205
- queueInformers := queueinformer .New (
206
- csvQueue ,
207
- csvInformers ,
208
- op .syncClusterServiceVersion ,
209
- & csvHandlers ,
210
- "csv" ,
211
- metrics .NewMetricsCSV (op .client ),
212
- )
213
- for _ , informer := range queueInformers {
214
- op .RegisterQueueInformer (informer )
215
- }
216
- op .csvQueue = csvQueue
217
221
218
- // set up watch on deployments
219
- depInformers := []cache.SharedIndexInformer {}
220
- for _ , namespace := range namespaces {
221
- log .Debugf ("watching deployments in namespace %s" , namespace )
222
- informer := informers .NewSharedInformerFactoryWithOptions (opClient .KubernetesInterface (), wakeupInterval , informers .WithNamespace (namespace )).Apps ().V1 ().Deployments ()
223
- depInformers = append (depInformers , informer .Informer ())
224
- op .lister .AppsV1 ().RegisterDeploymentLister (namespace , informer .Lister ())
222
+ // Set up watch on deployments
223
+ depHandlers := & cache.ResourceEventHandlerFuncs {
224
+ DeleteFunc : op .handleDeletion ,
225
225
}
226
+ for _ , namespace := range namespaces {
227
+ log .WithField ("namespace" , namespace ).Infof ("watching deployments" )
228
+ depInformer := informers .NewSharedInformerFactoryWithOptions (opClient .KubernetesInterface (), wakeupInterval , informers .WithNamespace (namespace )).Apps ().V1 ().Deployments ()
229
+ op .lister .AppsV1 ().RegisterDeploymentLister (namespace , depInformer .Lister ())
226
230
227
- depQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "csv-deployments" )
228
- depQueueInformers := queueinformer .New (
229
- depQueue ,
230
- depInformers ,
231
- op .syncDeployment ,
232
- & cache.ResourceEventHandlerFuncs {
233
- DeleteFunc : op .handleDeletion ,
234
- },
235
- "deployment" ,
236
- metrics .NewMetricsNil (),
237
- )
238
- for _ , informer := range depQueueInformers {
239
- op .RegisterQueueInformer (informer )
231
+ // Register queue and QueueInformer
232
+ queueName := fmt .Sprintf ("%s/csv-deployments" , namespace )
233
+ depQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), queueName )
234
+ depQueueInformer := queueinformer .NewInformer (depQueue , depInformer .Informer (), op .syncDeployment , depHandlers , queueName , metrics .NewMetricsNil ())
235
+ op .RegisterQueueInformer (depQueueInformer )
240
236
}
241
237
242
238
// Create an informer for the operator group
243
- operatorGroupInformers := []cache.SharedIndexInformer {}
244
239
for _ , namespace := range namespaces {
245
- informerFactory := externalversions .NewSharedInformerFactoryWithOptions (crClient , wakeupInterval , externalversions .WithNamespace (namespace ))
246
- informer := informerFactory .Operators ().V1alpha2 ().OperatorGroups ()
247
- operatorGroupInformers = append (operatorGroupInformers , informer .Informer ())
248
- op .lister .OperatorsV1alpha2 ().RegisterOperatorGroupLister (namespace , informer .Lister ())
249
- }
250
-
251
- // Register OperatorGroup informers.
252
- operatorGroupQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "operatorgroups" )
253
- operatorGroupQueueInformer := queueinformer .New (
254
- operatorGroupQueue ,
255
- operatorGroupInformers ,
256
- op .syncOperatorGroups ,
257
- nil ,
258
- "operatorgroups" ,
259
- metrics .NewMetricsNil (),
260
- )
261
- for _ , informer := range operatorGroupQueueInformer {
262
- op .RegisterQueueInformer (informer )
240
+ log .WithField ("namespace" , namespace ).Infof ("watching OperatorGroups" )
241
+ sharedInformerFactory := externalversions .NewSharedInformerFactoryWithOptions (crClient , wakeupInterval , externalversions .WithNamespace (namespace ))
242
+ operatorGroupInformer := sharedInformerFactory .Operators ().V1alpha2 ().OperatorGroups ()
243
+ op .lister .OperatorsV1alpha2 ().RegisterOperatorGroupLister (namespace , operatorGroupInformer .Lister ())
244
+
245
+ // Register queue and QueueInformer
246
+ queueName := fmt .Sprintf ("%s/operatorgroups" , namespace )
247
+ operatorGroupQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), queueName )
248
+ operatorGroupQueueInformer := queueinformer .NewInformer (operatorGroupQueue , operatorGroupInformer .Informer (), op .syncOperatorGroups , nil , queueName , metrics .NewMetricsNil ())
249
+ op .RegisterQueueInformer (operatorGroupQueueInformer )
263
250
}
264
251
265
252
return op , nil
266
253
}
267
254
268
255
func (a * Operator ) requeueCSV (name , namespace string ) {
269
- // we can build the key directly, will need to change if queue uses different key scheme
256
+ // We can build the key directly, will need to change if queue uses different key scheme
270
257
key := fmt .Sprintf ("%s/%s" , namespace , name )
271
- log .Debugf ("requeueing CSV %s" , key )
272
- a .csvQueue .AddRateLimited (key )
258
+ logger := log .WithField ("key" , key )
259
+ logger .Debugf ("requeueing CSV" )
260
+
261
+ if queue , ok := a .csvQueues [namespace ]; ok {
262
+ queue .AddRateLimited (key )
263
+ return
264
+ }
265
+
266
+ logger .Debugf ("couldn't find queue for CSV" )
273
267
}
274
268
275
269
func (a * Operator ) syncDeployment (obj interface {}) (syncError error ) {
@@ -398,7 +392,9 @@ func (a *Operator) syncClusterServiceVersion(obj interface{}) (syncError error)
398
392
"phase" : clusterServiceVersion .Status .Phase ,
399
393
})
400
394
401
- if clusterServiceVersion .Status .Reason == v1alpha1 .CSVReasonCopied {
395
+ operatorNamespace , ok := clusterServiceVersion .GetAnnotations ()["olm.operatorNamespace" ]
396
+ if clusterServiceVersion .Status .Reason == v1alpha1 .CSVReasonCopied ||
397
+ ok && clusterServiceVersion .GetNamespace () != operatorNamespace {
402
398
logger .Info ("skip sync of dummy CSV" )
403
399
return a .removeDanglingChildCSVs (clusterServiceVersion )
404
400
}
0 commit comments