Commit 4fa525d

Apply shedding upon saturation for priority below 0 (#1361)

* Apply shedding upon saturation for priority below 0
* Update log line

1 parent 131eddc · commit 4fa525d

3 files changed: +17 −35 lines

3 files changed

+17
-35
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 5 deletions

@@ -164,19 +164,17 @@ func (d *Director) admitRequest(ctx context.Context, requestPriority int, fairne
 	logger.V(logutil.TRACE).Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID)
 
 	// This will be removed in favor of a more robust implementation (Flow Control) in the very near future.
-	// For now we will keep similar behavior to the previous implementation.
 	// TODO: Make this a configurable value.
 	// Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347
-	if requestPriority >= 2 {
-		logger.V(logutil.DEBUG).Info("Critical request bypassing saturation check.")
+	if requestPriority >= 0 {
+		logger.V(logutil.TRACE).Info("Non-sheddable request bypassing saturation check.")
 		return nil
 	}
 
-	logger.V(logutil.DEBUG).Info("Performing saturation check for non-critical request.")
 	if d.saturationDetector.IsSaturated(ctx) { // Assuming non-nil Saturation Detector
 		return errutil.Error{
 			Code: errutil.InferencePoolResourceExhausted,
-			Msg:  "system saturated, non-critical request dropped",
+			Msg:  "system saturated, sheddable request dropped",
 		}
 	}
pkg/epp/requestcontrol/director_test.go

Lines changed: 6 additions & 6 deletions

@@ -84,7 +84,7 @@ func TestDirector_HandleRequest(t *testing.T) {
 		ObjRef()
 	ioFoodReviewSheddable := testutil.MakeInferenceObjective("imFoodReviewSheddable").
 		CreationTimestamp(metav1.Unix(1000, 0)).
-		Priority(0).
+		Priority(-1).
 		ObjRef()
 	ioFoodReviewResolve := testutil.MakeInferenceObjective("imFoodReviewResolve").
 		CreationTimestamp(metav1.Unix(1000, 0)).
@@ -201,7 +201,7 @@ func TestDirector_HandleRequest(t *testing.T) {
 			targetModelName: model,
 		},
 		{
-			name: "successful chat completions request (critical, saturation ignored)",
+			name: "successful chat completions request (default critical, saturation ignored)",
 			reqBodyMap: map[string]any{
 				"model": model,
 				"messages": []any{
@@ -211,21 +211,20 @@ func TestDirector_HandleRequest(t *testing.T) {
 				},
 			},
+			mockSaturationDetector: &mockSaturationDetector{isSaturated: true},
 			schedulerMockSetup: func(m *mockScheduler) {
 				m.scheduleResults = defaultSuccessfulScheduleResults
 			},
 			wantReqCtx: &handlers.RequestContext{
-				ObjectiveKey:    objectiveName,
 				TargetModelName: model,
 				TargetPod: &backend.Pod{
 					NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"},
 					Address:        "192.168.1.100",
 				},
 				TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
 			},
-			wantMutatedBodyModel:   model,
-			inferenceObjectiveName: objectiveName,
-			targetModelName:        model,
+			wantMutatedBodyModel: model,
+			targetModelName:      model,
 		},
 		{
 			name: "successful chat completions request with multiple messages (critical, saturation ignored)",
@@ -334,6 +333,7 @@ func TestDirector_HandleRequest(t *testing.T) {
 				"model":  modelSheddable,
 				"prompt": "sheddable prompt",
 			},
+			inferenceObjectiveName: objectiveNameSheddable,
 			mockSaturationDetector: &mockSaturationDetector{isSaturated: true},
 			wantErrCode:            errutil.InferencePoolResourceExhausted,
 		},

test/integration/epp/hermetic_test.go

Lines changed: 8 additions & 24 deletions

@@ -282,42 +282,26 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
 			),
 		},
 		{
-			name:     "noncritical and all models past threshold, shed request",
-			requests: integrationutils.GenerateStreamedRequestSet(logger, "test4", modelSheddable, modelSQLLoraTarget, nil),
+			name:     "don't shed requests by default",
+			requests: integrationutils.GenerateStreamedRequestSet(logger, "test4", modelSQLLora, modelSQLLoraTarget, nil),
 			// pod 0: excluded; above queue size threshold
 			// pod 1: excluded; above KV cache threshold
 			// pod 2: excluded; above queue size threshold
 			pods: newPodStates(
-				podState{index: 0, queueSize: 6, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}},
-				podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
-				podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}},
-			),
-			wantErr:     false,
-			wantMetrics: map[string]string{},
-			wantResponses: integrationutils.NewImmediateErrorResponse(
-				envoyTypePb.StatusCode_TooManyRequests,
-				"inference gateway: InferencePoolResourceExhausted - system saturated, non-critical request dropped",
-			),
-		},
-		{
-			name:     "noncritical, but one server has capacity, do not shed",
-			requests: integrationutils.GenerateStreamedRequestSet(logger, "test5", modelSheddable, modelSheddableTarget, nil),
-			// Pod 1 will be picked because it has relatively low queue size and low KV cache.
-			pods: newPodStates(
-				podState{index: 0, queueSize: 4, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSheddableTarget}},
-				podState{index: 1, queueSize: 4, kvCacheUsage: 0.85, activeModels: []string{"foo", modelSheddableTarget}},
-				podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo", modelSheddableTarget}},
+				podState{index: 0, queueSize: 6, kvCacheUsage: 0.2, activeModels: []string{"foo", "bar", modelSQLLoraTarget}},
+				podState{index: 1, queueSize: 0, kvCacheUsage: 0.85, activeModels: []string{"foo"}},
+				podState{index: 2, queueSize: 10, kvCacheUsage: 0.9, activeModels: []string{"foo"}},
 			),
 			wantMetrics: map[string]string{
 				"inference_model_request_total": inferenceObjectiveRequestTotal([]label{
-					{"model_name", modelSheddable},
-					{"target_model_name", modelSheddableTarget},
+					{"model_name", modelSQLLora},
+					{"target_model_name", modelSQLLoraTarget},
 				}),
 			},
 			wantErr: false,
 			wantResponses: integrationutils.NewRequestBufferedResponse(
 				"192.168.1.1:8000",
-				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test5","temperature":0}`, modelSheddableTarget),
+				fmt.Sprintf(`{"max_tokens":100,"model":%q,"prompt":"test4","temperature":0}`, modelSQLLoraTarget),
 				&configPb.HeaderValueOption{
 					Header: &configPb.HeaderValue{
 						Key: "hi",
