Skip to content

Commit c0b639d

Browse files
author
Per Goncalves da Silva
committed
just subscription sync changes
Signed-off-by: Per Goncalves da Silva <[email protected]>
1 parent 816b551 commit c0b639d

File tree

3 files changed

+81
-67
lines changed

3 files changed

+81
-67
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package catalog
22

33
import (
44
"context"
5-
"crypto/sha256"
65
"encoding/json"
76
"errors"
87
"fmt"
@@ -48,6 +47,7 @@ import (
4847
"k8s.io/client-go/tools/clientcmd"
4948
"k8s.io/client-go/tools/pager"
5049
"k8s.io/client-go/tools/record"
50+
"k8s.io/client-go/util/retry"
5151
"k8s.io/client-go/util/workqueue"
5252
utilclock "k8s.io/utils/clock"
5353

@@ -1342,9 +1342,6 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
13421342
return err
13431343
}
13441344

1345-
// Remove resolutionfailed condition from subscriptions
1346-
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1347-
13481345
// Attempt to unpack bundles before installing
13491346
// Note: This should probably use the attenuated client to prevent users from resolving resources they otherwise don't have access to.
13501347
if len(bundleLookups) > 0 {
@@ -1472,6 +1469,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14721469
// Remove BundleUnpackFailed condition from subscriptions
14731470
o.removeSubsCond(subs, v1alpha1.SubscriptionBundleUnpackFailed)
14741471

1472+
// Remove resolutionfailed condition from subscriptions
1473+
o.removeSubsCond(subs, v1alpha1.SubscriptionResolutionFailed)
1474+
14751475
newSub := true
14761476
for _, updatedSub := range updatedSubs {
14771477
updatedSub.Status.RemoveConditions(v1alpha1.SubscriptionResolutionFailed)
@@ -1499,6 +1499,18 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
14991499
return nil
15001500
}
15011501

1502+
func (o *Operator) syncSubscriptions(obj interface{}) error {
1503+
sub, ok := obj.(*v1alpha1.Subscription)
1504+
if !ok {
1505+
o.logger.Infof("wrong type: %#v", obj)
1506+
return fmt.Errorf("casting Subscription failed")
1507+
}
1508+
1509+
o.nsResolveQueue.Add(sub.GetNamespace())
1510+
1511+
return nil
1512+
}
1513+
15021514
// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
15031515
// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
15041516
func (o *Operator) syncOperatorGroups(obj interface{}) error {
@@ -1660,20 +1672,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16601672
return nil, nil
16611673
}
16621674

1663-
sha, err := func() (string, error) {
1664-
jsonBytes, err := json.Marshal(steps)
1665-
if err != nil {
1666-
return "", err
1667-
}
1668-
hash := sha256.Sum256(jsonBytes)
1669-
return fmt.Sprintf("%x", hash), nil
1670-
}()
1671-
1672-
if err != nil {
1673-
return nil, err
1674-
}
1675-
1676-
var csvNames []string
1675+
csvNames := []string{}
16771676
catalogSourceMap := map[string]struct{}{}
16781677
for _, s := range steps {
16791678
if s.Resource.Kind == "ClusterServiceVersion" {
@@ -1682,7 +1681,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16821681
catalogSourceMap[s.Resource.CatalogSource] = struct{}{}
16831682
}
16841683

1685-
var catalogSources []string
1684+
catalogSources := []string{}
16861685
for s := range catalogSourceMap {
16871686
catalogSources = append(catalogSources, s)
16881687
}
@@ -1693,8 +1692,8 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
16931692
}
16941693
ip := &v1alpha1.InstallPlan{
16951694
ObjectMeta: metav1.ObjectMeta{
1696-
Name: fmt.Sprintf("install-%s", sha),
1697-
Namespace: namespace,
1695+
GenerateName: "install-",
1696+
Namespace: namespace,
16981697
},
16991698
Spec: v1alpha1.InstallPlanSpec{
17001699
ClusterServiceVersionNames: csvNames,
@@ -1708,8 +1707,7 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
17081707
}
17091708

17101709
res, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).Create(context.TODO(), ip, metav1.CreateOptions{})
1711-
if err != nil && !apierrors.IsAlreadyExists(err) {
1712-
1710+
if err != nil {
17131711
return nil, err
17141712
}
17151713

@@ -1719,10 +1717,11 @@ func (o *Operator) createInstallPlan(namespace string, gen int, subs []*v1alpha1
17191717
CatalogSources: catalogSources,
17201718
BundleLookups: bundleLookups,
17211719
}
1722-
1723-
if _, err = o.client.OperatorsV1alpha1().InstallPlans(namespace).UpdateStatus(context.TODO(), res, metav1.UpdateOptions{}); err != nil {
1720+
res, err = o.client.OperatorsV1alpha1().InstallPlans(namespace).UpdateStatus(context.TODO(), res, metav1.UpdateOptions{})
1721+
if err != nil {
17241722
return nil, err
17251723
}
1724+
17261725
return reference.GetReference(res)
17271726
}
17281727

@@ -1766,6 +1765,7 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17661765
errs []error
17671766
mu sync.Mutex
17681767
wg sync.WaitGroup
1768+
getOpts = metav1.GetOptions{}
17691769
updateOpts = metav1.UpdateOptions{}
17701770
)
17711771

@@ -1774,29 +1774,22 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]
17741774
go func(sub *v1alpha1.Subscription) {
17751775
defer wg.Done()
17761776

1777-
_, err := o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), sub, updateOpts)
1778-
if err != nil {
1777+
update := func() error {
1778+
// Update the status of the latest revision
1779+
latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
1780+
if err != nil {
1781+
return err
1782+
}
1783+
latest.Status = sub.Status
1784+
*sub = *latest
1785+
_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
1786+
return err
1787+
}
1788+
if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
17791789
mu.Lock()
17801790
defer mu.Unlock()
17811791
errs = append(errs, err)
17821792
}
1783-
1784-
//update := func() error {
1785-
// // Update the status of the latest revision
1786-
// latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
1787-
// if err != nil {
1788-
// return err
1789-
// }
1790-
// latest.Status = sub.Status
1791-
// *sub = *latest
1792-
// _, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
1793-
// return err
1794-
//}
1795-
//if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
1796-
// mu.Lock()
1797-
// defer mu.Unlock()
1798-
// errs = append(errs, err)
1799-
//}
18001793
}(sub)
18011794
}
18021795
wg.Wait()
@@ -2857,11 +2850,31 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, names
28572850
}
28582851

