Skip to content

Commit d4d6b17

Browse files
authored
Update maxUnavailable calculation for leader StatefulSet (#781)
* Correctly set maxUnavailable on the leader StatefulSet
* Fix sts.maxUnavailable calculation edge cases: cap the maxSurge calculation at lws.replicas, and evaluate percentages against sts.replicas rather than lws.replicas. Also add unit tests to prevent regressions.
1 parent 7a4e3c1 commit d4d6b17

File tree

7 files changed

+402
-9
lines changed

7 files changed

+402
-9
lines changed

pkg/controllers/leaderworkerset_controller.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,14 @@ func (r *LeaderWorkerSetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
166166
r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created leader statefulset %s", lws.Name))
167167
} else if !lwsUpdated && partition != *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition {
168168
// An event is logged to track update progress.
169-
r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsUpdating, Update, fmt.Sprintf("Updating replicas %d to %d", *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition, partition))
169+
oldPartition := *leaderSts.Spec.UpdateStrategy.RollingUpdate.Partition
170+
var updateMsg string
171+
if oldPartition-1 == partition {
172+
updateMsg = fmt.Sprintf("Updating replica %d", partition)
173+
} else {
174+
updateMsg = fmt.Sprintf("Updating replicas %d to %d (inclusive)", partition, oldPartition-1)
175+
}
176+
r.Record.Eventf(lws, revision, corev1.EventTypeNormal, GroupsUpdating, Update, updateMsg)
170177
}
171178

172179
// Create headless service if it does not exist.
@@ -782,6 +789,27 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
782789

783790
podTemplateApplyConfiguration.WithAnnotations(podAnnotations)
784791

