fix: streaming metrics and header parsing bugs #1627

Conversation
This commit addresses several critical bugs discovered in the External Processing Proxy (EPP) that impact reliability, observability, and correctness, particularly for streaming use cases.

**Bug Fixes:**

1. **Correct Streaming Token Metrics:**
   - **Problem:** For streaming responses (e.g., `text/event-stream`), token usage metrics were recorded incorrectly. The logic only inspected the final `[DONE]` message for a `usage` block, failing to accumulate token counts from earlier messages in the stream. Additionally, the `IncomingModelName` was being overwritten by a blank value from the request body, causing the `model_name` label in Prometheus to be empty.
   - **Fix:** The response handler now correctly accumulates token counts from all streaming chunks into the `RequestContext`. The final, accurate count is recorded only when the `[DONE]` message is received. The request handler logic was reordered to ensure headers (containing the model name) are always processed before the body, preventing the context from being corrupted.

2. **Robust Header Parsing:**
   - **Problem:** Multiple locations in the codebase exclusively checked the `RawValue` field of an Envoy `HeaderValue` message, ignoring the valid `Value` field. This caused failures in detecting the `content-type` for streaming and loss of the `x-request-id` for tracing if a client sent them in the `Value` field.
   - **Fix:** All header parsing logic has been updated to check both `RawValue` and `Value`, making it robust and compliant with the Envoy API.

**Refactoring:**

- **Hermetic Test Overhaul:** The integration test suite in `test/integration/epp/hermetic_test.go` has been completely refactored for reliability and clarity.
  - The old, monolithic, table-driven test has been replaced with a `testHarness` structure that provides each test case with its own isolated server instance, Kubernetes resources (scoped by a unique label), and gRPC client.
  - This eliminates test interference and makes the suite significantly more stable and maintainable. While true parallelism is still blocked by a global metrics registry in controller-runtime, this change achieves full resource and state isolation.
  - Test cases are now grouped by functionality (`RequestRouting`, `ResponseHandling`, etc.) with clear, descriptive names.
  - All associated test utilities and documentation have been polished to improve readability and maintainability.
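To make the streaming-metrics fix concrete, here is a minimal, self-contained sketch of the approach described above. The package, type, and function names are illustrative rather than the actual EPP code: each SSE chunk is scanned for a `usage` block, the latest block seen is kept on the request context, and metrics are recorded only once the `[DONE]` sentinel arrives.

```go
package streaming

import (
	"encoding/json"
	"strings"
)

// Usage mirrors the OpenAI-style usage block that streaming chunks may carry.
type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}

type chunk struct {
	Usage *Usage `json:"usage"`
}

// CaptureStreamingUsage scans one SSE body fragment, keeps the latest usage
// block seen in latest, and reports whether the stream has ended ([DONE]).
// The caller records metrics only once done is true, so the final, accurate
// count is emitted exactly once per request.
func CaptureStreamingUsage(body string, latest *Usage) (done bool) {
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if line == "" {
			continue
		}
		if line == "[DONE]" {
			done = true
			continue
		}
		var c chunk
		if err := json.Unmarshal([]byte(line), &c); err == nil && c.Usage != nil {
			*latest = *c.Usage // remember the most recent usage block
		}
	}
	return done
}
```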
requests = append(requests, headerReq)
requests = append(requests, GenerateRequest(logger, prompt, model, filterMetadata))
// Simulate a multi-chunk body by splitting the marshaled JSON.
This is a small improvement to better reflect actual streaming use cases.
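As an illustration of that simulation, a test could split a marshaled JSON payload into two ext_proc body chunks roughly like this. This is a sketch only; the real test utilities (such as `GenerateRequest` above) build requests differently, and the helper name here is hypothetical.

```go
package integration

import (
	"encoding/json"

	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)

// buildChunkedBodyRequests marshals a JSON payload and splits it into two
// ext_proc request-body chunks, with end_of_stream set only on the final
// chunk, to mimic Envoy delivering a streamed request body in pieces.
func buildChunkedBodyRequests(payload map[string]any) ([]*extProcPb.ProcessingRequest, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	mid := len(body) / 2
	return []*extProcPb.ProcessingRequest{
		{
			Request: &extProcPb.ProcessingRequest_RequestBody{
				RequestBody: &extProcPb.HttpBody{Body: body[:mid]},
			},
		},
		{
			Request: &extProcPb.ProcessingRequest_RequestBody{
				RequestBody: &extProcPb.HttpBody{Body: body[mid:], EndOfStream: true},
			},
		},
	}, nil
}
```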
inference_objective_input_tokens_bucket{model_name="",target_model_name="",le="+Inf"} 1 | ||
inference_objective_input_tokens_sum{model_name="",target_model_name=""} 7 | ||
inference_objective_input_tokens_count{model_name="",target_model_name=""} 1 | ||
inference_objective_input_tokens_bucket{model_name="sql-lora-sheddable",target_model_name="sql-lora-1fdg3",le="1"} 0 |
Verification that the labels are being applied properly now.
Here is a mapping of the original test cases to the new ones. All test scenarios are preserved; the only change in assertions is the intentional correction of the streaming metrics bug.
/ok-to-test
func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
reqCtx.RequestReceivedTimestamp = time.Now()

// Headers must be processed first to populate the request context as subsequent logic (like body processing) may
I am not following: where in this function do we use the reqCtx in a way that would require moving this logic earlier? It is fine to move it up, but I wanted to double-check where the bug is in this specific case.
// depend upon it.
for _, header := range req.RequestHeaders.Headers.Headers {
key := header.Key
// Per the Envoy API, a header's value can be in either the `RawValue` (bytes) or `Value` (string) field.
Should we fix `ExtractHeaderValue` then? See https://github.com/kubernetes-sigs/gateway-api-inference-extension/blob/main/pkg/epp/util/request/headers.go#L35. It is probably best if we have a function for the if/else logic that we use across the code for reading the header value and name it `ExtractHeaderValue`, and rename the current `ExtractHeaderValue` to `ExtractCaseInsensitiveHeaderValue`.
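For illustration, a sketch of the suggested split, assuming Envoy's `HeaderValue` type from `config/core/v3`; the exact signatures and package layout in the repo's request utilities may differ.

```go
package request

import (
	"strings"

	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
)

// ExtractHeaderValue returns a header's value regardless of whether Envoy
// populated the RawValue (bytes) or the Value (string) field.
func ExtractHeaderValue(h *configPb.HeaderValue) string {
	if len(h.RawValue) > 0 {
		return string(h.RawValue)
	}
	return h.Value
}

// ExtractCaseInsensitiveHeaderValue looks up a header by name, ignoring case,
// and returns its value using the same RawValue/Value rules.
func ExtractCaseInsensitiveHeaderValue(headers []*configPb.HeaderValue, name string) (string, bool) {
	for _, h := range headers {
		if strings.EqualFold(h.Key, name) {
			return ExtractHeaderValue(h), true
		}
	}
	return "", false
}
```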
Yes, this is a much cleaner solution, thanks! I missed this helper as I was tracing the request path. Most of these were discovered through hermetic test failures post-refactoring, and I am not deeply familiar with this part of the codebase. I will consolidate this into the helper.
reqCtx.Request.Headers[requtil.RequestIdHeaderKey] = requestID // update in headers so director can consume it
}
// Ensure the request ID, whether pre-existing or newly generated, is in the context's header map.
// This makes it available to all downstream logic (e.g., the director).
Trying to understand if this is really a bug: in the case the header already existed, `HandleRequestHeaders` would have set it in `reqCtx.Request.Headers`, right? I guess the only difference here is that we are unifying the header key as `requtil.RequestIdHeaderKey`, since at line 187 the header could have had a different casing.
> since at line 187 the header could have had a different casing
Let me look into this more. You may be onto something here. Perhaps there is a simpler solution / root cause for the behaviors I was seeing.
// Parse the current chunk for a 'usage' block.
resp := parseRespForUsage(ctx, responseText)

// Accumulate token counts.
Two comments:

- From a metrics perspective, we are not truly accumulating anything; this is just reporting the values in the last block.
- Which logic depends on `reqCtx.Usage` while we are accumulating the stream, such that we would need to keep updating the usage rather than just doing it at the end?
The issue here was that the 'usage' block was not coming in the final "[DONE]" message in the hermetic testing environment. It was arriving in the penultimate message instead.
When processing the final message of the stream, it was empty, resulting in underreporting and empty labels. I need to do some more investigating to understand if this is an issue with the hermetic testing environment or something that can also occur with real traffic.
> From a metrics perspective, we are not truly accumulating anything; this is just reporting the values in the last block.
True, we just need to find the block that populates it. My comment is misleading as this is still a "detect and write once" process that is only reported after the stream ends.
I was trying to understand and debug behavior from integration test results, and I am not too familiar with our request stream handling code and the properties we can rely upon in production. It's very possible my root cause analysis is wrong in places, but the symptoms I reported in the linked issues are reproducible.
Let me write some targeted unit tests over the relevant components to better understand this.
> The issue here was that the 'usage' block was not coming in the final "[DONE]" message in the hermetic testing environment. It was arriving in the penultimate message instead.

This might be an issue in the hermetic test. In real traffic, the penultimate and last message come in the same chunk:
data: {"id":"...","object":"text_completion","created":1739400043,"model":"food-review-0","choices":[], "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
data: [DONE]
Great, thanks for confirming! I will fix the hermetic test setup to conform to this.

I think #1626 can be closed as WAI. I also reported an issue where metric labels were not being set properly, so I need to first confirm that this is solely triggered by the `data: [DONE]` final chunk case. If so, I'll revert my change to this response handling code.
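For reference, the fixed hermetic fixture could emit a final body matching the shape shown above, for example via a constant like the following (hypothetical, mirroring the reviewer's example values):

```go
// Sketch of a mock backend's final streamed response body: the usage block
// and the [DONE] sentinel arrive together in one chunk, matching real traffic.
const finalStreamChunk = `data: {"id":"cmpl-1","object":"text_completion","created":1739400043,"model":"food-review-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`
```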
if header.RawValue != nil {
reqCtx.Request.Headers[key] = string(header.RawValue)
} else {
reqCtx.Request.Headers[key] = header.Value
}
Continuing @ahg-g's line of thinking - I agree we should fix the `ExtractHeaderValue` helper func and then we can use it here, e.g., `reqCtx.Request.Headers[key] = ExtractHeaderValue(key)`. I think the logic that handles `RawValue` or `Value` should be scoped to a single helper function.
var value string
if len(header.RawValue) > 0 {
value = string(header.RawValue)
} else {
value = header.Value
}
Use the single helper function? The `RawValue`/`Value` handling should be encapsulated in the `ExtractHeaderValue` helper func.
// Per the Envoy API, a header's value can be in either the `RawValue` (bytes) or `Value` (string) field.
if len(headerKv.RawValue) > 0 {
return string(headerKv.RawValue)
}
return headerKv.Value
ditto
// GetFreePort finds and returns an available TCP port on the host.
// It works by asking the OS to allocate a port by listening on port 0, capturing the assigned address, and then
// immediately closing the listener.
func GetFreePort() (*net.TCPAddr, error) {
this is great
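For readers outside the diff, one common way to implement a helper with this signature is shown below; it is a sketch following the doc comment above, not necessarily the exact code in this PR.

```go
package testutil

import "net"

// GetFreePort asks the OS for an ephemeral TCP port by listening on port 0,
// records the address that was assigned, and closes the listener so the port
// is free for the caller to use.
func GetFreePort() (*net.TCPAddr, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr), nil
}
```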
This passes when I run it locally.
The primary focus of this PR is a significant refactoring of the EPP hermetic test suite to achieve true test isolation. While it also includes important bug fixes that this new test rigor uncovered, the change here was necessary to unblock upcoming integration tests for the Flow Control layer, which require a clean and mutable environment for each test scenario.
Guidance for Reviewers
I would greatly appreciate your focus on the test refactoring itself:
- Isolation Strategy: Does the combination of unique labels and a scoped cache seem robust and effective for serial execution?
- Clarity & Maintainability: Is the new
testHarness
approach clearer and easier to extend? - Potential Risks: Do you see any potential downsides to this per-test setup approach?
I just want to make sure this doesn't get overlooked amidst the bug fix discussion as this is the heart of this PR.
Drawbacks of the Previous Approach
Previously, our tests relied on a shared, suite-level setup. The `controller-runtime` manager's cache was unscoped, meaning it watched all pods in the namespace. This design was fragile and prone to state leakage between test runs, making it difficult to write reliable tests for features that need to mutate backend state (e.g., marking pods as saturated/unsaturated).

The New testHarness

This PR overhauls this approach by introducing a `testHarness` that provides a fully encapsulated, hermetic environment for each test case.
Here’s a summary of the key isolation mechanisms:
- Per-Test Harness: Each test now runs within its own `testHarness` instance, which encapsulates all necessary components (server, client, manager) and state.
- Unique Resource Labeling: A unique `test-run-id` label (derived from a UUID) is generated for each harness and applied to all Kubernetes pods it creates.
- Scoped Kubernetes Cache: This is the most crucial part. The `controller-runtime` manager within each harness is configured with a cache that only watches pods matching that specific, unique `test-run-id` label. This prevents the test from seeing or interacting with pods from any other test run (see the sketch after this list).
- Dedicated Lifecycle Management: Each harness starts its own server instance and creates its own gRPC client. A robust cleanup function, automatically called by `t.Cleanup`, uses a `context.CancelFunc` to gracefully tear down the server, manager, and client, and then deletes the uniquely labeled Kubernetes pods.
A Note on Parallelism
Despite this isolation, tests cannot use `t.Parallel()` due to `controller-runtime`'s global metrics registry. Our solution for clean serial execution is resetting the registry post-test (`t.Cleanup(metrics.Reset)`). True parallelism would require per-test `envtest` instances, which is prohibitively slow. This refactoring is a pragmatic step, solving resource contention while sharing the suite-level `envtest` setup.
Thanks!
IIRC the usage field is always on the tail end of a streamed message; I remember going through these cases with Jeff. Why would the previous messages have a usage block?
This may actually be an issue in the test setup then, where we are not accurately simulating the streaming use case. If this is never the case in production, I will update the test utils to reflect this and revert this particular change. I changed the structure of how the tests are run (so they are executed in isolated environments) but not the assertions, so I was chasing down failures and may have root-caused this incorrectly.
I don't mind keeping the logic you have if we think it is more "resilient".
Yes, agreed, that's fine. I'm just walking through the risk assessment of these bugs, and weighing the risk of patching new logic vs. the risk of these bugs.
What type of PR is this?
/kind bug
/kind cleanup
What this PR does / why we need it:
This PR addresses several critical bugs in the endpoint picker that were discovered during refactoring work. These fixes are essential for stability, reliability, and observability.
The key changes are:
1. Fixes Critical Streaming Metrics Bug: For `text/event-stream` responses, token usage metrics are being undercounted. The logic only parsed the final `[DONE]` message for a `usage` block, failing to accumulate token counts from earlier messages. Furthermore, a context corruption bug caused the `model_name` label in Prometheus to be empty, rendering the metric unusable for per-model monitoring. This PR corrects the logic to accumulate tokens across the entire stream and ensures all metric labels are correctly populated.

2. Fixes Header Parsing Reliability: The EPP was brittle in how it parsed headers, exclusively checking the `RawValue` field and ignoring the `Value` field. This could lead to two severe production issues:
   - If a client sent `content-type: text/event-stream` using the `Value` field, the EPP would fail to detect the stream and attempt to buffer the entire response, risking high memory usage and OOM kills.
   - The `x-request-id` header could be missed, breaking the distributed trace and hindering debugging.

   This PR makes all header parsing robust by correctly checking both fields.

3. Complete Hermetic Test Overhaul: The integration test suite has been refactored to be significantly more robust and maintainable. The previous shared-state, table-driven test has been replaced with a `testHarness` structure that provides full resource and state isolation for each test case, eliminating flakes and improving clarity.

Which issue(s) this PR fixes:
Fixes #1624
Fixes #1626
Does this PR introduce a user-facing change?: