Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions pkg/epp/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,37 @@ import (
func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
reqCtx.RequestReceivedTimestamp = time.Now()

// Headers must be processed first to populate the request context as subsequent logic (like body processing) may
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not following: where in this function do we use the reqCtx in a way that would require moving this logic earlier? It is fine to move it up, but I wanted to double-check where the bug is in this specific case.

// depend upon it.
for _, header := range req.RequestHeaders.Headers.Headers {
key := header.Key
// Per the Envoy API, a header's value can be in either the `RawValue` (bytes) or `Value` (string) field.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fix ExtractHeaderValue then? See https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/epp/util/request/headers.go#L35

It is probably best to have a single function for the if/else logic that we use across the code for reading the header value. We could name it ExtractHeaderValue and rename the current ExtractHeaderValue to ExtractCaseInsensitiveHeaderValue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a much cleaner solution, thanks! I missed this helper as I was tracing the request path. Most of these were discovered through hermetic test failures post-refactoring, and I am not deeply familiar with this part of the codebase. I will consolidate this into the helper.

if header.RawValue != nil {
reqCtx.Request.Headers[key] = string(header.RawValue)
} else {
reqCtx.Request.Headers[key] = header.Value
}
Comment on lines +40 to +44
Copy link
Contributor

@nirrozenbaum nirrozenbaum Sep 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuing @ahg-g's line of thinking: I agree we should fix the ExtractHeaderValue helper func, and then we can use it here, e.g.:

reqCtx.Request.Headers[key] = ExtractHeaderValue(key)

I think the logic that handles RawValue or Value should be scoped to a single helper function.

switch key {
case metadata.FlowFairnessIDKey:
reqCtx.FairnessID = reqCtx.Request.Headers[key]
// remove the fairness ID header from the request headers,
// this is not data that should be manipulated or sent to the backend.
// It is only used for flow control.
delete(reqCtx.Request.Headers, key)
case metadata.ObjectiveKey:
reqCtx.ObjectiveKey = reqCtx.Request.Headers[key]
reqCtx.IncomingModelName = reqCtx.ObjectiveKey
// remove the objective header from the request headers,
// this is not data that should be manipulated or sent to the backend.
delete(reqCtx.Request.Headers, key)
case metadata.ModelNameRewriteKey:
reqCtx.TargetModelName = reqCtx.Request.Headers[key]
// remove the rewrite header from the request headers,
// this is not data that should be manipulated or sent to the backend.
delete(reqCtx.Request.Headers, key)
}
}

// an EoS in the request headers means this request has no body or trailers.
if req.RequestHeaders.EndOfStream {
// We will route this request to a random pod as this is assumed to just be a GET
Expand All @@ -55,31 +86,6 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
return nil
}

for _, header := range req.RequestHeaders.Headers.Headers {
if header.RawValue != nil {
reqCtx.Request.Headers[header.Key] = string(header.RawValue)
} else {
reqCtx.Request.Headers[header.Key] = header.Value
}
switch header.Key {
case metadata.FlowFairnessIDKey:
reqCtx.FairnessID = reqCtx.Request.Headers[header.Key]
// remove the fairness ID header from the request headers,
// this is not data that should be manipulated or sent to the backend.
// It is only used for flow control.
delete(reqCtx.Request.Headers, header.Key)
case metadata.ObjectiveKey:
reqCtx.ObjectiveKey = reqCtx.Request.Headers[header.Key]
// remove the objective header from the request headers,
// this is not data that should be manipulated or sent to the backend.
delete(reqCtx.Request.Headers, header.Key)
case metadata.ModelNameRewriteKey:
reqCtx.TargetModelName = reqCtx.Request.Headers[header.Key]
// remove the rewrite header from the request headers,
// this is not data that should be manipulated or sent to the backend.
delete(reqCtx.Request.Headers, header.Key)
}
}
return nil
}

Expand Down
28 changes: 23 additions & 5 deletions pkg/epp/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,31 @@ func (s *StreamingServer) HandleResponseBody(ctx context.Context, reqCtx *Reques
return reqCtx, nil
}