792+
lwsReplicas := int(*lws.Spec.Replicas)
793+
lwsMaxUnavailable, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable, lwsReplicas, false)
794+
if err != nil {
795+
return nil, err
796+
}
797+
lwsMaxSurge, err := intstr.GetScaledValueFromIntOrPercent(&lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxSurge, lwsReplicas, true)
798+
if err != nil {
799+
return nil, err
800+
}
801+
if lwsMaxSurge > lwsReplicas {
802+
lwsMaxSurge = lwsReplicas
803+
}
804+
stsMaxUnavailableInt := int32(lwsMaxUnavailable + lwsMaxSurge)
805+
// lwsMaxUnavailable=0 and lwsMaxSurge=0 together should be blocked by webhook,
806+
// but just in case, we'll make sure that stsMaxUnavailable is at least 1.
807+
// This also handles the case when lws.Spec.Replicas is 0.
808+
if stsMaxUnavailableInt < 1 {
809+
stsMaxUnavailableInt = 1
810+
}
811+
stsMaxUnavailable := intstr.FromInt32(stsMaxUnavailableInt)
812+
785813
// construct statefulset apply configuration
786814
statefulSetConfig := appsapplyv1.StatefulSet(lws.Name, lws.Namespace).
787815
WithSpec(appsapplyv1.StatefulSetSpec().
@@ -790,7 +818,7 @@ func constructLeaderStatefulSetApplyConfiguration(lws *leaderworkerset.LeaderWor
790818
WithPodManagementPolicy(appsv1.ParallelPodManagement).
791819
WithTemplate(&podTemplateApplyConfiguration).
792820
WithUpdateStrategy(appsapplyv1.StatefulSetUpdateStrategy().WithType(appsv1.StatefulSetUpdateStrategyType(lws.Spec.RolloutStrategy.Type)).WithRollingUpdate(
793-
appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(lws.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable).WithPartition(partition),
821+
appsapplyv1.RollingUpdateStatefulSetStrategy().WithMaxUnavailable(stsMaxUnavailable).WithPartition(partition),
794822
)).
795823
WithSelector(metaapplyv1.LabelSelector().
796824
WithMatchLabels(map[string]string{

pkg/controllers/leaderworkerset_controller_test.go

Lines changed: 216 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
6969
revisionKey string
7070
lws *leaderworkerset.LeaderWorkerSet
7171
wantApplyConfig *appsapplyv1.StatefulSetApplyConfiguration
72+
stsReplicas *int32
7273
}{
7374
{
7475
name: "1 replica, size 1, with empty leader template, exclusive placement disabled",
@@ -336,7 +337,76 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
336337
PodManagementPolicy: ptr.To[appsv1.PodManagementPolicyType](appsv1.ParallelPodManagement),
337338
UpdateStrategy: appsapplyv1.StatefulSetUpdateStrategy().
338339
WithType(appsv1.RollingUpdateStatefulSetStrategyType).
339-
WithRollingUpdate(appsapplyv1.RollingUpdateStatefulSetStrategy().WithPartition(0).WithMaxUnavailable(intstr.FromInt32(2))),
340+
WithRollingUpdate(appsapplyv1.RollingUpdateStatefulSetStrategy().WithPartition(0).WithMaxUnavailable(intstr.FromInt32(3))),
341+
},
342+
},
343+
},
344+
{
345+
name: "0 maxUnavailable, 2 maxSurge, with empty leader template, exclusive placement disabled",
346+
revisionKey: revisionKey2,
347+
lws: wrappers.BuildBasicLeaderWorkerSet("test-sample", "default").
348+
Replica(1).
349+
RolloutStrategy(leaderworkerset.RolloutStrategy{
350+
Type: leaderworkerset.RollingUpdateStrategyType,
351+
RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{
352+
MaxUnavailable: intstr.FromInt32(0),
353+
MaxSurge: intstr.FromInt32(2),
354+
},
355+
}).
356+
WorkerTemplateSpec(wrappers.MakeWorkerPodSpec()).
357+
Size(1).
358+
RestartPolicy(leaderworkerset.RecreateGroupOnPodRestart).Obj(),
359+
wantApplyConfig: &appsapplyv1.StatefulSetApplyConfiguration{
360+
TypeMetaApplyConfiguration: metaapplyv1.TypeMetaApplyConfiguration{
361+
Kind: ptr.To[string]("StatefulSet"),
362+
APIVersion: ptr.To[string]("apps/v1"),
363+
},
364+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
365+
Name: ptr.To[string]("test-sample"),
366+
Namespace: ptr.To[string]("default"),
367+
Labels: map[string]string{
368+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
369+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
370+
},
371+
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "1"},
372+
},
373+
Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{
374+
Replicas: ptr.To[int32](1),
375+
Selector: &metaapplyv1.LabelSelectorApplyConfiguration{
376+
MatchLabels: map[string]string{
377+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
378+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
379+
},
380+
},
381+
Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{
382+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
383+
Labels: map[string]string{
384+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
385+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
386+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
387+
},
388+
Annotations: map[string]string{
389+
"leaderworkerset.sigs.k8s.io/size": "1",
390+
},
391+
},
392+
Spec: &coreapplyv1.PodSpecApplyConfiguration{
393+
Containers: []coreapplyv1.ContainerApplyConfiguration{
394+
{
395+
Name: ptr.To[string]("worker"),
396+
Image: ptr.To[string]("docker.io/nginxinc/nginx-unprivileged:1.27"),
397+
Ports: []coreapplyv1.ContainerPortApplyConfiguration{{ContainerPort: ptr.To[int32](8080), Protocol: ptr.To[corev1.Protocol](corev1.ProtocolTCP)}},
398+
Resources: &coreapplyv1.ResourceRequirementsApplyConfiguration{},
399+
},
400+
},
401+
},
402+
},
403+
ServiceName: ptr.To[string]("test-sample"),
404+
PodManagementPolicy: ptr.To[appsv1.PodManagementPolicyType](appsv1.ParallelPodManagement),
405+
UpdateStrategy: appsapplyv1.StatefulSetUpdateStrategy().
406+
WithType(appsv1.RollingUpdateStatefulSetStrategyType).
407+
// maxSurge is capped at 1 (the value of lwsReplicas),
408+
// so stsMaxUnavailableInt = 0 (lwsMaxUnavailable) + 1 (capped maxSurge) = 1.
409+
WithRollingUpdate(appsapplyv1.RollingUpdateStatefulSetStrategy().WithPartition(0).WithMaxUnavailable(intstr.FromInt32(1))),
340410
},
341411
},
342412
},
@@ -526,11 +596,155 @@ func TestLeaderStatefulSetApplyConfig(t *testing.T) {
526596
},
527597
},
528598
},
599+
{
600+
name: "0 replica, 0 maxUnavailable, 0 maxSurge, with empty leader template, exclusive placement disabled",
601+
revisionKey: revisionKey2,
602+
lws: wrappers.BuildBasicLeaderWorkerSet("test-sample", "default").
603+
Replica(0).
604+
RolloutStrategy(leaderworkerset.RolloutStrategy{
605+
Type: leaderworkerset.RollingUpdateStrategyType,
606+
RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{
607+
MaxUnavailable: intstr.FromInt32(0),
608+
MaxSurge: intstr.FromInt32(0),
609+
},
610+
}).
611+
WorkerTemplateSpec(wrappers.MakeWorkerPodSpec()).
612+
Size(1).
613+
RestartPolicy(leaderworkerset.RecreateGroupOnPodRestart).Obj(),
614+
wantApplyConfig: &appsapplyv1.StatefulSetApplyConfiguration{
615+
TypeMetaApplyConfiguration: metaapplyv1.TypeMetaApplyConfiguration{
616+
Kind: ptr.To[string]("StatefulSet"),
617+
APIVersion: ptr.To[string]("apps/v1"),
618+
},
619+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
620+
Name: ptr.To[string]("test-sample"),
621+
Namespace: ptr.To[string]("default"),
622+
Labels: map[string]string{
623+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
624+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
625+
},
626+
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "0"},
627+
},
628+
Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{
629+
Replicas: ptr.To[int32](0),
630+
Selector: &metaapplyv1.LabelSelectorApplyConfiguration{
631+
MatchLabels: map[string]string{
632+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
633+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
634+
},
635+
},
636+
Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{
637+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
638+
Labels: map[string]string{
639+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
640+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
641+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
642+
},
643+
Annotations: map[string]string{
644+
"leaderworkerset.sigs.k8s.io/size": "1",
645+
},
646+
},
647+
Spec: &coreapplyv1.PodSpecApplyConfiguration{
648+
Containers: []coreapplyv1.ContainerApplyConfiguration{
649+
{
650+
Name: ptr.To[string]("worker"),
651+
Image: ptr.To[string]("docker.io/nginxinc/nginx-unprivileged:1.27"),
652+
Ports: []coreapplyv1.ContainerPortApplyConfiguration{{ContainerPort: ptr.To[int32](8080), Protocol: ptr.To[corev1.Protocol](corev1.ProtocolTCP)}},
653+
Resources: &coreapplyv1.ResourceRequirementsApplyConfiguration{},
654+
},
655+
},
656+
},
657+
},
658+
ServiceName: ptr.To[string]("test-sample"),
659+
PodManagementPolicy: ptr.To[appsv1.PodManagementPolicyType](appsv1.ParallelPodManagement),
660+
UpdateStrategy: appsapplyv1.StatefulSetUpdateStrategy().
661+
WithType(appsv1.RollingUpdateStatefulSetStrategyType).
662+
// Sts maxUnavailable is forced to be at least 1,
663+
// even if lws maxUnavailable=0 and lws maxSurge=0.
664+
WithRollingUpdate(appsapplyv1.RollingUpdateStatefulSetStrategy().WithPartition(0).WithMaxUnavailable(intstr.FromInt32(1))),
665+
},
666+
},
667+
},
668+
{
669+
// Validates maxSurge uses lws replicas, not sts replicas.
670+
name: "1 maxUnavailable, 50% maxSurge, 2 lws replicas, 3 sts replicas currently",
671+
revisionKey: revisionKey2,
672+
stsReplicas: ptr.To[int32](3),
673+
lws: wrappers.BuildBasicLeaderWorkerSet("test-sample", "default").
674+
Replica(2).
675+
RolloutStrategy(leaderworkerset.RolloutStrategy{
676+
Type: leaderworkerset.RollingUpdateStrategyType,
677+
RollingUpdateConfiguration: &leaderworkerset.RollingUpdateConfiguration{
678+
MaxUnavailable: intstr.FromInt32(1),
679+
MaxSurge: intstr.FromString("50%"),
680+
},
681+
}).
682+
WorkerTemplateSpec(wrappers.MakeWorkerPodSpec()).
683+
Size(1).
684+
RestartPolicy(leaderworkerset.RecreateGroupOnPodRestart).Obj(),
685+
wantApplyConfig: &appsapplyv1.StatefulSetApplyConfiguration{
686+
TypeMetaApplyConfiguration: metaapplyv1.TypeMetaApplyConfiguration{
687+
Kind: ptr.To[string]("StatefulSet"),
688+
APIVersion: ptr.To[string]("apps/v1"),
689+
},
690+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
691+
Name: ptr.To[string]("test-sample"),
692+
Namespace: ptr.To[string]("default"),
693+
Labels: map[string]string{
694+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
695+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
696+
},
697+
Annotations: map[string]string{"leaderworkerset.sigs.k8s.io/replicas": "2"},
698+
},
699+
Spec: &appsapplyv1.StatefulSetSpecApplyConfiguration{
700+
Replicas: ptr.To[int32](3), // using stsReplicas
701+
Selector: &metaapplyv1.LabelSelectorApplyConfiguration{
702+
MatchLabels: map[string]string{
703+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
704+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
705+
},
706+
},
707+
Template: &coreapplyv1.PodTemplateSpecApplyConfiguration{
708+
ObjectMetaApplyConfiguration: &metaapplyv1.ObjectMetaApplyConfiguration{
709+
Labels: map[string]string{
710+
"leaderworkerset.sigs.k8s.io/name": "test-sample",
711+
"leaderworkerset.sigs.k8s.io/worker-index": "0",
712+
"leaderworkerset.sigs.k8s.io/template-revision-hash": revisionKey2,
713+
},
714+
Annotations: map[string]string{
715+
"leaderworkerset.sigs.k8s.io/size": "1",
716+
},
717+
},
718+
Spec: &coreapplyv1.PodSpecApplyConfiguration{
719+
Containers: []coreapplyv1.ContainerApplyConfiguration{
720+
{
721+
Name: ptr.To[string]("worker"),
722+
Image: ptr.To[string]("docker.io/nginxinc/nginx-unprivileged:1.27"),
723+
Ports: []coreapplyv1.ContainerPortApplyConfiguration{{ContainerPort: ptr.To[int32](8080), Protocol: ptr.To[corev1.Protocol](corev1.ProtocolTCP)}},
724+
Resources: &coreapplyv1.ResourceRequirementsApplyConfiguration{},
725+
},
726+
},
727+
},
728+
},
729+
ServiceName: ptr.To[string]("test-sample"),
730+
PodManagementPolicy: ptr.To[appsv1.PodManagementPolicyType](appsv1.ParallelPodManagement),
731+
UpdateStrategy: appsapplyv1.StatefulSetUpdateStrategy().
732+
WithType(appsv1.RollingUpdateStatefulSetStrategyType).
733+
// maxUnavailable=1, maxSurge=50% of 2 replicas (lwsReplicas) = 1.
734+
// So stsMaxUnavailableInt = 1 + 1 = 2
735+
WithRollingUpdate(appsapplyv1.RollingUpdateStatefulSetStrategy().WithPartition(0).WithMaxUnavailable(intstr.FromInt32(2))),
736+
},
737+
},
738+
},
529739
}
530740

