diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index 10d3187096..4db034d1b2 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -47,7 +47,6 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/pager" "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/retry" "k8s.io/client-go/util/workqueue" utilclock "k8s.io/utils/clock" @@ -238,6 +237,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo Namespace: csv.Namespace, Labels: csv.Labels, Annotations: csv.Annotations, + UID: csv.UID, }, Spec: v1alpha1.ClusterServiceVersionSpec{ CustomResourceDefinitions: csv.Spec.CustomResourceDefinitions, @@ -745,16 +745,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any]( + workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 30*time.Second), workqueue.TypedRateLimitingQueueConfig[any]{ Name: "resolve", }) + //op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), + // workqueue.TypedRateLimitingQueueConfig[any]{ + // Name: "resolve", + // }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithLogger(op.logger), queueinformer.WithQueue(op.nsResolveQueue), queueinformer.WithInformer(namespaceInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncerWithDelete(op.handleDeletion)), ) if err != nil { return nil, err @@ -1313,6 +1318,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { // from users/admins. Resyncing the namespace again is unlikely to resolve // not-satisfiable error if _, ok := err.(solver.NotSatisfiable); ok { + if err := o.ResyncInformers(); err != nil { + logger.WithError(err).Infof("error resyncing informers") + } logger.WithError(err).Debug("resolution failed") _, updateErr := o.updateSubscriptionStatuses( o.setSubsCond(subs, v1alpha1.SubscriptionCondition{ @@ -1325,7 +1333,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.WithError(updateErr).Debug("failed to update subs conditions") return updateErr } - return nil + return err } _, updateErr := o.updateSubscriptionStatuses( @@ -1736,7 +1744,8 @@ func (o *Operator) setSubsCond(subs []*v1alpha1.Subscription, cond v1alpha1.Subs for _, sub := range subs { subCond := sub.Status.GetCondition(cond.Type) - if subCond.Equals(cond) { + + if subCond.Type == cond.Type && subCond.Status == cond.Status && subCond.Reason == cond.Reason { continue } sub.Status.LastUpdated = lastUpdated @@ -1762,10 +1771,10 @@ func (o *Operator) removeSubsCond(subs []*v1alpha1.Subscription, condType v1alph func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]*v1alpha1.Subscription, error) { var ( - errs []error - mu sync.Mutex - wg sync.WaitGroup - getOpts = metav1.GetOptions{} + errs []error + mu sync.Mutex + wg sync.WaitGroup + // getOpts = metav1.GetOptions{} updateOpts = metav1.UpdateOptions{} ) @@ -1773,23 +1782,27 @@ func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([] wg.Add(1) go func(sub *v1alpha1.Subscription) { defer wg.Done() - - update := func() error { - // Update the status of the latest revision - latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts) - if err != nil { - return err - } - latest.Status = sub.Status - *sub = *latest - _, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts) - return err - } - if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil { + if _, err := o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), sub, updateOpts); err != nil { mu.Lock() defer mu.Unlock() errs = append(errs, err) } + //update := func() error { + // // Update the status of the latest revision + // latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts) + // if err != nil { + // return err + // } + // latest.Status = sub.Status + // *sub = *latest + // _, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts) + // return err + //} + //if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil { + // mu.Lock() + // defer mu.Unlock() + // errs = append(errs, err) + //} }(sub) } wg.Wait() @@ -2850,31 +2863,33 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, names } func (o *Operator) listSubscriptions(namespace string) (subs []*v1alpha1.Subscription, err error) { - list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return - } - - subs = make([]*v1alpha1.Subscription, 0) - for i := range list.Items { - subs = append(subs, &list.Items[i]) - } - - return + return o.lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace).List(labels.Everything()) + //list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{}) + //if err != nil { + // return + //} + // + //subs = make([]*v1alpha1.Subscription, 0) + //for i := range list.Items { + // subs = append(subs, &list.Items[i]) + //} + // + //return } func (o *Operator) listInstallPlans(namespace string) (ips []*v1alpha1.InstallPlan, err error) { - list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return - } - - ips = make([]*v1alpha1.InstallPlan, 0) - for i := range list.Items { - ips = append(ips, &list.Items[i]) - } - - return + return o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(namespace).List(labels.Everything()) + //list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{}) + //if err != nil { + // return + //} + // + //ips = make([]*v1alpha1.InstallPlan, 0) + //for i := range list.Items { + // ips = append(ips, &list.Items[i]) + //} + // + //return } // competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV) diff --git a/pkg/controller/registry/resolver/source_csvs.go b/pkg/controller/registry/resolver/source_csvs.go index bc7fe506cf..9d5ae92b0b 100644 --- a/pkg/controller/registry/resolver/source_csvs.go +++ b/pkg/controller/registry/resolver/source_csvs.go @@ -40,6 +40,9 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey] listSubscriptions: func(ctx context.Context) (*v1alpha1.SubscriptionList, error) { return csp.client.OperatorsV1alpha1().Subscriptions(namespace).List(ctx, metav1.ListOptions{}) }, + //getCSV: func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error) { + // return csp.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Get(ctx, name, metav1.GetOptions{}) + //}, } break // first ns is assumed to be the target ns, todo: make explicit } @@ -54,6 +57,7 @@ type csvSource struct { logger logrus.StdLogger listSubscriptions func(context.Context) (*v1alpha1.SubscriptionList, error) + // getCSV func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error) } func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { @@ -93,19 +97,26 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { continue } - if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil { - // we might be in an incoherent state, so let's check with live clients to make sure - realSubscriptions, err := s.listSubscriptions(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list subscriptions: %w", err) - } - for _, realSubscription := range realSubscriptions.Items { - if realSubscription.Status.InstalledCSV == csv.Name { - // oops, live cluster state is coherent - return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name) - } - } - } + //if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil { + // // we might be in an incoherent state, so let's check with live clients to make sure + // realSubscriptions, err := s.listSubscriptions(ctx) + // if err != nil { + // return nil, fmt.Errorf("failed to list subscriptions: %w", err) + // } + // for _, realSubscription := range realSubscriptions.Items { + // if realSubscription.Status.InstalledCSV == csv.Name { + // // oops, live cluster state is coherent + // return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name) + // } + // } + // realCsv, err := s.getCSV(ctx, csv.Namespace, csv.Name) + // if err != nil { + // return nil, fmt.Errorf("lister caches might be incoherent for CSV %s/%s: %w", csv.Namespace, csv.Name, err) + // } + // if realCsv.GetUID() != csv.GetUID() { + // return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s: differing UIDs (%s != %s)", csv.Namespace, csv.Name, csv.UID) + // } + //} if failForwardEnabled { replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs)) diff --git a/pkg/controller/registry/resolver/step_resolver.go b/pkg/controller/registry/resolver/step_resolver.go index 5d2807bceb..f4988a7de6 100644 --- a/pkg/controller/registry/resolver/step_resolver.go +++ b/pkg/controller/registry/resolver/step_resolver.go @@ -91,7 +91,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio } func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { - subs, err := r.listSubscriptions(namespace) + subs, err := r.subLister.Subscriptions(namespace).List(labels.Everything()) if err != nil { return nil, nil, nil, err } diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..2bdb523352 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -54,6 +54,8 @@ type Operator interface { // RunInformers starts the Operator's underlying Informers. RunInformers(ctx context.Context) + ResyncInformers() error + // Run starts the Operator and its underlying Informers. Run(ctx context.Context) } @@ -197,6 +199,17 @@ func (o *operator) Run(ctx context.Context) { }) } +func (o *operator) ResyncInformers() error { + o.mu.Lock() + defer o.mu.Unlock() + for _, informer := range o.informers { + if err := informer.GetStore().Resync(); err != nil { + return err + } + } + return nil +} + func (o *operator) start(ctx context.Context) error { defer close(o.ready) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 7deb176552..6e662ae291 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -80,7 +80,7 @@ var ( func TestEndToEnd(t *testing.T) { RegisterFailHandler(Fail) - SetDefaultEventuallyTimeout(1 * time.Minute) + SetDefaultEventuallyTimeout(5 * time.Minute) SetDefaultEventuallyPollingInterval(1 * time.Second) SetDefaultConsistentlyDuration(30 * time.Second) SetDefaultConsistentlyPollingInterval(1 * time.Second)