Skip to content

Commit f62351b

Browse files
authored
OCPBUGS-35228: Fix desired before sync_worker's work is initialized (#1081)
* OCPBUGS-35228: Fix desired before sync_worker's work is initialized If the CVO pod gets restarted after a user sends a cluster upgrade request with `--to-image`, the desired should not ALWAYS be taken from `.Spec.DesiredUpdate`, e.g., when the targeting payload is blocked on some precondition check failure. The user request will be written to `cv.spec.desiredUpdate` where only the `image` field is set, i.e., the `Version` field is empty. If the CVO gets restarted, then 1. The desied [1] from `cv.spec.desiredUpdate` will be used to set `w.status.Actual` [2] and then applied to `cv.status.desired`. 2. When the next `optr.sync()` happens, the precondition check may become non-blocking [3] as the `Version` field is empty now. 3. CVO will then start an upgrade that should have been blocked. [1]. https://github.com/openshift/cluster-version-operator/blob/1995380b6d755c29b926b846a64ca0039002c2cf/pkg/cvo/cvo.go#L680 [2]. https://github.com/openshift/cluster-version-operator/blob/1995380b6d755c29b926b846a64ca0039002c2cf/pkg/cvo/sync_worker.go#L502 [3]. https://github.com/openshift/cluster-version-operator/blob/1995380b6d755c29b926b846a64ca0039002c2cf/pkg/payload/precondition/clusterversion/rollback.go#L57 * Fix unit tests * Add test to cover the case of stillInitializing == true * Better naming of variables and functions * Improve some wording and simplify unit tests
1 parent 830f2ed commit f62351b

File tree

4 files changed

+141
-7
lines changed

4 files changed

+141
-7
lines changed

pkg/cvo/cvo.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -677,16 +677,25 @@ func (optr *Operator) sync(ctx context.Context, key string) error {
677677
config := validation.ClearInvalidFields(original, errs)
678678

679679
// identify the desired next version
680-
desired, ok := findUpdateFromConfig(config, optr.getArchitecture())
681-
if ok {
682-
klog.V(2).Infof("Desired version from spec is %#v", desired)
680+
desired, found := findUpdateFromConfig(config, optr.getArchitecture())
681+
initialized := optr.configSync.Initialized()
682+
if found && initialized {
683+
klog.V(2).Infof("Desired version from spec is %#v after initialization", desired)
683684
} else {
685+
pendingDesired := desired
684686
currentVersion := optr.currentVersion()
685687
desired = configv1.Update{
686688
Version: currentVersion.Version,
687689
Image: currentVersion.Image,
688690
}
689-
klog.V(2).Infof("Desired version from operator is %#v", desired)
691+
if !initialized {
692+
klog.V(2).Infof("Desired version from operator is %#v with user's request to go to %#v. "+
693+
"We are currently initializing the worker and will evaluate the request later", desired, pendingDesired)
694+
// enqueue to trigger a reconciliation on ClusterVersion
695+
optr.queue.Add(optr.queueKey())
696+
} else {
697+
klog.V(2).Infof("Desired version from operator is %#v", desired)
698+
}
690699
}
691700

692701
// handle the case of a misconfigured CVO by doing nothing

pkg/cvo/cvo_scenarios_test.go

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,7 @@ func TestCVO_UpgradeUnverifiedPayload(t *testing.T) {
14021402
t.Fatal("not the correct error type")
14031403
}
14041404
worker := o.configSync.(*SyncWorker)
1405+
worker.initializedFunc = func() bool { return true }
14051406
retriever := worker.retriever.(*fakeDirectoryRetriever)
14061407
retriever.Set(PayloadInfo{}, payloadErr)
14071408

@@ -1657,7 +1658,7 @@ func TestCVO_ResetPayloadLoadStatus(t *testing.T) {
16571658
t.Fatal("not the correct error type")
16581659
}
16591660
worker := o.configSync.(*SyncWorker)
1660-
1661+
worker.initializedFunc = func() bool { return true }
16611662
// checked by SyncWorker.syncPayload
16621663
worker.payload = &payload.Update{Release: o.release}
16631664

@@ -1908,6 +1909,7 @@ func TestCVO_UpgradeFailedPayloadLoadWithCapsChanges(t *testing.T) {
19081909
t.Fatal("not the correct error type")
19091910
}
19101911
worker := o.configSync.(*SyncWorker)
1912+
worker.initializedFunc = func() bool { return true }
19111913
retriever := worker.retriever.(*fakeDirectoryRetriever)
19121914
retriever.Set(PayloadInfo{}, payloadErr)
19131915

@@ -2026,6 +2028,7 @@ func TestCVO_InitImplicitlyEnabledCaps(t *testing.T) {
20262028

20272029
defer shutdownFn()
20282030
worker := o.configSync.(*SyncWorker)
2031+
worker.initializedFunc = func() bool { return true }
20292032

20302033
go worker.Start(ctx, 1)
20312034

@@ -2192,6 +2195,7 @@ func TestCVO_UpgradeUnverifiedPayloadRetrieveOnce(t *testing.T) {
21922195
t.Fatal("not the correct error type")
21932196
}
21942197
worker := o.configSync.(*SyncWorker)
2198+
worker.initializedFunc = func() bool { return true }
21952199
retriever := worker.retriever.(*fakeDirectoryRetriever)
21962200
retriever.Set(PayloadInfo{}, payloadErr)
21972201

@@ -2480,6 +2484,7 @@ func TestCVO_UpgradePreconditionFailing(t *testing.T) {
24802484
defer shutdownFn()
24812485

24822486
worker := o.configSync.(*SyncWorker)
2487+
worker.initializedFunc = func() bool { return true }
24832488
worker.preconditions = []precondition.Precondition{&testPrecondition{SuccessAfter: 3}}
24842489

24852490
go worker.Start(ctx, 1)
@@ -2754,6 +2759,7 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) {
27542759
defer shutdownFn()
27552760

27562761
worker := o.configSync.(*SyncWorker)
2762+
worker.initializedFunc = func() bool { return true }
27572763
worker.preconditions = []precondition.Precondition{&testPreconditionAlwaysFail{PreConditionName: "PreCondition1"}, &testPreconditionAlwaysFail{PreConditionName: "PreCondition2"}}
27582764

27592765
go worker.Start(ctx, 1)
@@ -2856,6 +2862,107 @@ func TestCVO_UpgradePreconditionFailingAcceptedRisks(t *testing.T) {
28562862
})
28572863
}
28582864

2865+
func TestCVO_UpgradePayloadStillInitializing(t *testing.T) {
2866+
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest")
2867+
2868+
// Setup: an upgrade request from user to a new image and the operator at the same image as before
2869+
//
2870+
o.release.Image = "image/image:0"
2871+
o.release.Version = "1.0.0-abc"
2872+
desired := configv1.Release{Version: "1.0.1-abc", Image: "image/image:1"}
2873+
uid, _ := uuid.NewRandom()
2874+
clusterUID := configv1.ClusterID(uid.String())
2875+
cvs["version"] = &configv1.ClusterVersion{
2876+
ObjectMeta: metav1.ObjectMeta{
2877+
Name: "version",
2878+
ResourceVersion: "1",
2879+
Generation: 1,
2880+
},
2881+
Spec: configv1.ClusterVersionSpec{
2882+
ClusterID: clusterUID,
2883+
Channel: "fast",
2884+
DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image},
2885+
},
2886+
Status: configv1.ClusterVersionStatus{
2887+
// Prefers the image version over the operator's version (although in general they will remain in sync)
2888+
Desired: desired,
2889+
VersionHash: "DL-FFQ2Uem8=",
2890+
History: []configv1.UpdateHistory{
2891+
{State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime},
2892+
},
2893+
Conditions: []configv1.ClusterOperatorStatusCondition{
2894+
{Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"},
2895+
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
2896+
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
2897+
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"},
2898+
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
2899+
},
2900+
},
2901+
}
2902+
2903+
ctx, cancel := context.WithCancel(context.Background())
2904+
defer cancel()
2905+
2906+
defer shutdownFn()
2907+
2908+
worker := o.configSync.(*SyncWorker)
2909+
retriever := worker.retriever.(*fakeDirectoryRetriever)
2910+
retriever.Set(PayloadInfo{Directory: "testdata/payloadtest", Verified: true}, nil)
2911+
2912+
go worker.Start(ctx, 1)
2913+
2914+
// Step 1: Simulate a payload being retrieved while the sync worker is not initialized
2915+
// and ensure the desired version from the operator is taken from the operator and a reconciliation is enqueued
2916+
client.ClearActions()
2917+
err := o.sync(ctx, o.queueKey())
2918+
if err != nil {
2919+
t.Fatal(err)
2920+
}
2921+
actions := client.Actions()
2922+
if len(actions) != 2 {
2923+
t.Fatalf("%s", spew.Sdump(actions))
2924+
}
2925+
expectGet(t, actions[0], "clusterversions", "", "version")
2926+
expectUpdateStatus(t, actions[1], "clusterversions", "", &configv1.ClusterVersion{
2927+
ObjectMeta: metav1.ObjectMeta{
2928+
Name: "version",
2929+
ResourceVersion: "1",
2930+
Generation: 1,
2931+
},
2932+
Spec: configv1.ClusterVersionSpec{
2933+
ClusterID: clusterUID,
2934+
Channel: "fast",
2935+
DesiredUpdate: &configv1.Update{Version: desired.Version, Image: desired.Image},
2936+
},
2937+
Status: configv1.ClusterVersionStatus{
2938+
ObservedGeneration: 1,
2939+
// Prefers the operator's version
2940+
Desired: configv1.Release{Version: o.release.Version, Image: o.release.Image},
2941+
VersionHash: "DL-FFQ2Uem8=",
2942+
History: []configv1.UpdateHistory{
2943+
{State: configv1.CompletedUpdate, Image: "image/image:0", Version: "1.0.0-abc", Verified: true, StartedTime: defaultStartedTime, CompletionTime: &defaultCompletionTime},
2944+
},
2945+
Capabilities: configv1.ClusterVersionCapabilitiesStatus{
2946+
EnabledCapabilities: sortedCaps,
2947+
KnownCapabilities: sortedKnownCaps,
2948+
},
2949+
Conditions: []configv1.ClusterOperatorStatusCondition{
2950+
{Type: ImplicitlyEnabledCapabilities, Status: "False", Reason: "AsExpected", Message: "Capabilities match configured spec"},
2951+
{Type: configv1.OperatorAvailable, Status: configv1.ConditionTrue, Message: "Done applying 1.0.0-abc"},
2952+
{Type: ClusterStatusFailing, Status: configv1.ConditionFalse},
2953+
{Type: configv1.OperatorProgressing, Status: configv1.ConditionFalse, Message: "Cluster version is 1.0.0-abc"},
2954+
{Type: configv1.RetrievedUpdates, Status: configv1.ConditionFalse},
2955+
{Type: DesiredReleaseAccepted, Status: configv1.ConditionTrue, Reason: "PayloadLoaded",
2956+
Message: `Payload loaded version="1.0.0-abc" image="image/image:0" architecture="` + architecture + `"`},
2957+
},
2958+
},
2959+
})
2960+
if l := o.queue.Len(); l != 1 {
2961+
t.Errorf("expecting queue length is 1 but got %d", l)
2962+
}
2963+
2964+
}
2965+
28592966
func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
28602967
o, cvs, client, _, shutdownFn := setupCVOTest("testdata/payloadtest-2")
28612968

@@ -2900,6 +3007,7 @@ func TestCVO_UpgradeVerifiedPayload(t *testing.T) {
29003007
defer shutdownFn()
29013008

29023009
worker := o.configSync.(*SyncWorker)
3010+
worker.initializedFunc = func() bool { return true }
29033011
retriever := worker.retriever.(*fakeDirectoryRetriever)
29043012
retriever.Set(PayloadInfo{Directory: "testdata/payloadtest-2", Verified: true}, nil)
29053013

pkg/cvo/sync_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,13 @@ func newAction(gvk schema.GroupVersionKind, namespace, name string) action {
471471
}
472472

473473
type fakeSyncRecorder struct {
474-
Returns *SyncWorkerStatus
475-
Updates []configv1.Update
474+
Returns *SyncWorkerStatus
475+
Updates []configv1.Update
476+
initializedFunc func() bool
477+
}
478+
479+
func (r *fakeSyncRecorder) Initialized() bool {
480+
return r.initializedFunc == nil || r.initializedFunc()
476481
}
477482

478483
func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {

pkg/cvo/sync_worker.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type ConfigSyncWorker interface {
3535

3636
// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
3737
NotifyAboutManagedResourceActivity(msg string)
38+
// Initialized returns true if the worker has work to do already
39+
Initialized() bool
3840
}
3941

4042
// PayloadInfo returns details about the payload when it was retrieved.
@@ -189,6 +191,9 @@ type SyncWorker struct {
189191
// always be implicitly enabled.
190192
// This contributes to whether or not some manifests are included for reconciliation.
191193
alwaysEnableCapabilities []configv1.ClusterVersionCapability
194+
195+
// initializedFunc is only for the unit-test purpose
196+
initializedFunc func() bool
192197
}
193198

194199
// NewSyncWorker initializes a ConfigSyncWorker that will retrieve payloads to disk, apply them via builder
@@ -231,6 +236,13 @@ func (w *SyncWorker) StatusCh() <-chan SyncWorkerStatus {
231236
return w.report
232237
}
233238

239+
func (w *SyncWorker) Initialized() bool {
240+
if w.initializedFunc != nil {
241+
return w.initializedFunc()
242+
}
243+
return w.work != nil
244+
}
245+
234246
// NotifyAboutManagedResourceActivity informs the sync worker about activity for a managed resource.
235247
func (w *SyncWorker) NotifyAboutManagedResourceActivity(message string) {
236248
select {

0 commit comments

Comments
 (0)