@@ -119,8 +119,14 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
119
119
for _ , namespace := range watchedNamespaces {
120
120
nsInformerFactory := externalversions .NewSharedInformerFactoryWithOptions (crClient , wakeupInterval , externalversions .WithNamespace (namespace ))
121
121
catsrcInformer := nsInformerFactory .Operators ().V1alpha1 ().CatalogSources ()
122
+
122
123
// Register queue and QueueInformer
123
- queueName := fmt .Sprintf ("%s/catsrc" , namespace )
124
+ var queueName string
125
+ if namespace == corev1 .NamespaceAll {
126
+ queueName = "catsrc"
127
+ } else {
128
+ queueName = fmt .Sprintf ("%s/catsrc" , namespace )
129
+ }
124
130
catsrcQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), queueName )
125
131
op .RegisterQueueInformer (queueinformer .NewInformer (catsrcQueue , catsrcInformer .Informer (), op .syncCatalogSources , deleteCatSrc , queueName , metrics .NewMetricsCatalogSource (op .client ), logger ))
126
132
op .catSrcQueueSet [namespace ] = catsrcQueue
@@ -239,7 +245,7 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
239
245
defer o .sourcesLock .RUnlock ()
240
246
if _ , ok := o .sources [sourceKey ]; ok {
241
247
logger .Debug ("requeueing owner CatalogSource" )
242
- if err := o .catSrcQueueSet .Requeue (owner .Name , metaObj .GetNamespace ()); err != nil {
248
+ if err := o .catSrcQueueSet .Requeue (owner .Name , metaObj .GetNamespace ()); err != nil {
243
249
logger .Warn (err .Error ())
244
250
}
245
251
}
@@ -266,22 +272,36 @@ func (o *Operator) handleDeletion(obj interface{}) {
266
272
}
267
273
268
274
if owner := ownerutil .GetOwnerByKind (ownee , v1alpha1 .CatalogSourceKind ); owner != nil {
269
- if err := o .catSrcQueueSet .Requeue (owner .Name , ownee .GetNamespace ()); err != nil {
270
- o .Log .Warn (err .Error ())
271
- }
275
+ if err := o .catSrcQueueSet .Requeue (owner .Name , ownee .GetNamespace ()); err != nil {
276
+ o .Log .Warn (err .Error ())
277
+ }
272
278
}
273
279
}
274
280
275
281
func (o * Operator ) handleCatSrcDeletion (obj interface {}) {
276
- if catsrc , ok := obj .(* v1alpha1.CatalogSource ); ok {
277
- sourceKey := resolver.CatalogKey {Name : catsrc .GetName (), Namespace : catsrc .GetNamespace ()}
278
- func () {
279
- o .sourcesLock .Lock ()
280
- defer o .sourcesLock .Unlock ()
281
- delete (o .sources , sourceKey )
282
- }()
283
- o .Log .WithField ("source" , sourceKey ).Info ("removed client for deleted catalogsource" )
282
+ catsrc , ok := obj .(metav1.Object )
283
+ if ! ok {
284
+ if ! ok {
285
+ tombstone , ok := obj .(cache.DeletedFinalStateUnknown )
286
+ if ! ok {
287
+ utilruntime .HandleError (fmt .Errorf ("Couldn't get object from tombstone %#v" , obj ))
288
+ return
289
+ }
290
+
291
+ catsrc , ok = tombstone .Obj .(metav1.Object )
292
+ if ! ok {
293
+ utilruntime .HandleError (fmt .Errorf ("Tombstone contained object that is not a Namespace %#v" , obj ))
294
+ return
295
+ }
296
+ }
284
297
}
298
+ sourceKey := resolver.CatalogKey {Name : catsrc .GetName (), Namespace : catsrc .GetNamespace ()}
299
+ func () {
300
+ o .sourcesLock .Lock ()
301
+ defer o .sourcesLock .Unlock ()
302
+ delete (o .sources , sourceKey )
303
+ }()
304
+ o .Log .WithField ("source" , sourceKey ).Info ("removed client for deleted catalogsource" )
285
305
}
286
306
287
307
func (o * Operator ) syncCatalogSources (obj interface {}) (syncError error ) {
0 commit comments