Skip to content

Commit 0fc0059

Browse files
committed
Fix "live get after write" in static pod installer controller.
The static pod installer controller builds an apply configuration for the status of the nodes it manages based on potentially-stale state from an informer cache. The controller is written to assume that it has observed the effect of its own previous writes. If the cache is stale, this assumption can be violated which results in unpredictable installation decisions. A mechanism was recently introduced requiring the installer controller to wait for its lister to catch up to the latest version after performing a write. This mechanism did not work because it depended on keeping state across calls to the Sync method, and because Sync has a value receiver, field writes were not visible on subsequent calls.
1 parent 0f76e23 commit 0fc0059

File tree

2 files changed

+136
-6
lines changed

2 files changed

+136
-6
lines changed

pkg/operator/staticpod/controller/installer/installer_controller.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ const (
5353
//go:embed manifests/installer-pod.yaml
5454
var podTemplate []byte
5555

56+
type OperatorClient interface {
57+
GetStaticPodOperatorState() (spec *operatorv1.StaticPodOperatorSpec, status *operatorv1.StaticPodOperatorStatus, resourceVersion string, err error)
58+
GetStaticPodOperatorStateWithQuorum(ctx context.Context) (spec *operatorv1.StaticPodOperatorSpec, status *operatorv1.StaticPodOperatorStatus, resourceVersion string, err error)
59+
ApplyStaticPodOperatorStatus(ctx context.Context, fieldManager string, applyConfiguration *applyoperatorv1.StaticPodOperatorStatusApplyConfiguration) (err error)
60+
}
61+
62+
var _ OperatorClient = v1helpers.StaticPodOperatorClient(nil)
63+
5664
// InstallerController is a controller that watches the currentRevision and targetRevision fields for each node and spawn
5765
// installer pods to update the static pods on the master nodes.
5866
type InstallerController struct {
@@ -81,7 +89,7 @@ type InstallerController struct {
8189
certSecrets []UnrevisionedResource
8290
certDir string
8391

84-
operatorClient v1helpers.StaticPodOperatorClient
92+
operatorClient OperatorClient
8593

8694
configMapsGetter corev1client.ConfigMapsGetter
8795
secretsGetter corev1client.SecretsGetter
@@ -92,7 +100,9 @@ type InstallerController struct {
92100
// installerPodImageFn returns the image name for the installer pod
93101
installerPodImageFn func() string
94102
// ownerRefsFn sets the ownerrefs on the pruner pod
95-
ownerRefsFn func(ctx context.Context, revision int32) ([]metav1.OwnerReference, error)
103+
ownerRefsFn func(ctx context.Context, revision int32) ([]metav1.OwnerReference, error)
104+
ensureRequiredResourcesExistFn func(ctx context.Context, revisionNumber int32) error
105+
manageInstallationPodsFn func(ctx context.Context, operatorSpec *operatorv1.StaticPodOperatorSpec, originalOperatorStatus *operatorv1.StaticPodOperatorStatus) (bool, time.Duration, *operatorv1.NodeStatus, func(), error)
96106

97107
installerPodMutationFns []InstallerPodMutationFunc
98108

@@ -200,6 +210,8 @@ func NewInstallerController(
200210
}
201211

202212
c.ownerRefsFn = c.setOwnerRefs
213+
c.ensureRequiredResourcesExistFn = c.ensureRequiredResourcesExist
214+
c.manageInstallationPodsFn = c.manageInstallationPods
203215
c.factory = factory.New().
204216
WithInformers(
205217
operatorClient.Informer(),
@@ -1111,7 +1123,7 @@ func (c InstallerController) ensureRequiredResourcesExist(ctx context.Context, r
11111123
return fmt.Errorf("missing required resources: %v", aggregatedErr)
11121124
}
11131125

1114-
func (c InstallerController) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
1126+
func (c *InstallerController) Sync(ctx context.Context, syncCtx factory.SyncContext) error {
11151127
operatorSpec, originalOperatorStatus, operatorResourceVersion, err := c.operatorClient.GetStaticPodOperatorState()
11161128
if err != nil {
11171129
return err
@@ -1123,7 +1135,7 @@ func (c InstallerController) Sync(ctx context.Context, syncCtx factory.SyncConte
11231135
// Apply response.
11241136
if c.podOperatorStatusApplied {
11251137
if c.lastPodOperatorAppliedRV == 0 {
1126-
_, _, resourceVersion, err := c.operatorClient.GetOperatorStateWithQuorum(ctx)
1138+
_, _, resourceVersion, err := c.operatorClient.GetStaticPodOperatorStateWithQuorum(ctx)
11271139
if err != nil {
11281140
return err
11291141
}
@@ -1152,7 +1164,7 @@ func (c InstallerController) Sync(ctx context.Context, syncCtx factory.SyncConte
11521164
return nil
11531165
}
11541166

1155-
err = c.ensureRequiredResourcesExist(ctx, originalOperatorStatus.LatestAvailableRevision)
1167+
err = c.ensureRequiredResourcesExistFn(ctx, originalOperatorStatus.LatestAvailableRevision)
11561168

11571169
// Only manage installation pods when all required certs are present.
11581170
var updatedNode *operatorv1.NodeStatus
@@ -1161,7 +1173,7 @@ func (c InstallerController) Sync(ctx context.Context, syncCtx factory.SyncConte
11611173
var requeue bool
11621174
var after time.Duration
11631175
var syncErr error
1164-
requeue, after, updatedNode, updatedNodeReportOnSuccessfulUpdateFn, syncErr = c.manageInstallationPods(ctx, operatorSpec, operatorStatus)
1176+
requeue, after, updatedNode, updatedNodeReportOnSuccessfulUpdateFn, syncErr = c.manageInstallationPodsFn(ctx, operatorSpec, operatorStatus)
11651177
if requeue && syncErr == nil {
11661178
syncCtx.Queue().AddAfter(syncCtx.QueueKey(), after)
11671179
return nil

pkg/operator/staticpod/controller/installer/installer_controller_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2651,3 +2651,121 @@ func metav1TimestampPtr(s string) *metav1.Time {
26512651
t := metav1Timestamp(s)
26522652
return &t
26532653
}
2654+
2655+
type GenericStaticPodOperator struct {
2656+
metav1.ObjectMeta `json:"metadata"`
2657+
Spec operatorv1.StaticPodOperatorSpec `json:"spec"`
2658+
Status operatorv1.StaticPodOperatorStatus `json:"status"`
2659+
}
2660+
2661+
type StubOperatorClient struct {
2662+
live *GenericStaticPodOperator
2663+
cached *GenericStaticPodOperator
2664+
onApply func()
2665+
}
2666+
2667+
func (stub *StubOperatorClient) GetStaticPodOperatorState() (*operatorv1.StaticPodOperatorSpec, *operatorv1.StaticPodOperatorStatus, string, error) {
2668+
return &stub.cached.Spec, &stub.cached.Status, stub.cached.ResourceVersion, nil
2669+
}
2670+
2671+
func (stub *StubOperatorClient) GetStaticPodOperatorStateWithQuorum(context.Context) (*operatorv1.StaticPodOperatorSpec, *operatorv1.StaticPodOperatorStatus, string, error) {
2672+
return &stub.live.Spec, &stub.live.Status, stub.live.ResourceVersion, nil
2673+
}
2674+
2675+
func (stub *StubOperatorClient) ApplyStaticPodOperatorStatus(context.Context, string, *applyoperatorv1.StaticPodOperatorStatusApplyConfiguration) error {
2676+
stub.onApply()
2677+
return nil
2678+
}
2679+
2680+
func TestWaitToObserveWrites(t *testing.T) {
2681+
live := GenericStaticPodOperator{
2682+
ObjectMeta: metav1.ObjectMeta{
2683+
ResourceVersion: "1",
2684+
},
2685+
Status: operatorv1.StaticPodOperatorStatus{
2686+
NodeStatuses: []operatorv1.NodeStatus{
2687+
{NodeName: "test-node"},
2688+
},
2689+
},
2690+
}
2691+
cached := GenericStaticPodOperator{
2692+
ObjectMeta: metav1.ObjectMeta{
2693+
ResourceVersion: "1",
2694+
},
2695+
Status: operatorv1.StaticPodOperatorStatus{
2696+
NodeStatuses: []operatorv1.NodeStatus{
2697+
{NodeName: "test-node"},
2698+
},
2699+
},
2700+
}
2701+
2702+
nApplies := 0
2703+
operatorClient := StubOperatorClient{
2704+
live: &live,
2705+
cached: &cached,
2706+
onApply: func() {
2707+
nApplies++
2708+
live = GenericStaticPodOperator{
2709+
ObjectMeta: metav1.ObjectMeta{
2710+
ResourceVersion: "2",
2711+
},
2712+
Status: operatorv1.StaticPodOperatorStatus{
2713+
NodeStatuses: []operatorv1.NodeStatus{
2714+
{NodeName: "test-node", TargetRevision: 1},
2715+
},
2716+
},
2717+
}
2718+
},
2719+
}
2720+
2721+
ctrl := InstallerController{
2722+
operatorClient: &operatorClient,
2723+
ensureRequiredResourcesExistFn: func(context.Context, int32) error {
2724+
return nil
2725+
},
2726+
manageInstallationPodsFn: func(context.Context, *operatorv1.StaticPodOperatorSpec, *operatorv1.StaticPodOperatorStatus) (bool, time.Duration, *operatorv1.NodeStatus, func(), error) {
2727+
return false, 0, &operatorv1.NodeStatus{NodeName: "test-node", TargetRevision: 1}, func() {}, nil
2728+
},
2729+
}
2730+
2731+
recorder := events.NewInMemoryRecorder("test", clocktesting.NewFakeClock(time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC)))
2732+
2733+
// Write TargetRevision change.
2734+
if err := ctrl.Sync(context.TODO(), factory.NewSyncContext("test", recorder)); err != nil {
2735+
t.Fatal(err)
2736+
}
2737+
2738+
if want := 1; nApplies != want {
2739+
t.Fatalf("expected %d status apply, got %d", want, nApplies)
2740+
}
2741+
2742+
// Cache has not observed the previous write!
2743+
if err := ctrl.Sync(context.TODO(), factory.NewSyncContext("test", recorder)); err != nil {
2744+
t.Fatal(err)
2745+
}
2746+
2747+
if want := 1; nApplies != want {
2748+
t.Fatalf("expected %d status apply, got %d", want, nApplies)
2749+
}
2750+
2751+
// Cache observes the write.
2752+
cached = GenericStaticPodOperator{
2753+
ObjectMeta: metav1.ObjectMeta{
2754+
ResourceVersion: "2",
2755+
},
2756+
Status: operatorv1.StaticPodOperatorStatus{
2757+
NodeStatuses: []operatorv1.NodeStatus{
2758+
{NodeName: "test-node", TargetRevision: 1},
2759+
},
2760+
},
2761+
}
2762+
2763+
// Sync can continue with the next state transition now.
2764+
if err := ctrl.Sync(context.TODO(), factory.NewSyncContext("test", recorder)); err != nil {
2765+
t.Fatal(err)
2766+
}
2767+
2768+
if want := 2; nApplies != want {
2769+
t.Fatalf("expected %d status apply, got %d", want, nApplies)
2770+
}
2771+
}

0 commit comments

Comments
 (0)