Skip to content

Commit 073b3bc

Browse files
author
blenbot
committed
another round of merge conflicts
2 parents 73a86d1 + 69ce26b commit 073b3bc

File tree

17 files changed

+456
-161
lines changed

17 files changed

+456
-161
lines changed

charts/kthena/charts/workload/crds/workload.serving.volcano.sh_modelservings.yaml

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,6 @@ spec:
6565
RollingUpdateConfiguration defines the parameters to be used when type is RollingUpdateStrategyType.
6666
optional
6767
properties:
68-
maxSurge:
69-
anyOf:
70-
- type: integer
71-
- type: string
72-
default: 0
73-
description: |-
74-
The maximum number of replicas that can be scheduled above the original number of
75-
replicas.
76-
Value can be an absolute number (ex: 5) or a percentage of total replicas at
77-
the start of the update (ex: 10%).
78-
Absolute number is calculated from percentage by rounding up.
79-
By default, a value of 0 is used.
80-
x-kubernetes-int-or-string: true
8168
maxUnavailable:
8269
anyOf:
8370
- type: integer
@@ -87,7 +74,7 @@ spec:
8774
The maximum number of replicas that can be unavailable during the update.
8875
Value can be an absolute number (ex: 5) or a percentage of total replicas at the start of update (ex: 10%).
8976
Absolute number is calculated from percentage by rounding down.
90-
This can not be 0 if MaxSurge is 0.
77+
This can not be 0.
9178
By default, a fixed value of 1 is used.
9279
x-kubernetes-int-or-string: true
9380
partition:

client-go/applyconfiguration/workload/v1alpha1/rollingupdateconfiguration.go

Lines changed: 0 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/kthena/docs/reference/crd/workload.serving.volcano.sh.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -678,8 +678,7 @@ _Appears in:_
678678

679679
| Field | Description | Default | Validation |
680680
| --- | --- | --- | --- |
681-
| `maxUnavailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#intorstring-intstr-util)_ | The maximum number of replicas that can be unavailable during the update.<br />Value can be an absolute number (ex: 5) or a percentage of total replicas at the start of update (ex: 10%).<br />Absolute number is calculated from percentage by rounding down.<br />This can not be 0 if MaxSurge is 0.<br />By default, a fixed value of 1 is used. | 1 | XIntOrString: \{\} <br /> |
682-
| `maxSurge` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#intorstring-intstr-util)_ | The maximum number of replicas that can be scheduled above the original number of<br />replicas.<br />Value can be an absolute number (ex: 5) or a percentage of total replicas at<br />the start of the update (ex: 10%).<br />Absolute number is calculated from percentage by rounding up.<br />By default, a value of 0 is used. | 0 | XIntOrString: \{\} <br /> |
681+
| `maxUnavailable` _[IntOrString](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#intorstring-intstr-util)_ | The maximum number of replicas that can be unavailable during the update.<br />Value can be an absolute number (ex: 5) or a percentage of total replicas at the start of update (ex: 10%).<br />Absolute number is calculated from percentage by rounding down.<br />This can not be 0.<br />By default, a fixed value of 1 is used. | 1 | XIntOrString: \{\} <br /> |
683682
| `partition` _integer_ | Partition indicates the ordinal at which the ModelServing should be partitioned<br />for updates. During a rolling update, all ServingGroups from ordinal Replicas-1 to<br />Partition are updated. All ServingGroups from ordinal Partition-1 to 0 remain untouched.<br />The default value is 0. | | |
684683

685684

examples/kthena-router/ModelRouteLora.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ spec:
1010
rules:
1111
- name: "lora-route"
1212
targetModels:
13-
- modelServerName: "deepseek-r1-1-5b"
13+
- modelServerName: "deepseek-r1-7b"

pkg/apis/workload/v1alpha1/model_serving_types.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,12 @@ type RollingUpdateConfiguration struct {
104104
// The maximum number of replicas that can be unavailable during the update.
105105
// Value can be an absolute number (ex: 5) or a percentage of total replicas at the start of update (ex: 10%).
106106
// Absolute number is calculated from percentage by rounding down.
107-
// This can not be 0 if MaxSurge is 0.
107+
// This can not be 0.
108108
// By default, a fixed value of 1 is used.
109109
// +kubebuilder:validation:XIntOrString
110110
// +kubebuilder:default=1
111-
MaxUnavailable intstr.IntOrString `json:"maxUnavailable,omitempty"`
112-
113-
// The maximum number of replicas that can be scheduled above the original number of
114-
// replicas.
115-
// Value can be an absolute number (ex: 5) or a percentage of total replicas at
116-
// the start of the update (ex: 10%).
117-
// Absolute number is calculated from percentage by rounding up.
118-
// By default, a value of 0 is used.
119-
// +kubebuilder:validation:XIntOrString
120-
// +kubebuilder:default=0
121-
MaxSurge intstr.IntOrString `json:"maxSurge,omitempty"`
111+
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`
112+
122113
// Partition indicates the ordinal at which the ModelServing should be partitioned
123114
// for updates. During a rolling update, all ServingGroups from ordinal Replicas-1 to
124115
// Partition are updated. All ServingGroups from ordinal Partition-1 to 0 remain untouched.

