@@ -2,6 +2,7 @@ package catalog
22
33import (
44 "context"
5+ "crypto/sha256"
56 "encoding/json"
67 "errors"
78 "fmt"
@@ -47,7 +48,6 @@ import (
4748 "k8s.io/client-go/tools/clientcmd"
4849 "k8s.io/client-go/tools/pager"
4950 "k8s.io/client-go/tools/record"
50- "k8s.io/client-go/util/retry"
5151 "k8s.io/client-go/util/workqueue"
5252 utilclock "k8s.io/utils/clock"
5353
@@ -332,6 +332,27 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
332332 return nil , err
333333 }
334334
335+ // Namespace sync for resolving subscriptions
336+ namespaceInformer := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), resyncPeriod ()).Core ().V1 ().Namespaces ()
337+ op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
338+ op .nsResolveQueue = workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
339+ workqueue.TypedRateLimitingQueueConfig [any ]{
340+ Name : "resolve" ,
341+ })
342+ namespaceQueueInformer , err := queueinformer .NewQueueInformer (
343+ ctx ,
344+ queueinformer .WithLogger (op .logger ),
345+ queueinformer .WithQueue (op .nsResolveQueue ),
346+ queueinformer .WithInformer (namespaceInformer .Informer ()),
347+ queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncResolvingNamespace ).ToSyncer ()),
348+ )
349+ if err != nil {
350+ return nil , err
351+ }
352+ if err := op .RegisterQueueInformer (namespaceQueueInformer ); err != nil {
353+ return nil , err
354+ }
355+
335356 // Wire Subscriptions
336357 subInformer := crInformerFactory .Operators ().V1alpha1 ().Subscriptions ()
337358 op .lister .OperatorsV1alpha1 ().RegisterSubscriptionLister (metav1 .NamespaceAll , subInformer .Lister ())
@@ -355,7 +376,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
355376 subscription .WithCatalogInformer (catsrcInformer .Informer ()),
356377 subscription .WithInstallPlanInformer (ipInformer .Informer ()),
357378 subscription .WithSubscriptionQueue (subQueue ),
358- subscription .WithAppendedReconcilers ( subscription . ReconcilerFromLegacySyncHandler ( op .syncSubscriptions , nil ) ),
379+ subscription .WithNamespaceResolveQueue ( op .nsResolveQueue ),
359380 subscription .WithRegistryReconcilerFactory (op .reconciler ),
360381 subscription .WithGlobalCatalogNamespace (op .namespace ),
361382 subscription .WithSourceProvider (op .resolverSourceProvider ),
@@ -742,27 +763,6 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
742763 return nil , err
743764 }
744765
745- // Namespace sync for resolving subscriptions
746- namespaceInformer := informers .NewSharedInformerFactory (op .opClient .KubernetesInterface (), resyncPeriod ()).Core ().V1 ().Namespaces ()
747- op .lister .CoreV1 ().RegisterNamespaceLister (namespaceInformer .Lister ())
748- op .nsResolveQueue = workqueue .NewTypedRateLimitingQueueWithConfig [any ](workqueue .DefaultTypedControllerRateLimiter [any ](),
749- workqueue.TypedRateLimitingQueueConfig [any ]{
750- Name : "resolve" ,
751- })
752- namespaceQueueInformer , err := queueinformer .NewQueueInformer (
753- ctx ,
754- queueinformer .WithLogger (op .logger ),
755- queueinformer .WithQueue (op .nsResolveQueue ),
756- queueinformer .WithInformer (namespaceInformer .Informer ()),
757- queueinformer .WithSyncer (queueinformer .LegacySyncHandler (op .syncResolvingNamespace ).ToSyncer ()),
758- )
759- if err != nil {
760- return nil , err
761- }
762- if err := op .RegisterQueueInformer (namespaceQueueInformer ); err != nil {
763- return nil , err
764- }
765-
766766 op .sources .Start (context .Background ())
767767
768768 return op , nil
@@ -1342,6 +1342,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13421342 return err
13431343 }
13441344
1345+ // Remove resolutionfailed condition from subscriptions
1346+ o .removeSubsCond (subs , v1alpha1 .SubscriptionResolutionFailed )
1347+
13451348 // Attempt to unpack bundles before installing
13461349 // Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
13471350 if len (bundleLookups ) > 0 {
@@ -1469,9 +1472,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14691472 // Remove BundleUnpackFailed condition from subscriptions
14701473 o .removeSubsCond (subs , v1alpha1 .SubscriptionBundleUnpackFailed )
14711474
1472- // Remove resolutionfailed condition from subscriptions
1473- o .removeSubsCond (subs , v1alpha1 .SubscriptionResolutionFailed )
1474-
14751475 newSub := true
14761476 for _ , updatedSub := range updatedSubs {
14771477 updatedSub .Status .RemoveConditions (v1alpha1 .SubscriptionResolutionFailed )
@@ -1499,18 +1499,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14991499 return nil
15001500}
15011501
1502- func (o * Operator ) syncSubscriptions (obj interface {}) error {
1503- sub , ok := obj .(* v1alpha1.Subscription )
1504- if ! ok {
1505- o .logger .Infof ("wrong type: %#v" , obj )
1506- return fmt .Errorf ("casting Subscription failed" )
1507- }
1508-
1509- o .nsResolveQueue .Add (sub .GetNamespace ())
1510-
1511- return nil
1512- }
1513-
15141502// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
15151503// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
15161504func (o * Operator ) syncOperatorGroups (obj interface {}) error {
@@ -1672,7 +1660,20 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16721660 return nil , nil
16731661 }
16741662
1675- csvNames := []string {}
1663+ sha , err := func () (string , error ) {
1664+ jsonBytes , err := json .Marshal (steps )
1665+ if err != nil {
1666+ return "" , err
1667+ }
1668+ hash := sha256 .Sum256 (jsonBytes )
1669+ return fmt .Sprintf ("%x" , hash ), nil
1670+ }()
1671+
1672+ if err != nil {
1673+ return nil , err
1674+ }
1675+
1676+ var csvNames []string
16761677 catalogSourceMap := map [string ]struct {}{}
16771678 for _ , s := range steps {
16781679 if s .Resource .Kind == "ClusterServiceVersion" {
@@ -1681,7 +1682,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16811682 catalogSourceMap [s .Resource .CatalogSource ] = struct {}{}
16821683 }
16831684
1684- catalogSources := []string {}
1685+ var catalogSources []string
16851686 for s := range catalogSourceMap {
16861687 catalogSources = append (catalogSources , s )
16871688 }
@@ -1692,8 +1693,8 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16921693 }
16931694 ip := & v1alpha1.InstallPlan {
16941695 ObjectMeta : metav1.ObjectMeta {
1695- GenerateName : "install-" ,
1696- Namespace : namespace ,
1696+ Name : fmt . Sprintf ( "install-%s" , sha ) ,
1697+ Namespace : namespace ,
16971698 },
16981699 Spec : v1alpha1.InstallPlanSpec {
16991700 ClusterServiceVersionNames : csvNames ,
@@ -1707,7 +1708,8 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
17071708 }
17081709
17091710 res , err := o .client .OperatorsV1alpha1 ().InstallPlans (namespace ).Create (context .TODO (), ip , metav1.CreateOptions {})
1710- if err != nil {
1711+ if err != nil && ! apierrors .IsAlreadyExists (err ) {
1712+
17111713 return nil , err
17121714 }
17131715
@@ -1717,11 +1719,10 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
17171719 CatalogSources : catalogSources ,
17181720 BundleLookups : bundleLookups ,
17191721 }
1720- res , err = o . client . OperatorsV1alpha1 (). InstallPlans ( namespace ). UpdateStatus ( context . TODO (), res , metav1. UpdateOptions {})
1721- if err != nil {
1722+
1723+ if _ , err = o . client . OperatorsV1alpha1 (). InstallPlans ( namespace ). UpdateStatus ( context . TODO (), res , metav1. UpdateOptions {}); err != nil {
17221724 return nil , err
17231725 }
1724-
17251726 return reference .GetReference (res )
17261727}
17271728
@@ -1765,7 +1766,6 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17651766 errs []error
17661767 mu sync.Mutex
17671768 wg sync.WaitGroup
1768- getOpts = metav1.GetOptions {}
17691769 updateOpts = metav1.UpdateOptions {}
17701770 )
17711771
@@ -1774,22 +1774,29 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17741774 go func (sub * v1alpha1.Subscription ) {
17751775 defer wg .Done ()
17761776
1777- update := func () error {
1778- // Update the status of the latest revision
1779- latest , err := o .client .OperatorsV1alpha1 ().Subscriptions (sub .GetNamespace ()).Get (context .TODO (), sub .GetName (), getOpts )
1780- if err != nil {
1781- return err
1782- }
1783- latest .Status = sub .Status
1784- * sub = * latest
1785- _ , err = o .client .OperatorsV1alpha1 ().Subscriptions (sub .Namespace ).UpdateStatus (context .TODO (), latest , updateOpts )
1786- return err
1787- }
1788- if err := retry .RetryOnConflict (retry .DefaultRetry , update ); err != nil {
1777+ _ , err := o .client .OperatorsV1alpha1 ().Subscriptions (sub .Namespace ).UpdateStatus (context .TODO (), sub , updateOpts )
1778+ if err != nil {
17891779 mu .Lock ()
17901780 defer mu .Unlock ()
17911781 errs = append (errs , err )
17921782 }
1783+
1784+ //update := func() error {
1785+ // // Update the status of the latest revision
1786+ // latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
1787+ // if err != nil {
1788+ // return err
1789+ // }
1790+ // latest.Status = sub.Status
1791+ // *sub = *latest
1792+ // _, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
1793+ // return err
1794+ //}
1795+ //if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
1796+ // mu.Lock()
1797+ // defer mu.Unlock()
1798+ // errs = append(errs, err)
1799+ //}
17931800 }(sub )
17941801 }
17951802 wg .Wait ()
@@ -2850,31 +2857,11 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, names
28502857}
28512858
28522859func (o * Operator ) listSubscriptions (namespace string ) (subs []* v1alpha1.Subscription , err error ) {
2853- list , err := o .client .OperatorsV1alpha1 ().Subscriptions (namespace ).List (context .TODO (), metav1.ListOptions {})
2854- if err != nil {
2855- return
2856- }
2857-
2858- subs = make ([]* v1alpha1.Subscription , 0 )
2859- for i := range list .Items {
2860- subs = append (subs , & list .Items [i ])
2861- }
2862-
2863- return
2860+ return o .lister .OperatorsV1alpha1 ().SubscriptionLister ().Subscriptions (namespace ).List (labels .Everything ())
28642861}
28652862
28662863func (o * Operator ) listInstallPlans (namespace string ) (ips []* v1alpha1.InstallPlan , err error ) {
2867- list , err := o .client .OperatorsV1alpha1 ().InstallPlans (namespace ).List (context .TODO (), metav1.ListOptions {})
2868- if err != nil {
2869- return
2870- }
2871-
2872- ips = make ([]* v1alpha1.InstallPlan , 0 )
2873- for i := range list .Items {
2874- ips = append (ips , & list .Items [i ])
2875- }
2876-
2877- return
2864+ return o .lister .OperatorsV1alpha1 ().InstallPlanLister ().InstallPlans (namespace ).List (labels .Everything ())
28782865}
28792866
28802867// competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV)
0 commit comments