Skip to content

Commit 62f6d2d

Browse files
author
Per Goncalves da Silva
committed
Refactor subscription reconciler away from kubestate
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent d68cd00 commit 62f6d2d

File tree

15 files changed

+854
-3216
lines changed

15 files changed

+854
-3216
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 45 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
332332
return nil, err
333333
}
334334

335+
// Namespace sync for resolving subscriptions
336+
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
337+
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
338+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
339+
workqueue.TypedRateLimitingQueueConfig[any]{
340+
Name: "resolve",
341+
})
342+
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
343+
ctx,
344+
queueinformer.WithLogger(op.logger),
345+
queueinformer.WithQueue(op.nsResolveQueue),
346+
queueinformer.WithInformer(namespaceInformer.Informer()),
347+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
348+
)
349+
if err != nil {
350+
return nil, err
351+
}
352+
if err := op.RegisterQueueInformer(namespaceQueueInformer); err != nil {
353+
return nil, err
354+
}
355+
335356
// Wire Subscriptions
336357
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
337358
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
@@ -355,7 +376,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
355376
subscription.WithCatalogInformer(catsrcInformer.Informer()),
356377
subscription.WithInstallPlanInformer(ipInformer.Informer()),
357378
subscription.WithSubscriptionQueue(subQueue),
358-
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
379+
subscription.WithNamespaceResolveQueue(op.nsResolveQueue),
359380
subscription.WithRegistryReconcilerFactory(op.reconciler),
360381
subscription.WithGlobalCatalogNamespace(op.namespace),
361382
subscription.WithSourceProvider(op.resolverSourceProvider),
@@ -742,27 +763,6 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
742763
return nil, err
743764
}
744765

745-
// Namespace sync for resolving subscriptions
746-
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747-
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749-
workqueue.TypedRateLimitingQueueConfig[any]{
750-
Name: "resolve",
751-
})
752-
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
753-
ctx,
754-
queueinformer.WithLogger(op.logger),
755-
queueinformer.WithQueue(op.nsResolveQueue),
756-
queueinformer.WithInformer(namespaceInformer.Informer()),
757-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
758-
)
759-
if err != nil {
760-
return nil, err
761-
}
762-
if err := op.RegisterQueueInformer(namespaceQueueInformer); err != nil {
763-
return nil, err
764-
}
765-
766766
op.sources.Start(context.Background())
767767

768768
return op, nil
@@ -1342,6 +1342,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13421342
return err
13431343
}
13441344

1345+
// Remove resolutionfailed condition from subscriptions
1346+
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1347+
13451348
// Attempt to unpack bundles before installing
13461349
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
13471350
if len(bundleLookups) > 0 {
@@ -1469,9 +1472,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14691472
// Remove BundleUnpackFailed condition from subscriptions
14701473
o.removeSubsCond(subs, v1alpha1.SubscriptionBundleUnpackFailed)
14711474

1472-
// Remove resolutionfailed condition from subscriptions
1473-
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1474-
14751475
newSub := true
14761476
for _, updatedSub := range updatedSubs {
14771477
updatedSub.Status.RemoveConditions(v1alpha1.SubscriptionResolutionFailed)
@@ -1499,18 +1499,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14991499
return nil
15001500
}
15011501

1502-
func (o *Operator) syncSubscriptions(obj interface{}) error {
1503-
sub, ok := obj.(*v1alpha1.Subscription)
1504-
if !ok {
1505-
o.logger.Infof("wrong type: %#v", obj)
1506-
return fmt.Errorf("casting Subscription failed")
1507-
}
1508-
1509-
o.nsResolveQueue.Add(sub.GetNamespace())
1510-
1511-
return nil
1512-
}
1513-
15141502
// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
15151503
// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
15161504
func (o *Operator) syncOperatorGroups(obj interface{}) error {
@@ -2047,7 +2035,8 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
20472035
defer o.ipQueueSet.RequeueAfter(outInstallPlan.GetNamespace(), outInstallPlan.GetName(), time.Second*5)
20482036
}
20492037

2050-
defer o.requeueSubscriptionForInstallPlan(plan, logger)
2038+
// TODO: do we need this?
2039+
// defer o.requeueSubscriptionForInstallPlan(plan, logger)
20512040

20522041
// Update InstallPlan with status of transition. Log errors if we can't write them to the status.
20532042
if _, err := o.client.OperatorsV1alpha1().InstallPlans(plan.GetNamespace()).UpdateStatus(context.TODO(), outInstallPlan, metav1.UpdateOptions{}); err != nil {
@@ -2075,22 +2064,22 @@ func hasBundleLookupFailureCondition(bundleLookups []v1alpha1.BundleLookup) (boo
20752064
return false, nil
20762065
}
20772066

2078-
func (o *Operator) requeueSubscriptionForInstallPlan(plan *v1alpha1.InstallPlan, logger *logrus.Entry) {
2079-
// Notify subscription loop of installplan changes
2080-
owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind)
2081-
2082-
if len(owners) == 0 {
2083-
logger.Trace("no installplan owner subscriptions found to requeue")
2084-
return
2085-
}
2086-
2087-
for _, owner := range owners {
2088-
logger.WithField("owner", owner).Debug("requeueing installplan owner")
2089-
if err := o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name); err != nil {
2090-
logger.WithError(err).Warn("error requeuing installplan owner")
2091-
}
2092-
}
2093-
}
2067+
//func (o *Operator) requeueSubscriptionForInstallPlan(plan *v1alpha1.InstallPlan, logger *logrus.Entry) {
2068+
// // Notify subscription loop of installplan changes
2069+
// owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind)
2070+
//
2071+
// if len(owners) == 0 {
2072+
// logger.Trace("no installplan owner subscriptions found to requeue")
2073+
// return
2074+
// }
2075+
//
2076+
// for _, owner := range owners {
2077+
// logger.WithField("owner", owner).Debug("requeueing installplan owner")
2078+
// if err := o.subQueueSet.Requeue(plan.GetNamespace(), owner.Name); err != nil {
2079+
// logger.WithError(err).Warn("error requeuing installplan owner")
2080+
// }
2081+
// }
2082+
//}
20942083

20952084
func (o *Operator) setInstallPlanInstalledCond(ip *v1alpha1.InstallPlan, reason v1alpha1.InstallPlanConditionReason, message string, logger *logrus.Entry) (*v1alpha1.InstallPlan, error) {
20962085
now := o.now()
@@ -2850,31 +2839,11 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, names
28502839
}
28512840

28522841
func (o *Operator) listSubscriptions(namespace string) (subs []*v1alpha1.Subscription, err error) {
2853-
list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
2854-
if err != nil {
2855-
return
2856-
}
2857-
2858-
subs = make([]*v1alpha1.Subscription, 0)
2859-
for i := range list.Items {
2860-
subs = append(subs, &list.Items[i])
2861-
}
2862-
2863-
return
2842+
return o.lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace).List(labels.Everything())
28642843
}
28652844

28662845
func (o *Operator) listInstallPlans(namespace string) (ips []*v1alpha1.InstallPlan, err error) {
2867-
list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{})
2868-
if err != nil {
2869-
return
2870-
}
2871-
2872-
ips = make([]*v1alpha1.InstallPlan, 0)
2873-
for i := range list.Items {
2874-
ips = append(ips, &list.Items[i])
2875-
}
2876-
2877-
return
2846+
return o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(namespace).List(labels.Everything())
28782847
}
28792848

28802849
// competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV)

0 commit comments

Comments
 (0)