Skip to content

Commit 5aeccb6

Browse files
author
Per Goncalves da Silva
committed
Refactor subscription reconciler away from kubestate
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent d68cd00 commit 5aeccb6

File tree

15 files changed

+863
-3220
lines changed

15 files changed

+863
-3220
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 48 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ import (
4747
"k8s.io/client-go/tools/clientcmd"
4848
"k8s.io/client-go/tools/pager"
4949
"k8s.io/client-go/tools/record"
50-
"k8s.io/client-go/util/retry"
5150
"k8s.io/client-go/util/workqueue"
5251
utilclock "k8s.io/utils/clock"
5352

@@ -332,6 +331,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
332331
return nil, err
333332
}
334333

334+
// Namespace sync for resolving subscriptions
335+
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
336+
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
337+
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
338+
workqueue.TypedRateLimitingQueueConfig[any]{
339+
Name: "resolve",
340+
})
341+
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
342+
ctx,
343+
queueinformer.WithLogger(op.logger),
344+
queueinformer.WithQueue(op.nsResolveQueue),
345+
queueinformer.WithInformer(namespaceInformer.Informer()),
346+
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
347+
)
348+
if err != nil {
349+
return nil, err
350+
}
351+
if err := op.RegisterQueueInformer(namespaceQueueInformer); err != nil {
352+
return nil, err
353+
}
354+
335355
// Wire Subscriptions
336356
subInformer := crInformerFactory.Operators().V1alpha1().Subscriptions()
337357
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(metav1.NamespaceAll, subInformer.Lister())
@@ -355,7 +375,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
355375
subscription.WithCatalogInformer(catsrcInformer.Informer()),
356376
subscription.WithInstallPlanInformer(ipInformer.Informer()),
357377
subscription.WithSubscriptionQueue(subQueue),
358-
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
378+
subscription.WithNamespaceResolveQueue(op.nsResolveQueue),
359379
subscription.WithRegistryReconcilerFactory(op.reconciler),
360380
subscription.WithGlobalCatalogNamespace(op.namespace),
361381
subscription.WithSourceProvider(op.resolverSourceProvider),
@@ -742,27 +762,6 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
742762
return nil, err
743763
}
744764

745-
// Namespace sync for resolving subscriptions
746-
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
747-
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
748-
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
749-
workqueue.TypedRateLimitingQueueConfig[any]{
750-
Name: "resolve",
751-
})
752-
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
753-
ctx,
754-
queueinformer.WithLogger(op.logger),
755-
queueinformer.WithQueue(op.nsResolveQueue),
756-
queueinformer.WithInformer(namespaceInformer.Informer()),
757-
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
758-
)
759-
if err != nil {
760-
return nil, err
761-
}
762-
if err := op.RegisterQueueInformer(namespaceQueueInformer); err != nil {
763-
return nil, err
764-
}
765-
766765
op.sources.Start(context.Background())
767766

768767
return op, nil
@@ -1342,6 +1341,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13421341
return err
13431342
}
13441343

1344+
// Remove resolutionfailed condition from subscriptions
1345+
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1346+
13451347
// Attempt to unpack bundles before installing
13461348
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
13471349
if len(bundleLookups) > 0 {
@@ -1469,9 +1471,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14691471
// Remove BundleUnpackFailed condition from subscriptions
14701472
o.removeSubsCond(subs, v1alpha1.SubscriptionBundleUnpackFailed)
14711473

1472-
// Remove resolutionfailed condition from subscriptions
1473-
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1474-
14751474
newSub := true
14761475
for _, updatedSub := range updatedSubs {
14771476
updatedSub.Status.RemoveConditions(v1alpha1.SubscriptionResolutionFailed)
@@ -1499,18 +1498,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14991498
return nil
15001499
}
15011500

1502-
func (o *Operator) syncSubscriptions(obj interface{}) error {
1503-
sub, ok := obj.(*v1alpha1.Subscription)
1504-
if !ok {
1505-
o.logger.Infof("wrong type: %#v", obj)
1506-
return fmt.Errorf("casting Subscription failed")
1507-
}
1508-
1509-
o.nsResolveQueue.Add(sub.GetNamespace())
1510-
1511-
return nil
1512-
}
1513-
15141501
// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
15151502
// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
15161503
func (o *Operator) syncOperatorGroups(obj interface{}) error {
@@ -1672,7 +1659,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16721659
return nil, nil
16731660
}
16741661

1675-
csvNames := []string{}
1662+
var csvNames []string
16761663
catalogSourceMap := map[string]struct{}{}
16771664
for _, s := range steps {
16781665
if s.Resource.Kind == "ClusterServiceVersion" {
@@ -1681,7 +1668,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16811668
catalogSourceMap[s.Resource.CatalogSource] = struct{}{}
16821669
}
16831670

1684-
catalogSources := []string{}
1671+
var catalogSources []string
16851672
for s := range catalogSourceMap {
16861673
catalogSources = append(catalogSources, s)
16871674
}
@@ -1765,7 +1752,6 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17651752
errs []error
17661753
mu sync.Mutex
17671754
wg sync.WaitGroup
1768-
getOpts = metav1.GetOptions{}
17691755
updateOpts = metav1.UpdateOptions{}
17701756
)
17711757

@@ -1774,22 +1760,29 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17741760
go func(sub *v1alpha1.Subscription) {
17751761
defer wg.Done()
17761762

1777-
update := func() error {
1778-
// Update the status of the latest revision
1779-
latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
1780-
if err != nil {
1781-
return err
1782-
}
1783-
latest.Status = sub.Status
1784-
*sub = *latest
1785-
_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
1786-
return err
1787-
}
1788-
if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
1763+
_, err := o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), sub, updateOpts)
1764+
if err != nil {
17891765
mu.Lock()
17901766
defer mu.Unlock()
17911767
errs = append(errs, err)
17921768
}
1769+
1770+
//update := func() error {
1771+
// // Update the status of the latest revision
1772+
// latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
1773+
// if err != nil {
1774+
// return err
1775+
// }
1776+
// latest.Status = sub.Status
1777+
// *sub = *latest
1778+
// _, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
1779+
// return err
1780+
//}
1781+
//if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
1782+
// mu.Lock()
1783+
// defer mu.Unlock()
1784+
// errs = append(errs, err)
1785+
//}
17931786
}(sub)
17941787
}
17951788
wg.Wait()
@@ -2850,31 +2843,11 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, names
28502843
}
28512844

28522845
func (o *Operator) listSubscriptions(namespace string) (subs []*v1alpha1.Subscription, err error) {
2853-
list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
2854-
if err != nil {
2855-
return
2856-
}
2857-
2858-
subs = make([]*v1alpha1.Subscription, 0)
2859-
for i := range list.Items {
2860-
subs = append(subs, &list.Items[i])
2861-
}
2862-
2863-
return
2846+
return o.lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace).List(labels.Everything())
28642847
}
28652848

28662849
func (o *Operator) listInstallPlans(namespace string) (ips []*v1alpha1.InstallPlan, err error) {
2867-
list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{})
2868-
if err != nil {
2869-
return
2870-
}
2871-
2872-
ips = make([]*v1alpha1.InstallPlan, 0)
2873-
for i := range list.Items {
2874-
ips = append(ips, &list.Items[i])
2875-
}
2876-
2877-
return
2850+
return o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(namespace).List(labels.Everything())
28782851
}
28792852

28802853
// competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV)

0 commit comments

Comments
 (0)