Skip to content

Commit 3682f0c

Browse files
committed
fix(catalog): move write of installplan state from installplan execution
into namespace resolution - this prevents races where installplans are created twice. Also gates namespace resolution on operatorgroups (don't attempt if there isn't an operatorgroup in the namespace)
1 parent 82f9151 commit 3682f0c

File tree

1 file changed

+98
-41
lines changed

1 file changed

+98
-41
lines changed

pkg/controller/operators/catalog/operator.go

Lines changed: 98 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
211211
Lister: op.lister,
212212
}
213213

214+
// Namespace sync for resolving subscriptions
214215
namespaceInformer := informers.NewSharedInformerFactory(op.OpClient.KubernetesInterface(), wakeupInterval).Core().V1().Namespaces()
215216
resolvingNamespaceQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
216217
namespaceQueueInformer := queueinformer.NewInformer(
@@ -227,9 +228,25 @@ func NewOperator(kubeconfigPath string, logger *logrus.Logger, wakeupInterval ti
227228
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
228229
op.namespaceResolveQueue = resolvingNamespaceQueue
229230

231+
// Create an informer/lister for operatorgroups
232+
for _, namespace := range watchedNamespaces {
233+
sharedInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(crClient, wakeupInterval, externalversions.WithNamespace(namespace))
234+
operatorGroupInformer := sharedInformerFactory.Operators().V1alpha2().OperatorGroups()
235+
op.lister.OperatorsV1alpha2().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
236+
237+
// Register queue and QueueInformer
238+
queueName := fmt.Sprintf("%s/operatorgroups", namespace)
239+
operatorGroupQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName)
240+
operatorGroupQueueInformer := queueinformer.NewInformer(operatorGroupQueue, operatorGroupInformer.Informer(), NoopHandler, nil, queueName, metrics.NewMetricsNil(), logger)
241+
op.RegisterQueueInformer(operatorGroupQueueInformer)
242+
}
230243
return op, nil
231244
}
232245

246+
func NoopHandler(obj interface{}) error {
247+
return nil
248+
}
249+
233250
func (o *Operator) syncObject(obj interface{}) (syncError error) {
234251
// Assert as runtime.Object
235252
runtimeObj, ok := obj.(runtime.Object)
@@ -475,7 +492,7 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) {
475492
}
476493

477494
// Trigger a resolve, will pick up any subscriptions that depend on the catalog
478-
o.namespaceResolveQueue.AddRateLimited(out.GetNamespace())
495+
o.resolveNamespace(out.GetNamespace())
479496

480497
return nil
481498
}
@@ -516,6 +533,12 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
516533
"namespace": namespace,
517534
})
518535

536+
// ignore error, we just want to fail early if we know there's no operatorgroup
537+
ogs, _ := o.lister.OperatorsV1alpha2().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything())
538+
if len(ogs) == 0 {
539+
return fmt.Errorf("no operatorgroups in namespace")
540+
}
541+
519542
// get the set of sources that should be used for resolution and best-effort get their connections working
520543
logger.Debug("resolving sources")
521544
resolverSources := o.ensureResolverSources(logger, namespace)
@@ -530,13 +553,29 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
530553

531554
shouldUpdate := false
532555
for _, sub := range subs {
533-
if !o.nothingToUpdate(logger, sub) {
534-
shouldUpdate = true
535-
break
556+
logger := logger.WithFields(logrus.Fields{
557+
"sub": sub.GetName(),
558+
"source": sub.Spec.CatalogSource,
559+
"pkg": sub.Spec.Package,
560+
"channel": sub.Spec.Channel,
561+
})
562+
563+
// ensure the installplan reference is correct
564+
sub, err := o.ensureSubscriptionInstallPlanState(logger, sub)
565+
if err != nil {
566+
return err
536567
}
568+
569+
// record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
570+
sub, err = o.ensureSubscriptionCSVState(logger, sub)
571+
if err != nil {
572+
return err
573+
}
574+
shouldUpdate = shouldUpdate || !o.nothingToUpdate(logger, sub)
537575
}
538576
if !shouldUpdate {
539577
logger.Debug("all subscriptions up to date")
578+
return nil
540579
}
541580

542581
logger.Debug("resolving subscriptions in namespace")
@@ -562,7 +601,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
562601
return err
563602
}
564603

