@@ -67,7 +67,7 @@ type Operator struct {
67
67
subQueue workqueue.RateLimitingInterface
68
68
catSrcQueueSet * queueinformer.ResourceQueueSet
69
69
namespaceResolveQueue workqueue.RateLimitingInterface
70
- reconciler reconciler.ReconcilerFactory
70
+ reconciler reconciler.RegistryReconcilerFactory
71
71
}
72
72
73
73
// NewOperator creates a new Catalog Operator.
@@ -136,6 +136,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
136
136
}
137
137
catsrcQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), queueName )
138
138
op .RegisterQueueInformer (queueinformer .NewInformer (catsrcQueue , catsrcInformer .Informer (), op .syncCatalogSources , deleteCatSrc , queueName , metrics .NewMetricsCatalogSource (op .client ), logger ))
139
+ op .lister .OperatorsV1alpha1 ().RegisterCatalogSourceLister (namespace , catsrcInformer .Lister ())
139
140
op .catSrcQueueSet .Set (namespace , catsrcQueue )
140
141
}
141
142
@@ -209,11 +210,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
209
210
op .lister .CoreV1 ().RegisterPodLister (namespace , podInformer .Lister ())
210
211
op .lister .CoreV1 ().RegisterConfigMapLister (namespace , configMapInformer .Lister ())
211
212
}
212
- op .reconciler = & reconciler.RegistryReconcilerFactory {
213
- ConfigMapServerImage : configmapRegistryImage ,
214
- OpClient : op .OpClient ,
215
- Lister : op .lister ,
216
- }
213
+ op .reconciler = reconciler .NewRegistryReconcilerFactory (op .lister , op .OpClient , configmapRegistryImage )
217
214
218
215
// Namespace sync for resolving subscriptions
219
216
namespaceInformer := informers .NewSharedInformerFactory (op .OpClient .KubernetesInterface (), wakeupInterval ).Core ().V1 ().Namespaces ()
@@ -290,21 +287,60 @@ func (o *Operator) handleDeletion(obj interface{}) {
290
287
if ! ok {
291
288
tombstone , ok := obj .(cache.DeletedFinalStateUnknown )
292
289
if ! ok {
293
- utilruntime .HandleError (fmt .Errorf ("Couldn 't get object from tombstone %#v" , obj ))
290
+ utilruntime .HandleError (fmt .Errorf ("couldn 't get object from tombstone %#v" , obj ))
294
291
return
295
292
}
296
293
297
294
ownee , ok = tombstone .Obj .(metav1.Object )
298
295
if ! ok {
299
- utilruntime .HandleError (fmt .Errorf ("Tombstone contained object that is not a metav1 object %#v" , obj ))
296
+ utilruntime .HandleError (fmt .Errorf ("tombstone contained object that is not a metav1 object %#v" , obj ))
300
297
return
301
298
}
302
299
}
303
300
304
- if owner := ownerutil .GetOwnerByKind (ownee , v1alpha1 .CatalogSourceKind ); owner != nil {
305
- if err := o .catSrcQueueSet .Requeue (owner .Name , ownee .GetNamespace ()); err != nil {
306
- o .Log .Warn (err .Error ())
301
+ logger := o .Log .WithFields (logrus.Fields {
302
+ "sync" : "resourcedeletion" ,
303
+ "name" : ownee .GetName (),
304
+ "namespace" : ownee .GetNamespace (),
305
+ })
306
+
307
+ owner := ownerutil .GetOwnerByKind (ownee , v1alpha1 .CatalogSourceKind )
308
+ if owner == nil {
309
+ logger .Debug ("no owner catalogsource found" )
310
+ return
311
+ }
312
+
313
+ logger = logger .WithFields (logrus.Fields {
314
+ "owner" : owner .Name ,
315
+ "ownerkind" : owner .Kind ,
316
+ })
317
+
318
+ // Get the owner CatalogSource
319
+ catsrc , err := o .lister .OperatorsV1alpha1 ().CatalogSourceLister ().CatalogSources (ownee .GetNamespace ()).Get (owner .Name )
320
+ if err != nil {
321
+ logger .WithError (err ).Warn ("could not get owner catalogsource from cache" )
322
+ return
323
+ }
324
+
325
+ // Check the registry server
326
+ checker := o .reconciler .ReconcilerForSource (catsrc )
327
+ healthy , err := checker .CheckRegistryServer (catsrc )
328
+ if err != nil {
329
+ logger .WithError (err ).Warn ("error checking registry health" )
330
+ } else if ! healthy {
331
+ logger .Debug ("registry server unhealthy, updating catalog source" )
332
+ catsrc .Status .RegistryServiceStatus = nil
333
+ _ , err = o .client .OperatorsV1alpha1 ().CatalogSources (catsrc .GetNamespace ()).UpdateStatus (catsrc )
334
+ if err == nil {
335
+ logger .Debug ("successfully updated catalogsource registry status" )
336
+ return
307
337
}
338
+ logger .WithError (err ).Warn ("error updating catalogsource registry status" )
339
+ }
340
+
341
+ // Requeue CatalogSource
342
+ if err := o .catSrcQueueSet .Requeue (catsrc .GetName (), catsrc .GetNamespace ()); err != nil {
343
+ logger .WithError (err ).Warn ("error requeuing owner catalogsource" )
308
344
}
309
345
}
310
346
@@ -398,9 +434,9 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
398
434
return fmt .Errorf ("no reconciler for source type %s" , catsrc .Spec .SourceType )
399
435
}
400
436
401
- // if registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
437
+ // If registry pod hasn't been created or hasn't been updated since the last configmap update, recreate it
402
438
if catsrc .Status .RegistryServiceStatus == nil || catsrc .Status .RegistryServiceStatus .CreatedAt .Before (& catsrc .Status .LastSync ) {
403
- logger .Debug ("registry server scheduled recheck " )
439
+ logger .Debug ("ensuring registry server" )
404
440
405
441
if err := reconciler .EnsureRegistryServer (out ); err != nil {
406
442
logger .WithError (err ).Warn ("couldn't ensure registry server" )
@@ -409,7 +445,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
409
445
logger .Debug ("ensured registry server" )
410
446
411
447
out .Status .RegistryServiceStatus .CreatedAt = timeNow ()
412
- out .Status .LastSync = timeNow ()
448
+ out .Status .LastSync = out . Status . RegistryServiceStatus . CreatedAt
413
449
414
450
logger .Debug ("updating catsrc status" )
415
451
// update status
0 commit comments