// The function is to handle streaming response if the modelServer is streaming.
// HandleResponseBodyModelStreaming processes a single chunk of a streaming response.
// It accumulates token usage over the entire stream and records the final metrics only when the end-of-stream message
// is detected.
func (s *StreamingServer) HandleResponseBodyModelStreaming(ctx context.Context, reqCtx *RequestContext, responseText string) {
// Parse the current chunk for a 'usage' block.
resp := parseRespForUsage(ctx, responseText)

// Accumulate token counts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two comments:

  1. From metrics perspective, we are not truly accumulating anything, this is just reporting the values in the last block.

  2. Which logic depends on reqCtx.Usage while we are accumulating the stream, such that it would require us to keep updating the usage rather than just doing it at the end?

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Sep 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here was that the 'usage' block was not coming in the final "[DONE]" message in the hermetic testing environment. It was arriving in the penultimate message instead.

When processing the final message of the stream, it was empty, resulting in underreporting and empty labels. I need to do some more investigating to understand if this is an issue with the hermetic testing environment or something that can also occur with real traffic.

From metrics perspective, we are not truly accumulating anything, this is just reporting the values in the last block.

True, we just need to find the block that populates it. My comment is misleading as this is still a "detect and write once" process that is only reported after the stream ends.

Copy link
Contributor Author

@LukeAVanDrie LukeAVanDrie Sep 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to understand and debug behavior from integration test results, and I am not too familiar with our request stream handling code and the properties we can rely upon in production. It's very possible my root cause analysis is wrong in places, but the symptoms I reported in the linked issues are reproducible.

Let me write some targeted unit tests over the relevant components to better understand this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue here was that the 'usage' block was not coming in the final "[DONE]" message in the hermetic testing environment. It was arriving in the penultimate message instead.

This might be an issue in the hermetic test. In real traffic, the penultimate and last messages come in the same chunk:

data: {"id":"...","object":"text_completion","created":1739400043,"model":"food-review-0","choices":[], "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for confirming! I will fix the hermetic test setup to conform to this.

I think #1626 can be closed as WAI. I also reported an issue where metric labels were not being set properly, so I need to first confirm that this is solely triggered by the data: [DONE] final chunk case. If so, I'll revert my change to this response handling code.

// The 'usage' block typically appears in one of the last messages of a stream.
// We continuously update the context with the latest non-zero values we've seen.
if resp.Usage.PromptTokens > 0 {
reqCtx.Usage.PromptTokens = resp.Usage.PromptTokens
}
if resp.Usage.CompletionTokens > 0 {
reqCtx.Usage.CompletionTokens = resp.Usage.CompletionTokens
}
if resp.Usage.TotalTokens > 0 {
reqCtx.Usage.TotalTokens = resp.Usage.TotalTokens
}

// Record metrics at the end of the stream.
// When we see the final "[DONE]" message, we record the total accumulated token counts from the context.
if strings.Contains(responseText, streamingEndMsg) {
resp := parseRespForUsage(ctx, responseText)
reqCtx.Usage = resp.Usage
metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, resp.Usage.CompletionTokens)
metrics.RecordInputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.IncomingModelName, reqCtx.TargetModelName, reqCtx.Usage.CompletionTokens)
}
}

Expand Down
11 changes: 9 additions & 2 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,10 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
if len(requestID) == 0 {
requestID = uuid.NewString()
loggerTrace.Info("RequestID header is not found in the request, generated a request id")
reqCtx.Request.Headers[requtil.RequestIdHeaderKey] = requestID // update in headers so director can consume it
}
// Ensure the request ID, whether pre-existing or newly generated, is in the context's header map.
// This makes it available to all downstream logic (e.g., the director).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand if this is really a bug: in the case where the header already existed, HandleRequestHeaders would have set it in reqCtx.Request.Headers, right? I guess the only difference here is that we are unifying the header key as requtil.RequestIdHeaderKey, since at line 187 the header could have had a different casing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since at line 187 the header could have had a different casing

Let me look into this more. You may be onto something here. Perhaps there is a simpler solution / root cause for the behaviors I was seeing.

reqCtx.Request.Headers[requtil.RequestIdHeaderKey] = requestID
logger = logger.WithValues(requtil.RequestIdHeaderKey, requestID)
loggerTrace = logger.V(logutil.TRACE)
ctx = log.IntoContext(ctx, logger)
Expand Down Expand Up @@ -241,7 +243,12 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
// This is currently unused.
case *extProcPb.ProcessingRequest_ResponseHeaders:
for _, header := range v.ResponseHeaders.Headers.GetHeaders() {
value := string(header.RawValue)
var value string
if len(header.RawValue) > 0 {
value = string(header.RawValue)
} else {
value = header.Value
}
Comment on lines +246 to +251
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the single helper function?
The RawValue/Value handling should be encapsulated in the ExtractHeaderValue helper func.


loggerTrace.Info("header", "key", header.Key, "value", value)
if header.Key == "status" && value != "200" {
Expand Down
6 changes: 5 additions & 1 deletion pkg/epp/util/request/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func ExtractHeaderValue(req *extProcPb.ProcessingRequest_RequestHeaders, headerK
if req != nil && req.RequestHeaders != nil && req.RequestHeaders.Headers != nil {
for _, headerKv := range req.RequestHeaders.Headers.Headers {
if strings.ToLower(headerKv.Key) == headerKeyInLower {
return string(headerKv.RawValue)
// Per the Envoy API, a header's value can be in either the `RawValue` (bytes) or `Value` (string) field.
if len(headerKv.RawValue) > 0 {
return string(headerKv.RawValue)
}
return headerKv.Value
Comment on lines +35 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
}
}
Expand Down
Loading