565-
if err := o.ensureSubscriptionInstallPlanState(namespace, subs, installplanReference); err != nil {
604+
if err := o.updateSubscriptionSetInstallPlanState(namespace, subs, installplanReference); err != nil {
566605
logger.WithError(err).Debug("error ensuring subscription installplan state")
567606
return err
568607
}
@@ -576,30 +615,15 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
576615
return fmt.Errorf("casting Subscription failed")
577616
}
578617

579-
logger := o.Log.WithFields(logrus.Fields{
580-
"sub": sub.GetName(),
581-
"namespace": sub.GetNamespace(),
582-
"source": sub.Spec.CatalogSource,
583-
"pkg": sub.Spec.Package,
584-
"channel": sub.Spec.Channel,
585-
})
586-
587-
// record the current state of the desired corresponding CSV in the status. no-op if we don't know the csv yet.
588-
sub, err := o.ensureSubscriptionCSVState(logger, sub)
589-
if err != nil {
590-
return err
591-
}
592-
593-
// return early if the subscription is up to date
594-
if o.nothingToUpdate(logger, sub) {
595-
return nil
596-
}
597-
598-
o.namespaceResolveQueue.AddRateLimited(sub.GetNamespace())
618+
o.resolveNamespace(sub.GetNamespace())
599619

600620
return nil
601621
}
602622

623+
func (o *Operator) resolveNamespace(namespace string) {
624+
o.namespaceResolveQueue.AddRateLimited(namespace)
625+
}
626+
603627
func (o *Operator) ensureResolverSources(logger *logrus.Entry, namespace string) map[resolver.CatalogKey]registryclient.Interface {
604628
// TODO: record connection status onto an object
605629
resolverSources := make(map[resolver.CatalogKey]registryclient.Interface, 0)
@@ -659,6 +683,44 @@ func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscript
659683
return false
660684
}
661685

