Skip to content

Commit 2e028b2

Browse files
committed
fix(olm): race when a subscription updates many versions
When a subscription creates multiple CSVs to update through, it was possible, depending on which ones hit our cache first, to upgrade from an intermediate version before we had upgraded to it. For example, given the chain A -> B -> C, we could see the B -> C replacement and perform it first. This would leave A and C with no replacement chain and cause C to go into an OwnerConflict state. This is fixed by only upgrading after an operator has reached the Succeeded phase.
1 parent 4b339b8 commit 2e028b2

File tree

5 files changed

+368
-14
lines changed

5 files changed

+368
-14
lines changed

pkg/controller/operators/olm/operator.go

Lines changed: 93 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -773,12 +773,6 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
773773
*out = *updated
774774
}
775775

776-
// Check if the current CSV is being replaced, return with replacing status if so
777-
if err := a.checkReplacementsAndUpdateStatus(out); err != nil {
778-
logger.WithError(err).Info("replacement check")
779-
return
780-
}
781-
782776
// Verify CSV operatorgroup (and update annotations if needed)
783777
operatorGroup, err := a.operatorGroupForCSV(out, logger)
784778
if operatorGroup == nil {
@@ -889,6 +883,15 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
889883
}
890884
out.SetRequirementStatus(statuses)
891885

886+
// Check if we need to requeue the previous
887+
if prev := a.isReplacing(out); prev != nil {
888+
if prev.Status.Phase == v1alpha1.CSVPhaseSucceeded {
889+
if err := a.csvQueueSet.Requeue(prev.GetName(), prev.GetNamespace()); err != nil {
890+
a.Log.WithError(err).Warn("error requeueing previous")
891+
}
892+
}
893+
}
894+
892895
if !met {
893896
logger.Info("requirements were not met")
894897
out.SetPhaseWithEventIfChanged(v1alpha1.CSVPhasePending, v1alpha1.CSVReasonRequirementsNotMet, "one or more requirements couldn't be found", now, a.recorder)
@@ -912,6 +915,13 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
912915
return
913916
}
914917

918+
// Check if we're not ready to install part of the replacement chain yet
919+
if prev := a.isReplacing(out); prev != nil {
920+
if prev.Status.Phase != v1alpha1.CSVPhaseReplacing {
921+
return
922+
}
923+
}
924+
915925
logger.Info("scheduling ClusterServiceVersion for install")
916926
out.SetPhaseWithEvent(v1alpha1.CSVPhaseInstallReady, v1alpha1.CSVReasonRequirementsMet, "all requirements found, attempting install", now, a.recorder)
917927
case v1alpha1.CSVPhaseInstallReady:
@@ -955,6 +965,12 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
955965
}
956966

957967
case v1alpha1.CSVPhaseSucceeded:
968+
// Check if the current CSV is being replaced, return with replacing status if so
969+
if err := a.checkReplacementsAndUpdateStatus(out); err != nil {
970+
logger.WithError(err).Info("replacement check")
971+
return
972+
}
973+
958974
installer, strategy, _ := a.parseStrategiesAndUpdateStatus(out)
959975
if strategy == nil {
960976
return
@@ -1073,6 +1089,14 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
10731089
return
10741090
}
10751091