531741
for _, tc := range tests {
532742
t.Run(tc.name, func(t *testing.T) {
533-
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, *tc.lws.Spec.Replicas, tc.revisionKey)
743+
stsReplicas := *tc.lws.Spec.Replicas
744+
if tc.stsReplicas != nil {
745+
stsReplicas = *tc.stsReplicas
746+
}
747+
stsApplyConfig, err := constructLeaderStatefulSetApplyConfiguration(tc.lws, 0, stsReplicas, tc.revisionKey)
534748
if err != nil {
535749
t.Errorf("failed with error: %s", err.Error())
536750
}

pkg/controllers/pod_controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,9 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
190190
return ctrl.Result{}, err
191191
}
192192
if err = r.Create(ctx, workerStatefulSet); err != nil {
193-
r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create worker statefulset for leader pod %s", pod.Name))
193+
if client.IgnoreAlreadyExists(err) != nil {
194+
r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeWarning, FailedCreate, Create, fmt.Sprintf("Failed to create worker statefulset for leader pod %s", pod.Name))
195+
}
194196
return ctrl.Result{}, client.IgnoreAlreadyExists(err)
195197
}
196198
r.Record.Eventf(&leaderWorkerSet, &pod, corev1.EventTypeNormal, GroupsProgressing, Create, fmt.Sprintf("Created worker statefulset for leader pod %s", pod.Name))

