Skip to content

Commit 04db6e9

Browse files
Fix streamed request being called one final time after the request completes; add a predictor check at the beginning of each requestcontrol hook
1 parent 39aed1a commit 04db6e9

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,9 @@ func (d *Director) HandleResponseBodyStreaming(ctx context.Context, reqCtx *hand
307307
logger := log.FromContext(ctx).WithValues("stage", "bodyChunk")
308308
logger.V(logutil.TRACE).Info("Entering HandleResponseBodyChunk")
309309
response := &Response{
310-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
311-
Headers: reqCtx.Response.Headers,
310+
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
311+
Headers: reqCtx.Response.Headers,
312+
EndOfStream: reqCtx.ResponseComplete,
312313
}
313314

314315
d.runResponseStreamingPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)

pkg/epp/scheduling/framework/plugins/multi/slo_aware_router/requestcontrol_hooks.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
114114
}
115115

116116
targetPod := schedulingResult.ProfileResults[schedulingResult.PrimaryProfileName].TargetPods[0].GetPod()
117+
<<<<<<< HEAD
117118
if !t.checkPredictor(logger, targetPod) {
119+
=======
120+
if !t.CheckPredictor(logger, targetPod) {
121+
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
118122
return
119123
}
120124

@@ -157,7 +161,11 @@ func (t *SLOAwareRouter) PreRequest(ctx context.Context, request *schedulingtype
157161

158162
func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, targetPod *backend.Pod) {
159163
logger := log.FromContext(ctx)
164+
<<<<<<< HEAD
160165
if !t.checkPredictor(logger, targetPod) {
166+
=======
167+
if !t.CheckPredictor(logger, targetPod) {
168+
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
161169
return
162170
}
163171

@@ -169,15 +177,23 @@ func (t *SLOAwareRouter) ResponseReceived(ctx context.Context, request *scheduli
169177
return
170178
}
171179

180+
<<<<<<< HEAD
172181
if err := processHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
182+
=======
183+
if err := ProcessHeaderForLatencyPrediction(ctx, t.latencypredictor, sloCtx); err != nil {
184+
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
173185
logger.V(logutil.DEBUG).Error(err, "ProcessHeader in latencypredictor failed")
174186
}
175187

176188
}
177189

178190
func (t *SLOAwareRouter) ResponseStreaming(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, pod *backend.Pod) {
179191
logger := log.FromContext(ctx)
192+
<<<<<<< HEAD
180193
if !t.checkPredictor(logger, pod) || response.EndOfStream {
194+
=======
195+
if !t.CheckPredictor(logger, pod) || response.EndOfStream {
196+
>>>>>>> b2a7d45 (Fix streamed request being called one final time after request complete, add predictor check to the beginning of each requestcontrol hook)
181197
return
182198
}
183199

0 commit comments

Comments
 (0)