@@ -607,11 +607,6 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) {
607
607
return
608
608
}
609
609
610
- if len (operatorGroup .Status .Namespaces ) == 1 && operatorGroup .Status .Namespaces [0 ] == operatorGroup .GetNamespace () {
611
- logger .Debug ("skipping copy for OwnNamespace operatorgroup" )
612
- return
613
- }
614
-
615
610
logger .WithFields (logrus.Fields {
616
611
"targetNamespaces" : strings .Join (operatorGroup .Status .Namespaces , "," ),
617
612
}).Debug ("copying csv to targets" )
@@ -773,12 +768,6 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
773
768
* out = * updated
774
769
}
775
770
776
- // Check if the current CSV is being replaced, return with replacing status if so
777
- if err := a .checkReplacementsAndUpdateStatus (out ); err != nil {
778
- logger .WithError (err ).Info ("replacement check" )
779
- return
780
- }
781
-
782
771
// Verify CSV operatorgroup (and update annotations if needed)
783
772
operatorGroup , err := a .operatorGroupForCSV (out , logger )
784
773
if operatorGroup == nil {
@@ -889,6 +878,15 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
889
878
}
890
879
out .SetRequirementStatus (statuses )
891
880
881
+ // Check if we need to requeue the previous
882
+ if prev := a .isReplacing (out ); prev != nil {
883
+ if prev .Status .Phase == v1alpha1 .CSVPhaseSucceeded {
884
+ if err := a .csvQueueSet .Requeue (prev .GetName (), prev .GetNamespace ()); err != nil {
885
+ a .Log .WithError (err ).Warn ("error requeueing previous" )
886
+ }
887
+ }
888
+ }
889
+
892
890
if ! met {
893
891
logger .Info ("requirements were not met" )
894
892
out .SetPhaseWithEventIfChanged (v1alpha1 .CSVPhasePending , v1alpha1 .CSVReasonRequirementsNotMet , "one or more requirements couldn't be found" , now , a .recorder )
@@ -912,6 +910,13 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
912
910
return
913
911
}
914
912
913
+ // Check if we're not ready to install part of the replacement chain yet
914
+ if prev := a .isReplacing (out ); prev != nil {
915
+ if prev .Status .Phase != v1alpha1 .CSVPhaseReplacing {
916
+ return
917
+ }
918
+ }
919
+
915
920
logger .Info ("scheduling ClusterServiceVersion for install" )
916
921
out .SetPhaseWithEvent (v1alpha1 .CSVPhaseInstallReady , v1alpha1 .CSVReasonRequirementsMet , "all requirements found, attempting install" , now , a .recorder )
917
922
case v1alpha1 .CSVPhaseInstallReady :
@@ -955,6 +960,12 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
955
960
}
956
961
957
962
case v1alpha1 .CSVPhaseSucceeded :
963
+ // Check if the current CSV is being replaced, return with replacing status if so
964
+ if err := a .checkReplacementsAndUpdateStatus (out ); err != nil {
965
+ logger .WithError (err ).Info ("replacement check" )
966
+ return
967
+ }
968
+
958
969
installer , strategy , _ := a .parseStrategiesAndUpdateStatus (out )
959
970
if strategy == nil {
960
971
return
@@ -1073,6 +1084,14 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
1073
1084
return
1074
1085
}
1075
1086
1087
+ // If we are a leaf, we should requeue the replacement for processing
1088
+ if next := a .isBeingReplaced (out , a .csvSet (out .GetNamespace (), v1alpha1 .CSVPhaseAny )); next != nil {
1089
+ err := a .csvQueueSet .Requeue (next .GetName (), next .GetNamespace ())
1090
+ if err != nil {
1091
+ a .Log .WithError (err ).Warn ("error requeuing replacement" )
1092
+ }
1093
+ }
1094
+
1076
1095
// If we can find a newer version that's successfully installed, we're safe to mark all intermediates
1077
1096
for _ , csv := range a .findIntermediatesForDeletion (out ) {
1078
1097
// we only mark them in this step, in case some get deleted but others fail and break the replacement chain
@@ -1093,6 +1112,10 @@ func (a *Operator) transitionCSVState(in v1alpha1.ClusterServiceVersion) (out *v
1093
1112
}
1094
1113
case v1alpha1 .CSVPhaseDeleting :
1095
1114
var immediate int64 = 0
1115
+
1116
+ if err := a .csvQueueSet .Remove (out .GetNamespace (), out .GetName ()); err != nil {
1117
+ logger .WithError (err ).Debug ("error removing from queue" )
1118
+ }
1096
1119
syncError = a .client .OperatorsV1alpha1 ().ClusterServiceVersions (out .GetNamespace ()).Delete (out .GetName (), & metav1.DeleteOptions {GracePeriodSeconds : & immediate })
1097
1120
if syncError != nil {
1098
1121
logger .Debugf ("unable to get delete csv marked for deletion: %s" , syncError .Error ())
@@ -1158,7 +1181,7 @@ func (a *Operator) checkReplacementsAndUpdateStatus(csv *v1alpha1.ClusterService
1158
1181
}
1159
1182
if replacement := a .isBeingReplaced (csv , a .csvSet (csv .GetNamespace (), v1alpha1 .CSVPhaseAny )); replacement != nil {
1160
1183
a .Log .Infof ("newer ClusterServiceVersion replacing %s, no-op" , csv .SelfLink )
1161
- msg := fmt .Sprintf ("being replaced by csv: %s" , replacement .SelfLink )
1184
+ msg := fmt .Sprintf ("being replaced by csv: %s" , replacement .GetName () )
1162
1185
csv .SetPhaseWithEvent (v1alpha1 .CSVPhaseReplacing , v1alpha1 .CSVReasonBeingReplaced , msg , timeNow (), a .recorder )
1163
1186
metrics .CSVUpgradeCount .Inc ()
1164
1187
@@ -1237,10 +1260,15 @@ func (a *Operator) parseStrategiesAndUpdateStatus(csv *v1alpha1.ClusterServiceVe
1237
1260
return installer , strategy , previousStrategy
1238
1261
}
1239
1262
1240
- func (a * Operator ) crdOwnerConflicts (in * v1alpha1.ClusterServiceVersion , csvs map [string ]* v1alpha1.ClusterServiceVersion ) error {
1241
- for _ , crd := range in .Spec .CustomResourceDefinitions .Owned {
1242
- for name , csv := range csvs {
1243
- if name != in .GetName () && in .Spec .Replaces != name && csv .OwnsCRD (crd .Name ) {
1263
+ func (a * Operator ) crdOwnerConflicts (in * v1alpha1.ClusterServiceVersion , csvsInNamespace map [string ]* v1alpha1.ClusterServiceVersion ) error {
1264
+ csvsInChain := a .getReplacementChain (in , csvsInNamespace )
1265
+ // find csvs in the namespace that are not part of the replacement chain
1266
+ for name , csv := range csvsInNamespace {
1267
+ if _ , ok := csvsInChain [name ]; ok {
1268
+ continue
1269
+ }
1270
+ for _ , crd := range in .Spec .CustomResourceDefinitions .Owned {
1271
+ if name != in .GetName () && csv .OwnsCRD (crd .Name ) {
1244
1272
return ErrCRDOwnerConflict
1245
1273
}
1246
1274
}
@@ -1249,6 +1277,53 @@ func (a *Operator) crdOwnerConflicts(in *v1alpha1.ClusterServiceVersion, csvs ma
1249
1277
return nil
1250
1278
}
1251
1279
1280
+ func (a * Operator ) getReplacementChain (in * v1alpha1.ClusterServiceVersion , csvsInNamespace map [string ]* v1alpha1.ClusterServiceVersion ) map [string ]struct {} {
1281
+ current := in .GetName ()
1282
+ csvsInChain := map [string ]struct {}{
1283
+ current : {},
1284
+ }
1285
+
1286
+ replacement := func (csvName string ) * string {
1287
+ for _ , csv := range csvsInNamespace {
1288
+ if csv .Spec .Replaces == csvName {
1289
+ name := csv .GetName ()
1290
+ return & name
1291
+ }
1292
+ }
1293
+ return nil
1294
+ }
1295
+
1296
+ replaces := func (replaces string ) * string {
1297
+ for _ , csv := range csvsInNamespace {
1298
+ name := csv .GetName ()
1299
+ if name == replaces {
1300
+ rep := csv .Spec .Replaces
1301
+ return & rep
1302
+ }
1303
+ }
1304
+ return nil
1305
+ }
1306
+
1307
+ next := replacement (current )
1308
+ for next != nil {
1309
+ csvsInChain [* next ] = struct {}{}
1310
+ current = * next
1311
+ next = replacement (current )
1312
+ }
1313
+
1314
+ current = in .Spec .Replaces
1315
+ prev := replaces (current )
1316
+ if prev != nil {
1317
+ csvsInChain [current ] = struct {}{}
1318
+ }
1319
+ for prev != nil && * prev != "" {
1320
+ current = * prev
1321
+ csvsInChain [current ] = struct {}{}
1322
+ prev = replaces (current )
1323
+ }
1324
+ return csvsInChain
1325
+ }
1326
+
1252
1327
func (a * Operator ) apiServiceOwnerConflicts (csv * v1alpha1.ClusterServiceVersion ) error {
1253
1328
// Get replacing CSV if exists
1254
1329
replacing , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().ClusterServiceVersions (csv .GetNamespace ()).Get (csv .Spec .Replaces )
@@ -1297,7 +1372,9 @@ func (a *Operator) isReplacing(in *v1alpha1.ClusterServiceVersion) *v1alpha1.Clu
1297
1372
if in .Spec .Replaces == "" {
1298
1373
return nil
1299
1374
}
1300
- previous , err := a .lister .OperatorsV1alpha1 ().ClusterServiceVersionLister ().ClusterServiceVersions (in .GetNamespace ()).Get (in .Spec .Replaces )
1375
+
1376
+ // using the client instead of a lister; missing an object because of a cache sync can cause upgrades to fail
1377
+ previous , err := a .client .OperatorsV1alpha1 ().ClusterServiceVersions (in .GetNamespace ()).Get (in .Spec .Replaces , metav1.GetOptions {})
1301
1378
if err != nil {
1302
1379
a .Log .WithField ("replacing" , in .Spec .Replaces ).WithError (err ).Debugf ("unable to get previous csv" )
1303
1380
return nil
0 commit comments