site/content/en/docs/concepts/rollout-strategy/_index.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ description: >
77

88
Rolling update is vital to online services with zero downtime. For LLM inference services, this is particularly important, which helps to mitigate stockout. Two different configurations are supported in LWS, `maxUnavailable` and `maxSurge`:
99

10-
- `MaxUnavailable`: Indicates how many replicas are allowed to be unavailable during the update, the unavailable number is based on the spec.replicas. Defaults to 1. Note that only values >= 1 are supported.
11-
- `MaxSurge`: Indicates how many extra replicas can be deployed during the update. Defaults to 0.
10+
- `maxUnavailable`: Indicates how many replicas are allowed to be unavailable during the update, the unavailable number is based on the spec.replicas. Defaults to 1.
11+
- `maxSurge`: Indicates how many extra replicas can be deployed during the update. Defaults to 0.
12+
13+
Note that `maxSurge` and `maxUnavailable` cannot both be zero at the same time.
1214

1315
Here's a leaderWorkerSet configured with rollout strategy, you can find the example [here](https://github.com/kubernetes-sigs/lws/blob/main/docs/examples/sample/lws-rollout-strategy.yaml):
1416

test/e2e/e2e_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
164164
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
165165
})
166166

167+
ginkgo.It("Can perform a rolling update with maxUnavailable zero and maxSurge set", func() {
168+
lws := wrappers.BuildLeaderWorkerSet(ns.Name).Replica(4).MaxSurge(1).MaxUnavailable(0).Obj()
169+
testing.MustCreateLws(ctx, k8sClient, lws)
170+
171+
// Wait for leaderWorkerSet to be ready then update it.
172+
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
173+
testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
174+
175+
// Happens during the rolling update: MaxSurge=1, so we expect up to 5 replicas.
176+
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 5)
177+
178+
// Rolling update completes.
179+
testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
180+
testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
181+
testing.ExpectValidPods(ctx, k8sClient, lws, &corev1.PodList{})
182+
// Wait for leaderWorkerSet to be ready again.
183+
testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
184+
})
185+
167186
ginkgo.It("Can perform a rolling update even if old lws not ready", func() {
168187
// Create lws with not exist image.
169188
lws := wrappers.BuildLeaderWorkerSet(ns.Name).LeaderTemplate(nil).Size(1).Replica(2).MaxSurge(1).MaxUnavailable(0).Obj()

0 commit comments

Comments
 (0)