pkg/apis/workload/v1alpha1/zz_generated.deepcopy.go

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/kthena-router/backend/vllm/models.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ func (engine *vllmEngine) GetPodModels(pod *corev1.Pod) ([]string, error) {
4141
}
4242
defer resp.Body.Close()
4343

44+
if resp.StatusCode != http.StatusOK {
45+
return nil, fmt.Errorf("failed to get models from pod %s/%s: HTTP %d", pod.GetNamespace(), pod.GetName(), resp.StatusCode)
46+
}
47+
4448
body, err := io.ReadAll(resp.Body)
4549
if err != nil {
4650
return nil, err

pkg/model-serving-controller/controller/model_serving_controller.go

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -840,16 +840,41 @@ func (c *ModelServingController) DeleteRole(ctx context.Context, ms *workloadv1a
840840
}
841841

842842
func (c *ModelServingController) manageServingGroupRollingUpdate(ctx context.Context, ms *workloadv1alpha1.ModelServing, revision string) error {
843+
maxUnavailable, err := utils.GetMaxUnavailable(ms)
844+
if err != nil {
845+
return fmt.Errorf("failed to calculate maxUnavailable: %v", err)
846+
}
847+
843848
servingGroupList, err := c.store.GetServingGroupByModelServing(utils.GetNamespaceName(ms))
844849
if err != nil {
845850
return fmt.Errorf("cannot get ServingGroupList from store, err:%v", err)
846851
}
847852

853+
// Count how many ServingGroups are currently not running (i.e. unavailable).
854+
currentUnavailableCount := 0
855+
for _, sg := range servingGroupList {
856+
if sg.Status != datastore.ServingGroupRunning {
857+
currentUnavailableCount++
858+
}
859+
}
860+
// Check whether kthena has already reached the maxUnavailable limit.
861+
if currentUnavailableCount >= maxUnavailable {
862+
// Wait until some groups become available before continuing updates
863+
klog.V(4).Infof("current unavailable ServingGroup count %d has reached the maxUnavailable limit %d, waiting for next reconcile", currentUnavailableCount, maxUnavailable)
864+
return nil
865+
}
866+
// Calculate how many more groups we can delete in this reconcile.
867+
groupToDelete := maxUnavailable - currentUnavailableCount
868+
869+
// Determine if partition is set
848870
partition := c.getPartition(ms)
849871

872+
// we terminate the ServingGroup with the largest ordinal that does not match the update revision.
873+
// Update outdated groups respecting the maxUnavailable constraint
874+
updateCount := 0
850875
if partition > 0 {
851876
// When partition is set, delete ServingGroups with ordinal >= partition
852-
for i := len(servingGroupList) - 1; i >= 0; i-- {
877+
for i := len(servingGroupList) - 1; i >= 0 && updateCount < groupToDelete; i-- {
853878
_, ordinal := utils.GetParentNameAndOrdinal(servingGroupList[i].Name)
854879
if ordinal < partition {
855880
// Skip partition-protected ServingGroups
@@ -859,32 +884,24 @@ func (c *ModelServingController) manageServingGroupRollingUpdate(ctx context.Con
859884
if c.isServingGroupOutdated(servingGroupList[i], ms.Namespace, revision) {
860885
// target ServingGroup is not the latest version, needs to be updated
861886
klog.V(2).Infof("ServingGroup %s will be terminated for update (partition=%d)", servingGroupList[i].Name, partition)
862-
return c.deleteServingGroup(ctx, ms, servingGroupList[i].Name)
863-
}
864-
if servingGroupList[i].Status != datastore.ServingGroupRunning {
865-
// target ServingGroup is the latest version, but not running. We need to wait for the status to change to running.
866-
klog.V(4).Infof("waiting for the ServingGroup %s status become running", servingGroupList[i].Name)
867-
return nil
887+
if err := c.deleteServingGroup(ctx, ms, servingGroupList[i].Name); err != nil {
888+
return err
889+
}
890+
updateCount += 1
868891
}
869892
}
870893
klog.V(2).Infof("all target groups of modelServing %s have been updated (partition=%d)", ms.Name, partition)
871894
} else {
872895
// Original behavior: terminate the ServingGroup with the largest ordinal that does not match the update revision
873-
for i := len(servingGroupList) - 1; i >= 0; i-- {
896+
for i := len(servingGroupList) - 1; i >= 0 && updateCount < groupToDelete; i-- {
874897
if c.isServingGroupOutdated(servingGroupList[i], ms.Namespace, revision) {
875898
// target ServingGroup is not the latest version, needs to be updated
876899
klog.V(2).Infof("ServingGroup %s will be terminated for update", servingGroupList[i].Name)
877-
return c.deleteServingGroup(ctx, ms, servingGroupList[i].Name)
878-
}
879-
if servingGroupList[i].Status != datastore.ServingGroupRunning {
880-
// target ServingGroup is the latest version, but not running. We need to wait for the status to change to running.
881-
// If the group fails after rolling, it will automatically be deleted and rebuilt when detecting the pod failure.
882-
// If the group still pending due to reasons such as being unable to be scheduled, rolling update process will stop
883-
// to avoid affecting other groups that are running normally.
884-
klog.V(4).Infof("waiting for the ServingGroup %s status become running", servingGroupList[i].Name)
885-
return nil
900+
if err := c.deleteServingGroup(ctx, ms, servingGroupList[i].Name); err != nil {
901+
return err
902+
}
903+
updateCount += 1
886904
}
887-
// target ServingGroup is already the latest version and running, processing the rolling update of the next group.
888905
}
889906
klog.V(2).Infof("all target groups of modelServing %s have been updated", ms.Name)
890907
}

pkg/model-serving-controller/utils/utils.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
apierrors "k8s.io/apimachinery/pkg/api/errors"
3131
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3232
"k8s.io/apimachinery/pkg/types"
33+
"k8s.io/apimachinery/pkg/util/intstr"
3334
"k8s.io/client-go/kubernetes"
3435
"k8s.io/klog/v2"
3536
"k8s.io/utils/ptr"
@@ -544,3 +545,15 @@ func RoleIDIndexFunc(obj interface{}) ([]string, error) {
544545
compositeKey := fmt.Sprintf("%s/%s/%s/%s", namespace, groupName, roleName, roleID)
545546
return []string{compositeKey}, nil
546547
}
548+
549+
func GetMaxUnavailable(mi *workloadv1alpha1.ModelServing) (int, error) {
550+
maxUnavailable := intstr.FromInt(1) // Default value
551+
replicas := int(*mi.Spec.Replicas)
552+
if mi.Spec.RolloutStrategy != nil && mi.Spec.RolloutStrategy.RollingUpdateConfiguration != nil {
553+
if mi.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable != nil {
554+
maxUnavailable = *mi.Spec.RolloutStrategy.RollingUpdateConfiguration.MaxUnavailable
555+
}
556+
}
557+
// Calculate maxUnavailable as absolute numbers
558+
return intstr.GetScaledValueFromIntOrPercent(&maxUnavailable, replicas, false)
559+
}

pkg/model-serving-controller/utils/utils_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121

2222
"github.com/stretchr/testify/assert"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/util/intstr"
25+
"k8s.io/utils/ptr"
2426

2527
workloadv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/workload/v1alpha1"
2628
)
@@ -97,3 +99,145 @@ func TestSetCondition(t *testing.T) {
9799
assert.Contains(t, cond.Message, SomeGroupsAreProgressing)
98100
})
99101
}
102+
103+
func TestGetMaxUnavailable(t *testing.T) {
104+
tests := []struct {
105+
name string
106+
modelServing *workloadv1alpha1.ModelServing
107+
expectedResult int
108+
expectError bool
109+
}{
110+
{
111+
name: "Default case - no rollout strategy",
112+
modelServing: &workloadv1alpha1.ModelServing{
113+
Spec: workloadv1alpha1.ModelServingSpec{
114+
Replicas: ptr.To[int32](5),
115+
},
116+
},
117+
expectedResult: 1, // Default value
118+
expectError: false,
119+
},
120+
{
121+
name: "Default case - rollout strategy but no rolling update config",
122+
modelServing: &workloadv1alpha1.ModelServing{
123+
Spec: workloadv1alpha1.ModelServingSpec{
124+
Replicas: ptr.To[int32](10),
125+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
126+
Type: "ServingGroupRollingUpdate",
127+
},
128+
},
129+
},
130+
expectedResult: 1, // Default value
131+
expectError: false,
132+
},
133+
{
134+
name: "MaxUnavailable as integer - value 2",
135+
modelServing: &workloadv1alpha1.ModelServing{
136+
Spec: workloadv1alpha1.ModelServingSpec{
137+
Replicas: ptr.To[int32](10),
138+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
139+
Type: "ServingGroupRollingUpdate",
140+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
141+
MaxUnavailable: ptr.To(intstr.FromInt(2)),
142+
},
143+
},
144+
},
145+
},
146+
expectedResult: 2,
147+
expectError: false,
148+
},
149+
{
150+
name: "MaxUnavailable as integer - value 0",
151+
modelServing: &workloadv1alpha1.ModelServing{
152+
Spec: workloadv1alpha1.ModelServingSpec{
153+
Replicas: ptr.To[int32](5),
154+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
155+
Type: "ServingGroupRollingUpdate",
156+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
157+
MaxUnavailable: ptr.To(intstr.FromInt(0)),
158+
},
159+
},
160+
},
161+
},
162+
expectedResult: 0,
163+
expectError: false,
164+
},
165+
{
166+
name: "MaxUnavailable as percentage - 20%",
167+
modelServing: &workloadv1alpha1.ModelServing{
168+
Spec: workloadv1alpha1.ModelServingSpec{
169+
Replicas: ptr.To[int32](10),
170+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
171+
Type: "ServingGroupRollingUpdate",
172+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
173+
MaxUnavailable: ptr.To(intstr.FromString("20%")),
174+
},
175+
},
176+
},
177+
},
178+
expectedResult: 2, // 20% of 10 is 2
179+
expectError: false,
180+
},
181+
{
182+
name: "MaxUnavailable as percentage - 50%",
183+
modelServing: &workloadv1alpha1.ModelServing{
184+
Spec: workloadv1alpha1.ModelServingSpec{
185+
Replicas: ptr.To[int32](9),
186+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
187+
Type: "ServingGroupRollingUpdate",
188+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
189+
MaxUnavailable: ptr.To(intstr.FromString("50%")),
190+
},
191+
},
192+
},
193+
},
194+
expectedResult: 4, // 50% of 9 is 4.5, rounded down to 4
195+
expectError: false,
196+
},
197+
{
198+
name: "MaxUnavailable as percentage - 100%",
199+
modelServing: &workloadv1alpha1.ModelServing{
200+
Spec: workloadv1alpha1.ModelServingSpec{
201+
Replicas: ptr.To[int32](3),
202+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
203+
Type: "ServingGroupRollingUpdate",
204+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
205+
MaxUnavailable: ptr.To(intstr.FromString("100%")),
206+
},
207+
},
208+
},
209+
},
210+
expectedResult: 3, // 100% of 3 is 3
211+
expectError: false,
212+
},
213+
{
214+
name: "MaxUnavailable as percentage - 0%",
215+
modelServing: &workloadv1alpha1.ModelServing{
216+
Spec: workloadv1alpha1.ModelServingSpec{
217+
Replicas: ptr.To[int32](10),
218+
RolloutStrategy: &workloadv1alpha1.RolloutStrategy{
219+
Type: "ServingGroupRollingUpdate",
220+
RollingUpdateConfiguration: &workloadv1alpha1.RollingUpdateConfiguration{
221+
MaxUnavailable: ptr.To(intstr.FromString("0%")),
222+
},
223+
},
224+
},
225+
},
226+
expectedResult: 0, // 0% of 10 is 0
227+
expectError: false,
228+
},
229+
}
230+
231+
for _, tt := range tests {
232+
t.Run(tt.name, func(t *testing.T) {
233+
result, err := GetMaxUnavailable(tt.modelServing)
234+
235+
if tt.expectError {
236+
assert.Error(t, err)
237+
} else {
238+
assert.NoError(t, err)
239+
assert.Equal(t, tt.expectedResult, result)
240+
}
241+
})
242+
}
243+
}

0 commit comments

Comments
 (0)