@@ -56,16 +56,17 @@ var timeNow = func() metav1.Time { return metav1.NewTime(time.Now().UTC()) }
56
56
// resolving dependencies in a catalog.
57
57
type Operator struct {
58
58
* queueinformer.Operator
59
- client versioned.Interface
60
- lister operatorlister.OperatorLister
61
- namespace string
62
- sources map [resolver.CatalogKey ]resolver.SourceRef
63
- sourcesLock sync.RWMutex
64
- sourcesLastUpdate metav1.Time
65
- resolver resolver.Resolver
66
- subQueue workqueue.RateLimitingInterface
67
- catSrcQueueSet queueinformer.ResourceQueueSet
68
- reconciler reconciler.ReconcilerFactory
59
+ client versioned.Interface
60
+ lister operatorlister.OperatorLister
61
+ namespace string
62
+ sources map [resolver.CatalogKey ]resolver.SourceRef
63
+ sourcesLock sync.RWMutex
64
+ sourcesLastUpdate metav1.Time
65
+ resolver resolver.Resolver
66
+ subQueue workqueue.RateLimitingInterface
67
+ catSrcQueueSet queueinformer.ResourceQueueSet
68
+ namespaceResolveQueue workqueue.RateLimitingInterface
69
+ reconciler reconciler.ReconcilerFactory
69
70
}
70
71
71
72
// NewOperator creates a new Catalog Operator.
@@ -209,6 +210,23 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
209
210
OpClient : op .OpClient ,
210
211
Lister : op .lister ,
211
212
}
213
+
214
+ namespaceInformer := informers .NewSharedInformerFactory (op .OpClient .KubernetesInterface (), wakeupInterval ).Core ().V1 ().Namespaces ()
215
+ resolvingNamespaceQueue := workqueue .NewNamedRateLimitingQueue (workqueue .DefaultControllerRateLimiter (), "resolver" )
216
+ namespaceQueueInformer := queueinformer .NewInformer (
217
+ resolvingNamespaceQueue ,
218
+ namespaceInformer .Informer (),
219
+ op .syncResolvingNamespace ,
220
+ nil ,
221
+ "resolver" ,
222
+ metrics .NewMetricsNil (),
223
+ logger ,
224
+ )
225
+
226
+ op .RegisterQueueInformer (namespaceQueueInformer )
227
+ op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
228
+ op .namespaceResolveQueue = resolvingNamespaceQueue
229
+
212
230
return op , nil
213
231
}
214
232
@@ -456,9 +474,8 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
456
474
return err
457
475
}
458
476
459
- // Sync any dependent Subscriptions
460
- // TODO: this should go away, we should resync the namespace instead
461
- o .syncDependentSubscriptions (logger , out .GetName (), out .GetNamespace ())
477
+ // Trigger a resolve, will pick up any subscriptions that depend on the catalog
478
+ o .namespaceResolveQueue .AddRateLimited (out .GetNamespace ())
462
479
463
480
return nil
464
481
}
@@ -487,37 +504,22 @@ func (o *Operator) syncDependentSubscriptions(logger *logrus.Entry, catalogSourc
487
504
}
488
505
}
489
506
490
- func (o * Operator ) syncSubscriptions (obj interface {}) error {
491
- sub , ok := obj .(* v1alpha1. Subscription )
507
+ func (o * Operator ) syncResolvingNamespace (obj interface {}) error {
508
+ ns , ok := obj .(* corev1. Namespace )
492
509
if ! ok {
493
510
o .Log .Debugf ("wrong type: %#v" , obj )
494
- return fmt .Errorf ("casting Subscription failed" )
511
+ return fmt .Errorf ("casting Namespace failed" )
495
512
}
496
- namespace := sub .GetNamespace ()
497
-
513
+ namespace := ns .GetName ()
498
514
logger := o .Log .WithFields (logrus.Fields {
499
- "sub" : sub .GetName (),
500
- "namespace" : sub .GetNamespace (),
501
- "source" : sub .Spec .CatalogSource ,
502
- "pkg" : sub .Spec .Package ,
503
- "channel" : sub .Spec .Channel ,
515
+ "namespace" : namespace ,
504
516
})
505
-
506
- // record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
507
- sub , err := o .ensureSubscriptionCSVState (logger , sub )
508
- if err != nil {
509
- return err
510
- }
511
-
512
- // return early if the subscription is up to date
513
- if o .nothingToUpdate (logger , sub ) {
514
- return nil
515
- }
516
-
517
517
// get the set of sources that should be used for resolution and best-effort get their connections working
518
- logger .Debugf ("resolving sources for %s" , namespace )
518
+ logger .Debug ("resolving sources" )
519
519
resolverSources := o .ensureResolverSources (logger , namespace )
520
520
521
+ logger .Debug ("resolving subscriptions in namespace" )
522
+
521
523
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
522
524
steps , subs , err := o .resolver .ResolveSteps (namespace , resolver .NewNamespaceSourceQuerier (resolverSources ))
523
525
if err != nil {
@@ -533,7 +535,6 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
533
535
break
534
536
}
535
537
}
536
-
537
538
installplanReference , err := o .createInstallPlan (namespace , subs , installPlanApproval , steps )
538
539
if err != nil {
539
540
logger .WithError (err ).Debug ("error creating installplan" )
@@ -547,6 +548,37 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
547
548
return nil
548
549
}
549
550
551
+ func (o * Operator ) syncSubscriptions (obj interface {}) error {
552
+ sub , ok := obj .(* v1alpha1.Subscription )
553
+ if ! ok {
554
+ o .Log .Debugf ("wrong type: %#v" , obj )
555
+ return fmt .Errorf ("casting Subscription failed" )
556
+ }
557
+
558
+ logger := o .Log .WithFields (logrus.Fields {
559
+ "sub" : sub .GetName (),
560
+ "namespace" : sub .GetNamespace (),
561
+ "source" : sub .Spec .CatalogSource ,
562
+ "pkg" : sub .Spec .Package ,
563
+ "channel" : sub .Spec .Channel ,
564
+ })
565
+
566
+ // record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
567
+ sub , err := o .ensureSubscriptionCSVState (logger , sub )
568
+ if err != nil {
569
+ return err
570
+ }
571
+
572
+ // return early if the subscription is up to date
573
+ if o .nothingToUpdate (logger , sub ) {
574
+ return nil
575
+ }
576
+
577
+ o .namespaceResolveQueue .AddRateLimited (sub .GetNamespace ())
578
+
579
+ return nil
580
+ }
581
+
550
582
func (o * Operator ) ensureResolverSources (logger * logrus.Entry , namespace string ) map [resolver.CatalogKey ]registryclient.Interface {
551
583
// TODO: record connection status onto an object
552
584
resolverSources := make (map [resolver.CatalogKey ]registryclient.Interface , 0 )
0 commit comments