@@ -124,7 +124,9 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
124
124
catsrcQueue ,
125
125
catsrcSharedIndexInformers ,
126
126
op .syncCatalogSources ,
127
- nil ,
127
+ & cache.ResourceEventHandlerFuncs {
128
+ DeleteFunc : op .handleCatSrcDeletion ,
129
+ },
128
130
"catsrc" ,
129
131
metrics .NewMetricsCatalogSource (op .client ),
130
132
logger ,
@@ -242,9 +244,16 @@ func (o *Operator) syncObject(obj interface{}) (syncError error) {
242
244
})
243
245
244
246
if ownerutil .IsOwnedByKind (metaObj , v1alpha1 .CatalogSourceKind ) {
245
- logger .Debug ("requeueing owner CatalogSource" )
246
247
owner := ownerutil .GetOwnerByKind (metaObj , v1alpha1 .CatalogSourceKind )
247
- o .catSrcQueue .AddRateLimited (fmt .Sprintf ("%s/%s" , metaObj .GetNamespace (), owner .Name ))
248
+ sourceKey := resolver.CatalogKey {Name : owner .Name , Namespace : metaObj .GetNamespace ()}
249
+ func () {
250
+ o .sourcesLock .RLock ()
251
+ defer o .sourcesLock .RUnlock ()
252
+ if _ , ok := o .sources [sourceKey ]; ok {
253
+ logger .Debug ("requeueing owner CatalogSource" )
254
+ o .catSrcQueue .AddRateLimited (fmt .Sprintf ("%s/%s" , metaObj .GetNamespace (), owner .Name ))
255
+ }
256
+ }()
248
257
}
249
258
250
259
return nil
@@ -271,6 +280,18 @@ func (o *Operator) handleDeletion(obj interface{}) {
271
280
}
272
281
}
273
282
283
+ func (o * Operator ) handleCatSrcDeletion (obj interface {}) {
284
+ if catsrc , ok := obj .(* v1alpha1.CatalogSource ); ok {
285
+ sourceKey := resolver.CatalogKey {Name : catsrc .GetName (), Namespace : catsrc .GetNamespace ()}
286
+ func () {
287
+ o .sourcesLock .Lock ()
288
+ defer o .sourcesLock .Unlock ()
289
+ delete (o .sources , sourceKey )
290
+ }()
291
+ o .Log .WithField ("source" , sourceKey ).Info ("removed client for deleted catalogsource" )
292
+ }
293
+ }
294
+
274
295
func (o * Operator ) syncCatalogSources (obj interface {}) (syncError error ) {
275
296
catsrc , ok := obj .(* v1alpha1.CatalogSource )
276
297
if ! ok {
@@ -353,7 +374,7 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca
353
374
defer o .sourcesLock .Unlock ()
354
375
address := catsrc .Status .RegistryServiceStatus .Address ()
355
376
currentSource , ok := o .sources [sourceKey ]
356
- if ! ok || currentSource .Address != address || currentSource . LastConnect . Before ( & o . sourcesLastUpdate ) {
377
+ if ! ok || currentSource .Address != address {
357
378
client , err := registryclient .NewClient (address )
358
379
if err != nil {
359
380
logger .WithError (err ).Warn ("couldn't connect to registry" )
@@ -378,26 +399,34 @@ func (o *Operator) syncConfigMapSource(logger *logrus.Entry, catsrc *v1alpha1.Ca
378
399
}
379
400
380
401
// Sync any dependent Subscriptions
402
+ // TODO: this should go away, we should resync the namespace instead
403
+ o .syncDependentSubscriptions (logger , out .GetName (), out .GetNamespace ())
404
+
405
+ return nil
406
+ }
407
+
408
+ func (o * Operator ) syncDependentSubscriptions (logger * logrus.Entry , catalogSource , catalogSourceNamespace string ) {
381
409
subs , err := o .lister .OperatorsV1alpha1 ().SubscriptionLister ().List (labels .Everything ())
382
410
if err != nil {
383
411
logger .Warnf ("could not list Subscriptions" )
384
- return nil
412
+ return
385
413
}
386
414
387
415
for _ , sub := range subs {
388
- logger = logger .WithFields (logrus.Fields {"subscriptionCatalogSource" : sub .Spec .CatalogSource , "subscriptionCatalogNamespace" : sub .Spec .CatalogSourceNamespace })
416
+ logger = logger .WithFields (logrus.Fields {
417
+ "subscriptionCatalogSource" : sub .Spec .CatalogSource ,
418
+ "subscriptionCatalogNamespace" : sub .Spec .CatalogSourceNamespace ,
419
+ "subscription" : sub .GetName (),
420
+ })
389
421
catalogNamespace := sub .Spec .CatalogSourceNamespace
390
422
if catalogNamespace == "" {
391
423
catalogNamespace = o .namespace
392
424
}
393
- logger .Debug ("checking subscription" )
394
- if sub .Spec .CatalogSource == out .GetName () && catalogNamespace == out .GetNamespace () {
425
+ if sub .Spec .CatalogSource == catalogSource && catalogNamespace == catalogSourceNamespace {
395
426
logger .Debug ("requeueing subscription because catalog changed" )
396
427
o .requeueSubscription (sub .GetName (), sub .GetNamespace ())
397
428
}
398
429
}
399
-
400
- return nil
401
430
}
402
431
403
432
func (o * Operator ) syncSubscriptions (obj interface {}) error {
@@ -427,6 +456,38 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
427
456
return nil
428
457
}
429
458
459
+ // get the set of sources that should be used for resolution and best-effort get their connections working
460
+ logger .Debugf ("resolving sources for %s" , namespace )
461
+ resolverSources := o .ensureResolverSources (logger , namespace )
462
+
463
+ // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
464
+ steps , subs , err := o .resolver .ResolveSteps (namespace , resolver .NewNamespaceSourceQuerier (resolverSources ))
465
+ if err != nil {
466
+ return err
467
+ }
468
+
469
+ // any subscription in the namespace with manual approval will force generated installplans to be manual
470
+ // TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup?
471
+ installPlanApproval := v1alpha1 .ApprovalAutomatic
472
+ for _ , sub := range subs {
473
+ if sub .Spec .InstallPlanApproval == v1alpha1 .ApprovalManual {
474
+ installPlanApproval = v1alpha1 .ApprovalManual
475
+ break
476
+ }
477
+ }
478
+
479
+ installplanReference , err := o .createInstallPlan (namespace , subs , installPlanApproval , steps )
480
+ if err != nil {
481
+ return err
482
+ }
483
+
484
+ if err := o .ensureSubscriptionInstallPlanState (namespace , subs , installplanReference ); err != nil {
485
+ return err
486
+ }
487
+ return nil
488
+ }
489
+
490
+ func (o * Operator ) ensureResolverSources (logger * logrus.Entry , namespace string ) map [resolver.CatalogKey ]registryclient.Interface {
430
491
// TODO: record connection status onto an object
431
492
resolverSources := make (map [resolver.CatalogKey ]registryclient.Interface , 0 )
432
493
func () {
@@ -449,42 +510,18 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
449
510
logger = logger .WithField ("resolverSource" , k )
450
511
logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source" )
451
512
if client .Conn .GetState () == connectivity .TransientFailure {
452
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("resetting connection" )
453
- client .Conn .ResetConnectBackoff ()
513
+ logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("waiting for connection" )
454
514
ctx , _ := context .WithTimeout (context .TODO (), 5 * time .Second )
455
515
changed := client .Conn .WaitForStateChange (ctx , connectivity .TransientFailure )
456
516
if ! changed {
457
517
logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("source in transient failure and didn't recover" )
518
+ delete (resolverSources , k )
519
+ } else {
520
+ logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("connection re-established" )
458
521
}
459
- logger .WithField ("clientState" , client .Conn .GetState ()).Debug ("connection successfully reset" )
460
522
}
461
523
}
462
-
463
- // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
464
- steps , subs , err := o .resolver .ResolveSteps (sub .GetNamespace (), resolver .NewNamespaceSourceQuerier (resolverSources ))
465
- if err != nil {
466
- return err
467
- }
468
-
469
- // any subscription in the namespace with manual approval will force generated installplans to be manual
470
- // TODO: this is an odd artifact of the older resolver, and will probably confuse users. approval mode could be on the operatorgroup?
471
- installPlanApproval := v1alpha1 .ApprovalAutomatic
472
- for _ , sub := range subs {
473
- if sub .Spec .InstallPlanApproval == v1alpha1 .ApprovalManual {
474
- installPlanApproval = v1alpha1 .ApprovalManual
475
- break
476
- }
477
- }
478
-
479
- installplanReference , err := o .createInstallPlan (namespace , subs , installPlanApproval , steps )
480
- if err != nil {
481
- return err
482
- }
483
-
484
- if err := o .ensureSubscriptionInstallPlanState (namespace , subs , installplanReference ); err != nil {
485
- return err
486
- }
487
- return nil
524
+ return resolverSources
488
525
}
489
526
490
527
func (o * Operator ) nothingToUpdate (logger * logrus.Entry , sub * v1alpha1.Subscription ) bool {
0 commit comments