686+
func (o *Operator) ensureSubscriptionInstallPlanState(logger *logrus.Entry, sub *v1alpha1.Subscription) (*v1alpha1.Subscription, error) {
687+
if sub.Status.Install != nil {
688+
return sub, nil
689+
}
690+
691+
logger.Debug("checking for existing installplan")
692+
693+
// check if there's an installplan that created this subscription (only if it doesn't have a reference yet)
694+
// this indicates it was newly resolved by another operator, and we should reference that installplan in the status
695+
ips, err := o.client.OperatorsV1alpha1().InstallPlans(sub.GetNamespace()).List(metav1.ListOptions{})
696+
if err != nil {
697+
logger.WithError(err).Debug("couldn't get installplans")
698+
// if we can't list, just continue processing
699+
return sub, nil
700+
}
701+
702+
out := sub.DeepCopy()
703+
704+
for _, ip := range ips.Items {
705+
for _, step := range ip.Status.Plan {
706+
// TODO: is this enough? should we check equality of pkg/channel?
707+
if step != nil && step.Resource.Kind == v1alpha1.SubscriptionKind && step.Resource.Name == sub.GetName() {
708+
logger.WithField("installplan", ip.GetName()).Debug("found subscription in steps of existing installplan")
709+
out.Status.Install = o.referenceForInstallPlan(&ip)
710+
out.Status.State = v1alpha1.SubscriptionStateUpgradePending
711+
if updated, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(out); err != nil {
712+
return nil, err
713+
} else {
714+
return updated, nil
715+
}
716+
}
717+
}
718+
}
719+
logger.Debug("did not find subscription in steps of existing installplan")
720+
721+
return sub, nil
722+
}
723+
662724
func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha1.Subscription) (*v1alpha1.Subscription, error) {
663725
if sub.Status.CurrentCSV == "" {
664726
return sub, nil
@@ -690,7 +752,7 @@ func (o *Operator) ensureSubscriptionCSVState(logger *logrus.Entry, sub *v1alpha
690752
return sub, nil
691753
}
692754

693-
func (o *Operator) ensureSubscriptionInstallPlanState(namespace string, subs []*v1alpha1.Subscription, installPlanRef *v1alpha1.InstallPlanReference) error {
755+
func (o *Operator) updateSubscriptionSetInstallPlanState(namespace string, subs []*v1alpha1.Subscription, installPlanRef *v1alpha1.InstallPlanReference) error {
694756
// TODO: parallel, sync waitgroup
695757
for _, sub := range subs {
696758
sub.Status.Install = installPlanRef
@@ -752,13 +814,17 @@ func (o *Operator) createInstallPlan(namespace string, subs []*v1alpha1.Subscrip
752814
if err != nil {
753815
return nil, err
754816
}
817+
return o.referenceForInstallPlan(res), nil
818+
819+
}
820+
821+
func (o *Operator) referenceForInstallPlan(ip *v1alpha1.InstallPlan) *v1alpha1.InstallPlanReference {
755822
return &v1alpha1.InstallPlanReference{
756-
UID: res.GetUID(),
757-
Name: res.GetName(),
823+
UID: ip.GetUID(),
824+
Name: ip.GetName(),
758825
APIVersion: v1alpha1.SchemeGroupVersion.String(),
759826
Kind: v1alpha1.InstallPlanKind,
760-
}, nil
761-
827+
}
762828
}
763829

764830
func (o *Operator) requeueSubscription(name, namespace string) {
@@ -802,7 +868,7 @@ func (o *Operator) syncInstallPlans(obj interface{}) (syncError error) {
802868
// notify subscription loop of installplan changes
803869
if ownerutil.IsOwnedByKind(outInstallPlan, v1alpha1.SubscriptionKind) {
804870
oref := ownerutil.GetOwnerByKind(outInstallPlan, v1alpha1.SubscriptionKind)
805-
logger.Info("requeuing installplan owning subscription")
871+
logger.WithField("owner", oref).Debug("requeueing installplan owner")
806872
o.requeueSubscription(oref.Name, outInstallPlan.GetNamespace())
807873
}
808874

@@ -953,7 +1019,7 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
9531019

9541020
// Attempt to create the Subscription
9551021
sub.SetNamespace(namespace)
956-
created, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Create(&sub)
1022+
_, err = o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).Create(&sub)
9571023
if k8serrors.IsAlreadyExists(err) {
9581024
// If it already existed, mark the step as Present.
9591025
plan.Status.Plan[i].Status = v1alpha1.StepStatusPresent
@@ -962,15 +1028,6 @@ func (o *Operator) ExecutePlan(plan *v1alpha1.InstallPlan) error {
9621028
} else {
9631029
// If no error occurred, mark the step as Created.
9641030
plan.Status.Plan[i].Status = v1alpha1.StepStatusCreated
965-
created.Status.Install = &v1alpha1.InstallPlanReference{
966-
UID: plan.GetUID(),
967-
Name: plan.GetName(),
968-
APIVersion: v1alpha1.SchemeGroupVersion.String(),
969-
Kind: v1alpha1.InstallPlanKind,
970-
}
971-
if _, err := o.client.OperatorsV1alpha1().Subscriptions(sub.GetNamespace()).UpdateStatus(created); err != nil {
972-
o.Log.WithError(err).Warn("couldn't set installplan reference on created subscription")
973-
}
9741031
}
9751032
case secretKind:
9761033
// TODO: this will confuse bundle users that include secrets in their bundles - this only handles pull secrets

0 commit comments

Comments
 (0)