1092+
// If we are a leaf, we should requeue the replacement for processing
1093+
if next := a.isBeingReplaced(out, a.csvSet(out.GetNamespace(), v1alpha1.CSVPhaseAny)); next != nil {
1094+
err := a.csvQueueSet.Requeue(next.GetName(), next.GetNamespace())
1095+
if err != nil {
1096+
a.Log.WithError(err).Warn("error requeuing replacement")
1097+
}
1098+
}
1099+
10761100
// If we can find a newer version that's successfully installed, we're safe to mark all intermediates
10771101
for _, csv := range a.findIntermediatesForDeletion(out) {
10781102
// we only mark them in this step, in case some get deleted but others fail and break the replacement chain
@@ -1093,6 +1117,10 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
10931117
}
10941118
case v1alpha1.CSVPhaseDeleting:
10951119
var immediate int64 = 0
1120+
1121+
if err := a.csvQueueSet.Remove(out.GetNamespace(), out.GetName()); err != nil {
1122+
logger.WithError(err).Debug("error removing from queue")
1123+
}
10961124
syncError = a.client.OperatorsV1alpha1().ClusterServiceVersions(out.GetNamespace()).Delete(out.GetName(), &metav1.DeleteOptions{GracePeriodSeconds: &immediate})
10971125
if syncError != nil {
10981126
logger.Debugf("unable to get delete csv marked for deletion: %s", syncError.Error())
@@ -1158,7 +1186,7 @@ func (a *Operator) checkReplacementsAndUpdateStatus(csv *v1alpha1.ClusterService
11581186
}
11591187
if replacement := a.isBeingReplaced(csv, a.csvSet(csv.GetNamespace(), v1alpha1.CSVPhaseAny)); replacement != nil {
11601188
a.Log.Infof("newer ClusterServiceVersion replacing %s, no-op", csv.SelfLink)
1161-
msg := fmt.Sprintf("being replaced by csv: %s", replacement.SelfLink)
1189+
msg := fmt.Sprintf("being replaced by csv: %s", replacement.GetName())
11621190
csv.SetPhaseWithEvent(v1alpha1.CSVPhaseReplacing, v1alpha1.CSVReasonBeingReplaced, msg, timeNow(), a.recorder)
11631191
metrics.CSVUpgradeCount.Inc()
11641192

@@ -1237,10 +1265,15 @@ func (a *Operator) parseStrategiesAndUpdateStatus(csv *v1alpha1.ClusterServiceVe
12371265
return installer, strategy, previousStrategy
12381266
}
12391267

1240-
func (a *Operator) crdOwnerConflicts(in *v1alpha1.ClusterServiceVersion, csvs map[string]*v1alpha1.ClusterServiceVersion) error {
1241-
for _, crd := range in.Spec.CustomResourceDefinitions.Owned {
1242-
for name, csv := range csvs {
1243-
if name != in.GetName() && in.Spec.Replaces != name && csv.OwnsCRD(crd.Name) {
1268+
func (a *Operator) crdOwnerConflicts(in *v1alpha1.ClusterServiceVersion, csvsInNamespace map[string]*v1alpha1.ClusterServiceVersion) error {
1269+
csvsInChain := a.getReplacementChain(in, csvsInNamespace)
1270+
// find csvs in the namespace that are not part of the replacement chain
1271+
for name, csv := range csvsInNamespace {
1272+
if _, ok := csvsInChain[name]; ok {
1273+
continue
1274+
}
1275+
for _, crd := range in.Spec.CustomResourceDefinitions.Owned {
1276+
if name != in.GetName() && csv.OwnsCRD(crd.Name) {
12441277
return ErrCRDOwnerConflict
12451278
}
12461279
}
@@ -1249,6 +1282,53 @@ func (a *Operator) crdOwnerConflicts(in *v1alpha1.ClusterServiceVersion, csvs ma
12491282
return nil
12501283
}
12511284

1285+
func (a *Operator) getReplacementChain(in *v1alpha1.ClusterServiceVersion, csvsInNamespace map[string]*v1alpha1.ClusterServiceVersion) map[string]struct{} {
1286+
current := in.GetName()
1287+
csvsInChain := map[string]struct{}{
1288+
current: {},
1289+
}
1290+
1291+
replacement := func(csvName string) *string {
1292+
for _, csv := range csvsInNamespace {
1293+
if csv.Spec.Replaces == csvName {
1294+
name := csv.GetName()
1295+
return &name
1296+
}
1297+
}
1298+
return nil
1299+
}
1300+
1301+
replaces := func(replaces string) *string {
1302+
for _, csv := range csvsInNamespace {
1303+
name := csv.GetName()
1304+
if name == replaces {
1305+
rep := csv.Spec.Replaces
1306+
return &rep
1307+
}
1308+
}
1309+
return nil
1310+
}
1311+
1312+
next := replacement(current)
1313+
for next != nil {
1314+
csvsInChain[*next] = struct{}{}
1315+
current = *next
1316+
next = replacement(current)
1317+
}
1318+
1319+
current = in.Spec.Replaces
1320+
prev := replaces(current)
1321+
if prev != nil {
1322+
csvsInChain[current] = struct{}{}
1323+
}
1324+
for prev != nil && *prev != "" {
1325+
current = *prev
1326+
csvsInChain[current] = struct{}{}
1327+
prev = replaces(current)
1328+
}
1329+
return csvsInChain
1330+
}
1331+
12521332
func (a *Operator) apiServiceOwnerConflicts(csv *v1alpha1.ClusterServiceVersion) error {
12531333
// Get replacing CSV if exists
12541334
replacing, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(csv.GetNamespace()).Get(csv.Spec.Replaces)
@@ -1297,6 +1377,8 @@ func (a *Operator) isReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.Clu
12971377
if in.Spec.Replaces == "" {
12981378
return nil
12991379
}
1380+
1381+
// using the client instead of a lister; missing an object because of a cache sync can cause upgrades to fail
13001382
previous, err := a.lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(in.GetNamespace()).Get(in.Spec.Replaces)
13011383
if err != nil {
13021384
a.Log.WithField("replacing", in.Spec.Replaces).WithError(err).Debugf("unable to get previous csv")

0 commit comments

Comments
 (0)