Skip to content

Commit 59ea497

Browse files
Merge pull request #714 from WHOIM1205/fix/pd-scheduler-panic-empty-prefill-pods
Fix panic in PD scheduler when no prefill pods are available
2 parents e55557a + 82170fa commit 59ea497

File tree

2 files changed

+321
-0
lines changed

2 files changed

+321
-0
lines changed

pkg/kthena-router/scheduler/scheduler_impl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ func (s *SchedulerImpl) Schedule(ctx *framework.Context, pods []*datastore.PodIn
141141
klog.V(4).Info("Running score plugins for prefill pod")
142142
scores = s.RunScorePlugins(selectedPods, ctx)
143143
bestPrefillPod := TopNPodInfos(scores, 1)
144+
if len(bestPrefillPod) == 0 {
145+
klog.V(4).InfoS("no valid prefill pods after scoring, skipping",
146+
"decode instance", klog.KObj(decodePod.Pod))
147+
continue
148+
}
144149
prefillPods[i] = bestPrefillPod[0]
145150
}
146151
ctx.PrefillPods = prefillPods
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
Copyright The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduler
18+
19+
import (
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
corev1 "k8s.io/api/core/v1"
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26+
"k8s.io/apimachinery/pkg/types"
27+
28+
aiv1alpha1 "github.com/volcano-sh/kthena/pkg/apis/networking/v1alpha1"
29+
"github.com/volcano-sh/kthena/pkg/kthena-router/datastore"
30+
"github.com/volcano-sh/kthena/pkg/kthena-router/scheduler/framework"
31+
)
32+
33+
// TestTopNPodInfos tests the TopNPodInfos function
34+
func TestTopNPodInfos(t *testing.T) {
35+
tests := []struct {
36+
name string
37+
scores map[*datastore.PodInfo]int
38+
n int
39+
expected int
40+
}{
41+
{
42+
name: "empty scores map returns empty slice",
43+
scores: map[*datastore.PodInfo]int{},
44+
n: 1,
45+
expected: 0,
46+
},
47+
{
48+
name: "nil scores map returns empty slice",
49+
scores: nil,
50+
n: 1,
51+
expected: 0,
52+
},
53+
{
54+
name: "single pod with n=1",
55+
scores: map[*datastore.PodInfo]int{
56+
createTestPodInfo("pod1"): 100,
57+
},
58+
n: 1,
59+
expected: 1,
60+
},
61+
{
62+
name: "multiple pods with n greater than available",
63+
scores: map[*datastore.PodInfo]int{
64+
createTestPodInfo("pod1"): 100,
65+
createTestPodInfo("pod2"): 50,
66+
},
67+
n: 5,
68+
expected: 2,
69+
},
70+
{
71+
name: "multiple pods returns top n by score",
72+
scores: map[*datastore.PodInfo]int{
73+
createTestPodInfo("pod1"): 100,
74+
createTestPodInfo("pod2"): 50,
75+
createTestPodInfo("pod3"): 75,
76+
},
77+
n: 2,
78+
expected: 2,
79+
},
80+
{
81+
name: "n=0 returns empty slice",
82+
scores: map[*datastore.PodInfo]int{
83+
createTestPodInfo("pod1"): 100,
84+
},
85+
n: 0,
86+
expected: 0,
87+
},
88+
}
89+
90+
for _, tt := range tests {
91+
t.Run(tt.name, func(t *testing.T) {
92+
result := TopNPodInfos(tt.scores, tt.n)
93+
assert.Equal(t, tt.expected, len(result))
94+
})
95+
}
96+
}
97+
98+
// TestTopNPodInfosOrdering verifies that TopNPodInfos returns pods in descending score order
99+
func TestTopNPodInfosOrdering(t *testing.T) {
100+
pod1 := createTestPodInfo("pod1")
101+
pod2 := createTestPodInfo("pod2")
102+
pod3 := createTestPodInfo("pod3")
103+
104+
scores := map[*datastore.PodInfo]int{
105+
pod1: 50,
106+
pod2: 100,
107+
pod3: 75,
108+
}
109+
110+
result := TopNPodInfos(scores, 3)
111+
112+
require.Equal(t, 3, len(result))
113+
// Verify ordering: highest score first
114+
assert.Equal(t, "pod2", result[0].Pod.Name)
115+
assert.Equal(t, "pod3", result[1].Pod.Name)
116+
assert.Equal(t, "pod1", result[2].Pod.Name)
117+
}
118+
119+
// TestSchedulePDGroup uses table-driven tests to validate PD scheduling behavior.
120+
// It covers both graceful degradation (no prefill pods) and the happy path (valid prefill pod).
121+
func TestSchedulePDGroup(t *testing.T) {
122+
tests := []struct {
123+
name string
124+
includePrefillPod bool
125+
expectedDecodePodCount int
126+
expectedPrefillCount int
127+
expectPrefillNil bool
128+
expectedPrefillPodName string
129+
}{
130+
{
131+
name: "empty prefill scores - graceful degradation with nil prefill pod",
132+
includePrefillPod: false,
133+
expectedDecodePodCount: 1,
134+
expectedPrefillCount: 1,
135+
expectPrefillNil: true,
136+
expectedPrefillPodName: "",
137+
},
138+
{
139+
name: "valid prefill pod selected - happy path",
140+
includePrefillPod: true,
141+
expectedDecodePodCount: 1,
142+
expectedPrefillCount: 1,
143+
expectPrefillNil: false,
144+
expectedPrefillPodName: "prefill-pod-0",
145+
},
146+
}
147+
148+
for _, tt := range tests {
149+
t.Run(tt.name, func(t *testing.T) {
150+
// Create a mock store
151+
store := datastore.New()
152+
153+
// Create model server with PDGroup configuration
154+
modelServer := &aiv1alpha1.ModelServer{
155+
ObjectMeta: metav1.ObjectMeta{
156+
Name: "test-model-server",
157+
Namespace: "default",
158+
},
159+
Spec: aiv1alpha1.ModelServerSpec{
160+
WorkloadSelector: &aiv1alpha1.WorkloadSelector{
161+
PDGroup: &aiv1alpha1.PDGroup{
162+
GroupKey: "pd-group",
163+
DecodeLabels: map[string]string{"role": "decode"},
164+
PrefillLabels: map[string]string{"role": "prefill"},
165+
},
166+
},
167+
},
168+
}
169+
170+
// Add model server to store
171+
modelServerName := types.NamespacedName{Namespace: "default", Name: "test-model-server"}
172+
err := store.AddOrUpdateModelServer(modelServer, nil)
173+
require.NoError(t, err)
174+
175+
// Add decode pod
176+
decodePod := &corev1.Pod{
177+
ObjectMeta: metav1.ObjectMeta{
178+
Name: "decode-pod-0",
179+
Namespace: "default",
180+
Labels: map[string]string{
181+
"pd-group": "group-1",
182+
"role": "decode",
183+
},
184+
},
185+
Status: corev1.PodStatus{
186+
PodIP: "10.0.0.1",
187+
},
188+
}
189+
err = store.AddOrUpdatePod(decodePod, []*aiv1alpha1.ModelServer{modelServer})
190+
require.NoError(t, err)
191+
192+
// Conditionally add prefill pod
193+
if tt.includePrefillPod {
194+
prefillPod := &corev1.Pod{
195+
ObjectMeta: metav1.ObjectMeta{
196+
Name: "prefill-pod-0",
197+
Namespace: "default",
198+
Labels: map[string]string{
199+
"pd-group": "group-1",
200+
"role": "prefill",
201+
},
202+
},
203+
Status: corev1.PodStatus{
204+
PodIP: "10.0.0.2",
205+
},
206+
}
207+
err = store.AddOrUpdatePod(prefillPod, []*aiv1alpha1.ModelServer{modelServer})
208+
require.NoError(t, err)
209+
}
210+
211+
// Create scheduler with minimal configuration
212+
scheduler := NewScheduler(store, nil).(*SchedulerImpl)
213+
214+
// Create scheduling context with PDGroup enabled
215+
ctx := &framework.Context{
216+
ModelServerName: modelServerName,
217+
PDGroup: &aiv1alpha1.PDGroup{
218+
GroupKey: "pd-group",
219+
DecodeLabels: map[string]string{"role": "decode"},
220+
PrefillLabels: map[string]string{"role": "prefill"},
221+
},
222+
}
223+
224+
// Get pods for scheduling
225+
pods, err := store.GetPodsByModelServer(modelServerName)
226+
require.NoError(t, err)
227+
228+
// Schedule should complete without error
229+
err = scheduler.Schedule(ctx, pods)
230+
require.NoError(t, err)
231+
232+
// Verify decode pod count
233+
require.Len(t, ctx.DecodePods, tt.expectedDecodePodCount, "unexpected decode pod count")
234+
235+
// Verify prefill pod count
236+
require.Len(t, ctx.PrefillPods, tt.expectedPrefillCount, "unexpected prefill pod count")
237+
238+
// Verify prefill pod nil/non-nil status
239+
if tt.expectPrefillNil {
240+
assert.Nil(t, ctx.PrefillPods[0], "expected nil prefill pod for graceful degradation")
241+
} else {
242+
require.NotNil(t, ctx.PrefillPods[0], "expected non-nil prefill pod")
243+
assert.Equal(t, tt.expectedPrefillPodName, ctx.PrefillPods[0].Pod.Name, "unexpected prefill pod name")
244+
}
245+
})
246+
}
247+
}
248+
249+
// TestScheduleNonPDGroupWithEmptyScores tests non-PD scheduling with empty scores
250+
func TestScheduleNonPDGroupWithEmptyScores(t *testing.T) {
251+
store := datastore.New()
252+
scheduler := NewScheduler(store, nil).(*SchedulerImpl)
253+
254+
ctx := &framework.Context{
255+
ModelServerName: types.NamespacedName{Namespace: "default", Name: "test"},
256+
PDGroup: nil, // Non-PD scheduling
257+
}
258+
259+
// Empty pods slice
260+
pods := []*datastore.PodInfo{}
261+
262+
// This will return an error from filter plugins (all filtered out)
263+
// but should not panic
264+
assert.NotPanics(t, func() {
265+
_ = scheduler.Schedule(ctx, pods)
266+
})
267+
}
268+
269+
// TestRunScorePluginsEdgeCases uses table-driven tests to validate RunScorePlugins
270+
// handles empty and nil pods gracefully without panicking.
271+
func TestRunScorePluginsEdgeCases(t *testing.T) {
272+
tests := []struct {
273+
name string
274+
pods []*datastore.PodInfo
275+
}{
276+
{
277+
name: "empty pods slice",
278+
pods: []*datastore.PodInfo{},
279+
},
280+
{
281+
name: "nil pods slice",
282+
pods: nil,
283+
},
284+
}
285+
286+
for _, tt := range tests {
287+
t.Run(tt.name, func(t *testing.T) {
288+
store := datastore.New()
289+
scheduler := NewScheduler(store, nil).(*SchedulerImpl)
290+
291+
ctx := &framework.Context{
292+
ModelServerName: types.NamespacedName{Namespace: "default", Name: "test"},
293+
}
294+
295+
// Should return empty map without panic
296+
result := scheduler.RunScorePlugins(tt.pods, ctx)
297+
assert.NotNil(t, result)
298+
assert.Equal(t, 0, len(result))
299+
})
300+
}
301+
}
302+
303+
// Helper function to create test PodInfo
304+
func createTestPodInfo(name string) *datastore.PodInfo {
305+
return &datastore.PodInfo{
306+
Pod: &corev1.Pod{
307+
ObjectMeta: metav1.ObjectMeta{
308+
Name: name,
309+
Namespace: "default",
310+
},
311+
Status: corev1.PodStatus{
312+
PodIP: "10.0.0.1",
313+
},
314+
},
315+
}
316+
}

0 commit comments

Comments
 (0)