@@ -67,9 +67,10 @@ import (
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
-	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
	epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
@@ -249,15 +250,15 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
			),
		},
		{
-			name:     "select no lora despite active model, avoid excessive queue size",
+			name:     "select lora despite higher kv cache usage",
			requests: integrationutils.GenerateStreamedRequestSet(logger, "test3", modelSQLLora, nil),
-			// Pod 2 will be picked despite NOT having the requested model active as it is above the affinity for queue size.
-			// Also it is critical, so we should still admit the request despite all queue sizes being greater than the queue
-			// size threshold.
+			// Pod 1 will be picked because it has the requested LoRA adapter active. With equal queue sizes across
+			// the pods, its LoRA affinity score outweighs its higher KV cache usage, so it beats pods 0 and 2
+			// despite their lower kv cache utilization.
			pods: newPodStates(
				podState{index: 0, queueSize: 10, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar"}},
-				podState{index: 1, queueSize: 200, kvCacheUsage: 0.1, activeModels: []string{"foo", modelSQLLoraTarget}},
-				podState{index: 2, queueSize: 6, kvCacheUsage: 0.2, activeModels: []string{"foo"}},
+				podState{index: 1, queueSize: 10, kvCacheUsage: 0.4, activeModels: []string{"foo", modelSQLLoraTarget}},
+				podState{index: 2, queueSize: 10, kvCacheUsage: 0.3, activeModels: []string{"foo"}},
			),
			wantMetrics: map[string]string{
				"inference_model_request_total": inferenceObjectiveRequestTotal([]label{
@@ -267,7 +268,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
			},
			wantErr: false,
			wantResponses: integrationutils.NewRequestBufferedResponse(
-				"192.168.1.3:8000",
+				"192.168.1.2:8000",
				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test3","temperature":0}`, modelSQLLoraTarget),
				&configPb.HeaderValueOption{
					Header: &configPb.HeaderValue{
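Why the expected endpoint moves from pod 2 (192.168.1.3) to pod 1 (192.168.1.2) here: with the scorer-based profile wired up in BeforeSuite (last hunk below), every candidate pod receives a weighted sum of per-scorer values and the max-score picker takes the highest. A rough back-of-the-envelope check, assuming the queue and KV cache scorers normalize so that lower load scores higher, the LoRA affinity scorer returns 1 for a pod with the target adapter active, and the prefix scorer is neutral on a cold cache; these normalizations are illustrative assumptions, not the exact formulas in the scorer package:

package main

import "fmt"

func main() {
	type pod struct {
		name                           string
		queueScore, kvScore, loraScore float64
	}
	// Assumed normalizations: queueScore is equal for all three pods (each
	// has queueSize 10), kvScore = 1 - kvCacheUsage, loraScore = 1 only when
	// modelSQLLoraTarget is active. All weights are 1 in the test profile.
	pods := []pod{
		{"pod0 (192.168.1.1)", 1.0, 0.8, 0.0},
		{"pod1 (192.168.1.2)", 1.0, 0.6, 1.0},
		{"pod2 (192.168.1.3)", 1.0, 0.7, 0.0},
	}
	for _, p := range pods {
		fmt.Printf("%s: %.1f\n", p.name, p.queueScore+p.kvScore+p.loraScore)
	}
	// pod1 scores highest (2.6 vs 1.8 and 1.7): LoRA affinity outweighs
	// its higher KV cache usage.
}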
@@ -301,7 +302,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
-			// Pod 1 will be picked because it has relatively low queue size and low KV cache.
+			// Pod 0 will be picked because it has relatively low queue size and low KV cache usage.
			pods: newPodStates(
				podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}},
-				podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
+				podState{index: 1, queueSize: 4, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
				podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}},
			),
			wantMetrics: map[string]string{
@@ -312,7 +313,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
			},
			wantErr: false,
			wantResponses: integrationutils.NewRequestBufferedResponse(
-				"192.168.1.2:8000",
+				"192.168.1.1:8000",
				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test5","temperature":0}`, modelSheddableTarget),
				&configPb.HeaderValueOption{
					Header: &configPb.HeaderValue{
@@ -353,7 +354,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
-			// Pod 1 will be picked because it has relatively low queue size and low KV cache.
+			// Pod 0 will be picked because it has relatively low queue size and low KV cache usage.
			pods: newPodStates(
				podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}},
-				podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
+				podState{index: 1, queueSize: 4, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
				podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}},
			),
			wantMetrics: map[string]string{
@@ -364,7 +365,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
			},
			wantErr: false,
			wantResponses: integrationutils.NewRequestBufferedResponse(
-				"192.168.1.2:8000",
+				"192.168.1.1:8000",
				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelSheddableTarget),
				&configPb.HeaderValueOption{
					Header: &configPb.HeaderValue{
@@ -402,9 +403,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
				},
			},
		},
-			// pod 0: selected
-			// pod 1: excluded; above KV cache threshold
-			// pod 2: excluded; above queue size threshold
+			// pod 0: selected due to low queue size and kv cache usage
			pods: newPodStates(
				podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}},
				podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
@@ -418,7 +417,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
			},
			wantErr: false,
			wantResponses: integrationutils.NewRequestBufferedResponse(
-				"192.168.1.2:8000",
+				"192.168.1.1:8000",
				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test6","temperature":0}`, modelDirect),
				&configPb.HeaderValueOption{
					Header: &configPb.HeaderValue{
@@ -1090,35 +1089,18 @@ func BeforeSuite() func() {
	serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace}
	serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)

-	loraAffinityFilter := filter.NewLoraAffinityFilter(filter.DefaultLoraAffinityThreshold)
-	leastQueueFilter := filter.NewLeastQueueFilter()
-	leastKvCacheFilter := filter.NewLeastKVCacheFilter()
-
-	lowLatencyFilter := &filter.DecisionTreeFilter{
-		Current: filter.NewLowQueueFilter(filter.DefaultQueueingThresholdLoRA),
-		NextOnSuccess: &filter.DecisionTreeFilter{
-			Current: loraAffinityFilter,
-			NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
-				Current: leastQueueFilter,
-				NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
-					Current: leastKvCacheFilter,
-				},
-			},
-		},
-		NextOnFailure: &filter.DecisionTreeFilter{
-			Current: leastQueueFilter,
-			NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
-				Current: loraAffinityFilter,
-				NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
-					Current: leastKvCacheFilter,
-				},
-			},
-		},
-	}
+	kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer()
+	queueingScorer := scorer.NewQueueScorer()
+	prefixCacheScorer := prefix.New(prefix.DefaultConfig)
+	loraAffinityScorer := scorer.NewLoraAffinityScorer()

	defaultProfile := framework.NewSchedulerProfile().
-		WithFilters(lowLatencyFilter).
-		WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))
+		WithScorers(framework.NewWeightedScorer(kvCacheUtilizationScorer, 1),
+			framework.NewWeightedScorer(queueingScorer, 1),
+			framework.NewWeightedScorer(prefixCacheScorer, 1),
+			framework.NewWeightedScorer(loraAffinityScorer, 1),
+		).
+		WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))

	profileHandler := profile.NewSingleProfileHandler()
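The net effect of the BeforeSuite change: the old default walked a hard-coded DecisionTreeFilter (a low-queue gate, then LoRA affinity, least-queue, and least-KV-cache branches) and picked randomly among the survivors; the new default scores every candidate pod with four equally weighted scorers and picks the maximum. A minimal sketch of that weighted-sum-then-argmax contract, using illustrative stand-in types rather than the framework's actual Scorer/Picker interfaces:

package main

import "fmt"

// scoreFn stands in for a framework scorer; assumed here to return a value
// in [0, 1] per pod (the real plugins score whole candidate lists at once).
type scoreFn func(pod string) float64

type weightedScorer struct {
	score  scoreFn
	weight float64
}

// pickMaxScore mirrors what a max-score picker does: sum the weighted
// scores for each candidate and return the highest-scoring one.
func pickMaxScore(pods []string, scorers []weightedScorer) string {
	best, bestTotal := "", -1.0
	for _, p := range pods {
		total := 0.0
		for _, ws := range scorers {
			total += ws.weight * ws.score(p)
		}
		if total > bestTotal {
			best, bestTotal = p, total
		}
	}
	return best
}

func main() {
	// Per-pod scores from three hypothetical scorers, all with weight 1.
	scores := map[string][]float64{
		"podA": {0.9, 0.2, 1.0},
		"podB": {0.5, 0.8, 0.0},
	}
	scorers := make([]weightedScorer, 3)
	for i := range scorers {
		i := i
		scorers[i] = weightedScorer{func(p string) float64 { return scores[p][i] }, 1}
	}
	fmt.Println(pickMaxScore([]string{"podA", "podB"}, scorers)) // podA (2.1 vs 1.3)
}

Unlike the decision tree, which discards pods at each branch, this shape lets a strong signal on one dimension (e.g., LoRA affinity) compensate for a weaker one (e.g., KV cache headroom), which is exactly what the updated test expectations above exercise.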