Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions pkg/controllers/leaderworkerset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,27 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
}
burstReplicas := lwsReplicas + int32(maxSurge)

// wantReplicas calculates the final replicas if needed.
// wantReplicas calculates the desired total replicas (including any surge)
// during an in-progress rolling update based on the number of currently
// unready (not-yet-updated) replicas. It is only called after the initial
// surge expansion has already happened (i.e. stsReplicas == burstReplicas).
//
// We enter the shrink path only when both conditions hold:
// 1. unreadyReplicas < lwsReplicas: at least one original replica has
// already been successfully updated, so the surge pod has served its
// purpose and can start being released.
// 2. unreadyReplicas <= maxSurge: the remaining unready replicas fit
// within the surge budget, meaning we can safely reduce replicas.
//
// Without condition (1), a single-replica LWS (lwsReplicas=1, maxSurge=1)
// would satisfy (2) immediately on the first reconcile after Case 2 expands
// to burstReplicas, causing the surge to be released before any replica has
// actually been updated.
wantReplicas := func(unreadyReplicas int32) int32 {
if unreadyReplicas <= int32(maxSurge) {
// When we have n unready replicas and n bursted replicas, we should
// start to release the burst replica gradually for the accommodation of
// the unready ones.
if unreadyReplicas < lwsReplicas && unreadyReplicas <= int32(maxSurge) {
// At least one replica has been updated and the remaining unready
// count fits within the surge budget. Release one surge slot per
// newly-ready replica so we converge back to lwsReplicas.
finalReplicas := lwsReplicas + utils.NonZeroValue(int32(unreadyReplicas)-1)
r.Record.Eventf(lws, nil, corev1.EventTypeNormal, GroupsProgressing, Delete, fmt.Sprintf("deleting surge replica %s-%d", lws.Name, finalReplicas))
return finalReplicas
Expand All @@ -302,10 +317,16 @@ func (r *LeaderWorkerSetReconciler) rollingUpdateParameters(ctx context.Context,
}

// Case 2:
// Indicates a new rolling update here.
// Indicates a new rolling update here. At this point all existing replicas
// are still running the old template (none are unready due to the update),
// so we must first expand to burstReplicas before the rolling partition
// can start advancing. Calling wantReplicas(lwsReplicas) here was wrong:
// with replicas=1 and maxSurge=1 it satisfied the shrink condition and
// immediately returned lwsReplicas, preventing the surge replica from ever
// being created.
if leaderWorkerSetUpdated {
// Process scaling up/down prior to the rolling update.
return min(lwsReplicas, stsReplicas), wantReplicas(lwsReplicas), nil
return min(lwsReplicas, stsReplicas), burstReplicas, nil
}

partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
Expand Down
77 changes: 77 additions & 0 deletions test/integration/controllers/leaderworkerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2297,6 +2297,83 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
},
},
}),

ginkgo.Entry("rolling update with maxSurge=1 and single replica creates surge before rolling", &testCase{
// Regression test for: with replicas=1 and maxSurge=1, calling
// wantReplicas(lwsReplicas) in Case 2 satisfied the shrink condition
// (unreadyReplicas(1) <= maxSurge(1)), so the controller returned
// replicas=1 and emitted a spurious "deleting surge replica" event
// without ever creating the surge pod, leaving the update stuck forever.
makeLeaderWorkerSet: func(nsName string) *wrappers.LeaderWorkerSetWrapper {
return wrappers.BuildLeaderWorkerSet(nsName).Replica(1).MaxSurge(1)
},
updates: []*update{
{
// Set lws to available condition.
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.SetSuperPodToReady(ctx, k8sClient, lws, 1)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 1)
testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 1, 1)
},
},
{
// Trigger rolling update by changing worker template.
// The controller must expand to burstReplicas (2) with partition=1
// so the surge pod gets the new template. Before the fix, Case 2
// returned replicas=1 and the STS never grew.
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
// replicas must be 2 (1 original + 1 surge) to prove the surge
// was actually created. partition=1 means pod-0 (old) is held,
// pod-1 (surge, new template) is created first.
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 1)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 1, 0)
},
},
{
// Create the surge leader pod and mark it ready. The controller
// advances partition to 0 (pod-0 starts updating) while keeping
// replicas=2 until pod-0 is also ready on the new template.
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderSts appsv1.StatefulSet
testing.GetLeaderStatefulset(ctx, lws, k8sClient, &leaderSts)
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 1, 2)).To(gomega.Succeed())
testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-1", lws)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
// pod-1 (surge) is ready; partition advances to 0 so pod-0
// can be updated. Surge (replicas=2) is kept until pod-0 is
// also updated, ensuring zero downtime.
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
testing.ExpectLeaderWorkerSetUpgradeInProgress(ctx, k8sClient, lws, "Rolling Upgrade is in progress")
},
},
{
// Mark group-0 ready on the new template; the rollout completes.
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.SetPodGroupToReady(ctx, k8sClient, lws.Name+"-0", lws)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 0)
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 1)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 1, 1)
},
},
},
}),
) // end of DescribeTable

ginkgo.Context("with gang scheduling enabled", ginkgo.Ordered, func() {
Expand Down