28592852
func (o *Operator) listSubscriptions(namespace string) (subs []*v1alpha1.Subscription, err error) {
2860-
return o.lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace).List(labels.Everything())
2853+
list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
2854+
if err != nil {
2855+
return
2856+
}
2857+
2858+
subs = make([]*v1alpha1.Subscription, 0)
2859+
for i := range list.Items {
2860+
subs = append(subs, &list.Items[i])
2861+
}
2862+
2863+
return
28612864
}
28622865

28632866
func (o *Operator) listInstallPlans(namespace string) (ips []*v1alpha1.InstallPlan, err error) {
2864-
return o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(namespace).List(labels.Everything())
2867+
list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{})
2868+
if err != nil {
2869+
return
2870+
}
2871+
2872+
ips = make([]*v1alpha1.InstallPlan, 0)
2873+
for i := range list.Items {
2874+
ips = append(ips, &list.Items[i])
2875+
}
2876+
2877+
return
28652878
}
28662879

28672880
// competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV)

pkg/controller/registry/resolver/step_resolver.go

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package resolver
33

44
import (
5+
"context"
56
"fmt"
67

78
"github.com/operator-framework/api/pkg/operators/v1alpha1"
@@ -15,6 +16,7 @@ import (
1516
"github.com/sirupsen/logrus"
1617
corev1 "k8s.io/api/core/v1"
1718
"k8s.io/apimachinery/pkg/api/errors"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1820
"k8s.io/apimachinery/pkg/labels"
1921
)
2022

@@ -89,7 +91,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio
8991
}
9092

9193
func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
92-
subs, err := r.subLister.Subscriptions(namespace).List(labels.Everything())
94+
subs, err := r.listSubscriptions(namespace)
9395
if err != nil {
9496
return nil, nil, nil, err
9597
}
@@ -117,9 +119,9 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
117119

118120
// if there's no error, we were able to satisfy all constraints in the subscription set, so we calculate what
119121
// changes to persist to the cluster and write them out as `steps`
120-
var steps []*v1alpha1.Step
121-
var updatedSubs []*v1alpha1.Subscription
122-
var bundleLookups []v1alpha1.BundleLookup
122+
steps := []*v1alpha1.Step{}
123+
updatedSubs := []*v1alpha1.Subscription{}
124+
bundleLookups := []v1alpha1.BundleLookup{}
123125
for _, op := range operators {
124126
// Find any existing subscriptions that resolve to this operator.
125127
existingSubscriptions := make(map[*v1alpha1.Subscription]bool)
@@ -217,9 +219,8 @@ func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step,
217219
continue
218220
}
219221
// update existing subscription status
220-
updatedSub := sub.DeepCopy()
221-
updatedSub.Status.CurrentCSV = op.Name
222-
updatedSubs = append(updatedSubs, updatedSub)
222+
sub.Status.CurrentCSV = op.Name
223+
updatedSubs = append(updatedSubs, sub)
223224
}
224225
}
225226

@@ -242,19 +243,19 @@ func (r *OperatorStepResolver) hasExistingCurrentCSV(sub *v1alpha1.Subscription)
242243
return false, err // Can't answer this question right now.
243244
}
244245

245-
//func (r *OperatorStepResolver) listSubscriptions(namespace string) ([]*v1alpha1.Subscription, error) {
246-
// list, err := r.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
247-
// if err != nil {
248-
// return nil, err
249-
// }
250-
//
251-
// var subs []*v1alpha1.Subscription
252-
// for i := range list.Items {
253-
// subs = append(subs, &list.Items[i])
254-
// }
255-
//
256-
// return subs, nil
257-
//}
246+
func (r *OperatorStepResolver) listSubscriptions(namespace string) ([]*v1alpha1.Subscription, error) {
247+
list, err := r.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
248+
if err != nil {
249+
return nil, err
250+
}
251+
252+
var subs []*v1alpha1.Subscription
253+
for i := range list.Items {
254+
subs = append(subs, &list.Items[i])
255+
}
256+
257+
return subs, nil
258+
}
258259

259260
type mergedSourceProvider struct {
260261
sps []cache.SourceProvider

pkg/lib/queueinformer/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,6 @@ func (c *operatorConfig) validate() (err error) {
233233
func defaultOperatorConfig() *operatorConfig {
234234
return &operatorConfig{
235235
logger: logrus.New(),
236-
numWorkers: 1,
236+
numWorkers: 2,
237237
}
238238
}

0 commit comments

Comments
 (0)