pkg/controller/operators/catalog/operator.go (59 additions, 44 deletions)

@@ -47,7 +47,6 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 	"k8s.io/client-go/tools/pager"
 	"k8s.io/client-go/tools/record"
-	"k8s.io/client-go/util/retry"
 	"k8s.io/client-go/util/workqueue"
 	utilclock "k8s.io/utils/clock"

@@ -238,6 +237,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock
 			Namespace:   csv.Namespace,
 			Labels:      csv.Labels,
 			Annotations: csv.Annotations,
+			UID:         csv.UID,
 		},
 		Spec: v1alpha1.ClusterServiceVersionSpec{
 			CustomResourceDefinitions: csv.Spec.CustomResourceDefinitions,

@@ -745,16 +745,21 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clock
 	// Namespace sync for resolving subscriptions
 	namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
 	op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
-	op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
+	op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](
+		workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 30*time.Second),
 		workqueue.TypedRateLimitingQueueConfig[any]{
 			Name: "resolve",
 		})
+	//op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
+	//	workqueue.TypedRateLimitingQueueConfig[any]{
+	//		Name: "resolve",
+	//	})
 	namespaceQueueInformer, err := queueinformer.NewQueueInformer(
 		ctx,
 		queueinformer.WithLogger(op.logger),
 		queueinformer.WithQueue(op.nsResolveQueue),
 		queueinformer.WithInformer(namespaceInformer.Informer()),
-		queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncer()),
+		queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncResolvingNamespace).ToSyncerWithDelete(op.handleDeletion)),
 	)
 	if err != nil {
 		return nil, err

@@ -1313,6 +1318,9 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
 		// from users/admins. Resyncing the namespace again is unlikely to resolve
 		// not-satisfiable error
 		if _, ok := err.(solver.NotSatisfiable); ok {
+			if err := o.ResyncInformers(); err != nil {
+				logger.WithError(err).Infof("error resyncing informers")
+			}
 			logger.WithError(err).Debug("resolution failed")
 			_, updateErr := o.updateSubscriptionStatuses(
 				o.setSubsCond(subs, v1alpha1.SubscriptionCondition{

@@ -1325,7 +1333,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
 				logger.WithError(updateErr).Debug("failed to update subs conditions")
 				return updateErr
 			}
-			return nil
+			return err
 		}

 		_, updateErr := o.updateSubscriptionStatuses(

@@ -1736,7 +1744,8 @@ func (o *Operator) setSubsCond(subs []*v1alpha1.Subscription, cond v1alpha1.SubscriptionCondition

 	for _, sub := range subs {
 		subCond := sub.Status.GetCondition(cond.Type)
-		if subCond.Equals(cond) {
+
+		if subCond.Type == cond.Type && subCond.Status == cond.Status && subCond.Reason == cond.Reason {
 			continue
 		}
 		sub.Status.LastUpdated = lastUpdated

@@ -1762,34 +1771,38 @@ func (o *Operator) removeSubsCond(subs []*v1alpha1.Subscription, condType v1alpha1.SubscriptionConditionType

 func (o *Operator) updateSubscriptionStatuses(subs []*v1alpha1.Subscription) ([]*v1alpha1.Subscription, error) {
 	var (
-		errs    []error
-		mu      sync.Mutex
-		wg      sync.WaitGroup
-		getOpts = metav1.GetOptions{}
+		errs []error
+		mu   sync.Mutex
+		wg   sync.WaitGroup
+		// getOpts = metav1.GetOptions{}
 		updateOpts = metav1.UpdateOptions{}
 	)

 	for _, sub := range subs {
 		wg.Add(1)
 		go func(sub *v1alpha1.Subscription) {
 			defer wg.Done()

-			update := func() error {
-				// Update the status of the latest revision
-				latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
-				if err != nil {
-					return err
-				}
-				latest.Status = sub.Status
-				*sub = *latest
-				_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
-				return err
-			}
-			if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
+			if _, err := o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), sub, updateOpts); err != nil {
 				mu.Lock()
 				defer mu.Unlock()
 				errs = append(errs, err)
 			}
+			//update := func() error {
+			//	// Update the status of the latest revision
+			//	latest, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Get(context.TODO(), sub.GetName(), getOpts)
+			//	if err != nil {
+			//		return err
+			//	}
+			//	latest.Status = sub.Status
+			//	*sub = *latest
+			//	_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.Namespace).UpdateStatus(context.TODO(), latest, updateOpts)
+			//	return err
+			//}
+			//if err := retry.RetryOnConflict(retry.DefaultRetry, update); err != nil {
+			//	mu.Lock()
+			//	defer mu.Unlock()
+			//	errs = append(errs, err)
+			//}
 		}(sub)
 	}
 	wg.Wait()

@@ -2850,31 +2863,33 @@ func (o *Operator) getUpdatedOwnerReferences(refs []metav1.OwnerReference, namespace string
 }

 func (o *Operator) listSubscriptions(namespace string) (subs []*v1alpha1.Subscription, err error) {
-	list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
-	if err != nil {
-		return
-	}
-
-	subs = make([]*v1alpha1.Subscription, 0)
-	for i := range list.Items {
-		subs = append(subs, &list.Items[i])
-	}
-
-	return
+	return o.lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace).List(labels.Everything())
+	//list, err := o.client.OperatorsV1alpha1().Subscriptions(namespace).List(context.TODO(), metav1.ListOptions{})
+	//if err != nil {
+	//	return
+	//}
+	//
+	//subs = make([]*v1alpha1.Subscription, 0)
+	//for i := range list.Items {
+	//	subs = append(subs, &list.Items[i])
+	//}
+	//
+	//return
 }

 func (o *Operator) listInstallPlans(namespace string) (ips []*v1alpha1.InstallPlan, err error) {
-	list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{})
-	if err != nil {
-		return
-	}
-
-	ips = make([]*v1alpha1.InstallPlan, 0)
-	for i := range list.Items {
-		ips = append(ips, &list.Items[i])
-	}
-
-	return
+	return o.lister.OperatorsV1alpha1().InstallPlanLister().InstallPlans(namespace).List(labels.Everything())
+	//list, err := o.client.OperatorsV1alpha1().InstallPlans(namespace).List(context.TODO(), metav1.ListOptions{})
+	//if err != nil {
+	//	return
+	//}
+	//
+	//ips = make([]*v1alpha1.InstallPlan, 0)
+	//for i := range list.Items {
+	//	ips = append(ips, &list.Items[i])
+	//}
+	//
+	//return
 }

 // competingCRDOwnersExist returns true if there exists a CSV that owns at least one of the given CSVs owned CRDs (that's not the given CSV)
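The listSubscriptions/listInstallPlans change above swaps live LIST calls against the apiserver for reads from informer-backed lister caches. A minimal sketch of that pattern using core client-go, with Pods standing in for OLM's generated Subscription/InstallPlan listers (the kubeconfig handling and resync period here are illustrative, not taken from the PR):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Illustrative: load whatever kubeconfig the environment provides.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// A shared informer factory maintains a watch-driven local cache.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	// Request the lister before Start so its informer gets registered.
	podLister := factory.Core().V1().Pods().Lister()

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// This read is served from the local cache, not the apiserver.
	pods, err := podLister.Pods("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("cached view: %d pods in default\n", len(pods))
}
```

The trade-off, which the review thread further down circles around, is that cached reads avoid apiserver load but can briefly trail live cluster state.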
pkg/controller/registry/resolver/source_csvs.go (24 additions, 13 deletions)

@@ -40,6 +40,9 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
 			listSubscriptions: func(ctx context.Context) (*v1alpha1.SubscriptionList, error) {
 				return csp.client.OperatorsV1alpha1().Subscriptions(namespace).List(ctx, metav1.ListOptions{})
 			},
+			//getCSV: func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error) {
+			//	return csp.client.OperatorsV1alpha1().ClusterServiceVersions(namespace).Get(ctx, name, metav1.GetOptions{})
+			//},
 		}
 		break // first ns is assumed to be the target ns, todo: make explicit
 	}

@@ -54,6 +57,7 @@ type csvSource struct {
 	logger logrus.StdLogger

 	listSubscriptions func(context.Context) (*v1alpha1.SubscriptionList, error)
+	// getCSV            func(ctx context.Context, namespace string, name string) (*v1alpha1.ClusterServiceVersion, error)
 }

 func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {

@@ -93,19 +97,26 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
 			continue
 		}

-		if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
-			// we might be in an incoherent state, so let's check with live clients to make sure
-			realSubscriptions, err := s.listSubscriptions(ctx)
-			if err != nil {
-				return nil, fmt.Errorf("failed to list subscriptions: %w", err)
-			}
-			for _, realSubscription := range realSubscriptions.Items {
-				if realSubscription.Status.InstalledCSV == csv.Name {
-					// oops, live cluster state is coherent
-					return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
-				}
-			}
-		}
+		//if cachedSubscription, ok := csvSubscriptions[csv]; !ok || cachedSubscription == nil {
+		//	// we might be in an incoherent state, so let's check with live clients to make sure
+		//	realSubscriptions, err := s.listSubscriptions(ctx)
+		//	if err != nil {
+		//		return nil, fmt.Errorf("failed to list subscriptions: %w", err)
+		//	}
+		//	for _, realSubscription := range realSubscriptions.Items {
+		//		if realSubscription.Status.InstalledCSV == csv.Name {
+		//			// oops, live cluster state is coherent
+		//			return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s - found owning Subscription %s/%s", csv.Namespace, csv.Name, realSubscription.Namespace, realSubscription.Name)
+		//		}
+		//	}
+		//	realCsv, err := s.getCSV(ctx, csv.Namespace, csv.Name)
+		//	if err != nil {
+		//		return nil, fmt.Errorf("lister caches might be incoherent for CSV %s/%s: %w", csv.Namespace, csv.Name, err)
+		//	}
+		//	if realCsv.GetUID() != csv.GetUID() {
+		//		return nil, fmt.Errorf("lister caches incoherent for CSV %s/%s: differing UIDs (%s != %s)", csv.Namespace, csv.Name, csv.UID, realCsv.GetUID())
+		//	}
+		//}

 		if failForwardEnabled {
 			replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs))
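The commented-out block above sketches a UID check, which is also what the reviewer asks about in the thread further down: the apiserver assigns a fresh metadata.uid on every create, so a cached object whose UID differs from the live copy must be a stale view of a deleted-and-recreated resource. A hedged, self-contained sketch of that idea with core client-go (Namespace stands in for a CSV, and the helper name is invented):

```go
package cachecheck

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// isStaleView reports whether a cached object is a leftover view of a
// resource that was deleted and recreated: the name matches, the UID doesn't.
func isStaleView(ctx context.Context, client kubernetes.Interface, cached *corev1.Namespace) (bool, error) {
	live, err := client.CoreV1().Namespaces().Get(ctx, cached.Name, metav1.GetOptions{})
	if err != nil {
		// A NotFound error means the object is gone entirely and not (yet)
		// recreated; surface that to the caller rather than guessing.
		return false, err
	}
	// The apiserver never reuses a UID, so inequality proves recreation.
	return live.GetUID() != cached.GetUID(), nil
}
```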
pkg/controller/registry/resolver/step_resolver.go (1 addition, 1 deletion)

@@ -91,7 +91,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versioned.Interface
 }

 func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
-	subs, err := r.listSubscriptions(namespace)
+	subs, err := r.subLister.Subscriptions(namespace).List(labels.Everything())
 	if err != nil {
 		return nil, nil, nil, err
 	}
pkg/lib/queueinformer/queueinformer_operator.go (13 additions, 0 deletions)

@@ -54,6 +54,8 @@ type Operator interface {
 	// RunInformers starts the Operator's underlying Informers.
 	RunInformers(ctx context.Context)

+	ResyncInformers() error
+
 	// Run starts the Operator and its underlying Informers.
 	Run(ctx context.Context)
 }

@@ -197,6 +199,17 @@ func (o *operator) Run(ctx context.Context) {
 	})
 }

+func (o *operator) ResyncInformers() error {

Inline review thread on ResyncInformers:

Contributor: No client using LIST + WATCH should ever have to do this, FWIW.

Collaborator (author): Right, watch events are at-least-once, right? This mishmash of cached and live requests is biting us in the butt....

Collaborator (author): In case you're curious: the issue is we have a situation in which the namespace gets nuked and immediately recreated with the subscription, and we're ending up with unsat...

Contributor: Are you using UID checks to detect deletion + recreation? Do you care that it deleted & recreated, or no?

Collaborator (author): In the end the root cause was: the typed queues were of type any and we were enqueuing two different types of objects (string and ResourceEvent). Two semantically equal events come in and get synced concurrently because we break dedupe =P

+	o.mu.Lock()
+	defer o.mu.Unlock()
+	for _, informer := range o.informers {
+		if err := informer.GetStore().Resync(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (o *operator) start(ctx context.Context) error {
 	defer close(o.ready)
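The root cause described in the last comment of the thread above fits in a few lines. A minimal sketch, assuming only a client-go recent enough to ship the typed workqueue (which the nsResolveQueue change above already requires); resourceEvent is a hypothetical stand-in for the second item type mentioned in the thread, not OLM's actual type:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// resourceEvent is an invented, comparable struct; any second item type
// enqueued alongside plain string keys shows the same behavior.
type resourceEvent struct {
	kind string
	key  string
}

func main() {
	q := workqueue.NewTyped[any]()
	defer q.ShutDown()

	// Identical pending items are deduped: two Adds, one work item.
	q.Add("default/my-sub")
	q.Add("default/my-sub")
	fmt.Println(q.Len()) // 1

	// The same logical object enqueued as a different Go type is NOT
	// deduped: two workers can now sync it concurrently.
	q.Add(resourceEvent{kind: "Namespace", key: "default/my-sub"})
	fmt.Println(q.Len()) // 2
}
```

A typed queue dedupes pending items by Go equality, so a queue of any silently loses that protection the moment callers disagree on the item type; pinning the type parameter to a single key type restores the dedupe guarantee.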
test/e2e/e2e_test.go (1 addition, 1 deletion)

@@ -80,7 +80,7 @@

 func TestEndToEnd(t *testing.T) {
 	RegisterFailHandler(Fail)
-	SetDefaultEventuallyTimeout(1 * time.Minute)
+	SetDefaultEventuallyTimeout(5 * time.Minute)
 	SetDefaultEventuallyPollingInterval(1 * time.Second)
 	SetDefaultConsistentlyDuration(30 * time.Second)
 	SetDefaultConsistentlyPollingInterval(1 * time.Second)