Skip to content

Commit fe60472

Browse files
metrics: TTFT in streaming mode (#203)
* metrics: TTFT in streaming mode
  Signed-off-by: Jintao Zhang <[email protected]>
* extproc: set ModeOverride to STREAMED for SSE so TTFT is captured on the first body chunk (fixes streaming TTFT)
  Signed-off-by: Jintao Zhang <[email protected]>
* update documentation for streaming mode
  Signed-off-by: Jintao Zhang <[email protected]>
---------
Signed-off-by: Jintao Zhang <[email protected]>
Co-authored-by: Huamin Chen <[email protected]>
1 parent ede160f commit fe60472

File tree

5 files changed

+130
-4
lines changed

5 files changed

+130
-4
lines changed

src/semantic-router/pkg/extproc/metrics_integration_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,42 @@ var _ = Describe("Metrics recording", func() {
125125
Expect(afterPrompt).To(BeNumerically(">", beforePrompt))
126126
Expect(afterCompletion).To(BeNumerically(">", beforeCompletion))
127127
})
128+
129+
// Verifies the two-phase TTFT behavior for SSE responses:
// header phase must only mark the context as streaming, and the metric
// sample is recorded when the first body chunk arrives.
It("records TTFT on first streamed body chunk for SSE responses", func() {
	// Backdated ProcessingStartTime guarantees a strictly positive TTFT
	// when the first chunk is handled.
	ctx := &RequestContext{
		RequestModel:        "model-stream",
		ProcessingStartTime: time.Now().Add(-120 * time.Millisecond),
		Headers:             map[string]string{"accept": "text/event-stream"},
	}

	// Simulate header phase: SSE content-type indicates streaming
	respHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
		ResponseHeaders: &ext_proc.HttpHeaders{
			Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: "content-type", Value: "text/event-stream"}}},
		},
	}

	// Snapshot the histogram sample count so the assertion below can show
	// that exactly this flow added a sample.
	before := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel)

	// Handle response headers (should NOT record TTFT for streaming)
	response1, err := router.handleResponseHeaders(respHeaders, ctx)
	Expect(err).NotTo(HaveOccurred())
	Expect(response1.GetResponseHeaders()).NotTo(BeNil())
	Expect(ctx.IsStreamingResponse).To(BeTrue())
	Expect(ctx.TTFTRecorded).To(BeFalse())

	// Now simulate the first streamed body chunk
	respBody := &ext_proc.ProcessingRequest_ResponseBody{
		ResponseBody: &ext_proc.HttpBody{Body: []byte("data: chunk-1\n")},
	}

	response2, err := router.handleResponseBody(respBody, ctx)
	Expect(err).NotTo(HaveOccurred())
	Expect(response2.GetResponseBody()).NotTo(BeNil())

	// TTFT must now be recorded: one new histogram sample, flag set,
	// and a positive latency value stored on the context.
	after := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel)
	Expect(after).To(BeNumerically(">", before))
	Expect(ctx.TTFTRecorded).To(BeTrue())
	Expect(ctx.TTFTSeconds).To(BeNumerically(">", 0))
})
128166
})

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ type RequestContext struct {
164164
StartTime time.Time
165165
ProcessingStartTime time.Time
166166

167+
// Streaming detection
168+
ExpectStreamingResponse bool // set from request Accept header
169+
IsStreamingResponse bool // set from response Content-Type
170+
167171
// TTFT tracking
168172
TTFTRecorded bool
169173
TTFTSeconds float64
@@ -192,7 +196,14 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_Reques
192196
}
193197
}
194198

195-
// Allow the request to continue
199+
// Detect if the client expects a streaming response (SSE)
200+
if accept, ok := ctx.Headers["accept"]; ok {
201+
if strings.Contains(strings.ToLower(accept), "text/event-stream") {
202+
ctx.ExpectStreamingResponse = true
203+
}
204+
}
205+
206+
// Prepare base response
196207
response := &ext_proc.ProcessingResponse{
197208
Response: &ext_proc.ProcessingResponse_RequestHeaders{
198209
RequestHeaders: &ext_proc.HeadersResponse{
@@ -204,6 +215,10 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_Reques
204215
},
205216
}
206217

218+
// If streaming is expected, we rely on Envoy config to set response_body_mode: STREAMED for SSE.
219+
// Some Envoy/control-plane versions may not support per-message ModeOverride; avoid compile-time coupling here.
220+
// The Accept header is still recorded on context for downstream logic.
221+
207222
return response, nil
208223
}
209224

src/semantic-router/pkg/extproc/response_handler.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package extproc
33
import (
44
"encoding/json"
55
"strconv"
6+
"strings"
67
"time"
78

89
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
10+
http_ext "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
911
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1012

1113
"github.com/openai/openai-go"
@@ -17,6 +19,9 @@ import (
1719
func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
1820
// Detect upstream HTTP status and record non-2xx as errors
1921
if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil {
22+
// Determine if the response is streaming based on Content-Type
23+
ctx.IsStreamingResponse = isStreamingContentType(v.ResponseHeaders.Headers)
24+
2025
if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 {
2126
if statusCode >= 500 {
2227
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx")
@@ -26,8 +31,10 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo
2631
}
2732
}
2833

29-
// Best-effort TTFT measurement: record on first response headers if we have a start time and model
30-
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
34+
// Best-effort TTFT measurement:
35+
// - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT)
36+
// - For streaming responses (SSE), defer TTFT until the first response body chunk arrives
37+
if ctx != nil && !ctx.IsStreamingResponse && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
3138
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
3239
if ttft > 0 {
3340
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
@@ -47,6 +54,14 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo
4754
},
4855
}
4956

57+
// If this is a streaming (SSE) response, instruct Envoy to stream the response body to ExtProc
58+
// so we can capture TTFT on the first body chunk. Requires allow_mode_override: true in Envoy config.
59+
if ctx != nil && ctx.IsStreamingResponse {
60+
response.ModeOverride = &http_ext.ProcessingMode{
61+
ResponseBodyMode: http_ext.ProcessingMode_STREAMED,
62+
}
63+
}
64+
5065
return response, nil
5166
}
5267

@@ -79,13 +94,58 @@ func getModelFromCtx(ctx *RequestContext) string {
7994
return ctx.RequestModel
8095
}
8196

97+
// isStreamingContentType checks if the response content-type indicates streaming (SSE)
98+
func isStreamingContentType(headerMap *core.HeaderMap) bool {
99+
if headerMap == nil {
100+
return false
101+
}
102+
for _, hv := range headerMap.Headers {
103+
if strings.ToLower(hv.Key) == "content-type" {
104+
val := hv.Value
105+
if val == "" && len(hv.RawValue) > 0 {
106+
val = string(hv.RawValue)
107+
}
108+
if strings.Contains(strings.ToLower(val), "text/event-stream") {
109+
return true
110+
}
111+
}
112+
}
113+
return false
114+
}
115+
82116
// handleResponseBody processes the response body
83117
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
84118
completionLatency := time.Since(ctx.StartTime)
85119

86120
// Process the response for caching
87121
responseBody := v.ResponseBody.Body
88122

123+
// If this is a streaming response (e.g., SSE), record TTFT on the first body chunk
124+
// and skip JSON parsing/caching which are not applicable for SSE chunks.
125+
if ctx.IsStreamingResponse {
126+
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
127+
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
128+
if ttft > 0 {
129+
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
130+
ctx.TTFTSeconds = ttft
131+
ctx.TTFTRecorded = true
132+
observability.Infof("Recorded TTFT on first streamed body chunk: %.3fs", ttft)
133+
}
134+
}
135+
136+
// For streaming chunks, just continue (no token parsing or cache update)
137+
response := &ext_proc.ProcessingResponse{
138+
Response: &ext_proc.ProcessingResponse_ResponseBody{
139+
ResponseBody: &ext_proc.BodyResponse{
140+
Response: &ext_proc.CommonResponse{
141+
Status: ext_proc.CommonResponse_CONTINUE,
142+
},
143+
},
144+
},
145+
}
146+
return response, nil
147+
}
148+
89149
// Parse tokens from the response JSON using OpenAI SDK types
90150
var parsed openai.ChatCompletion
91151
if err := json.Unmarshal(responseBody, &parsed); err != nil {

website/docs/api/router.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,12 @@ histogram_quantile(0.95, sum(rate(llm_model_tpot_seconds_bucket[5m])) by (le, mo
351351

352352
These are included in the provided Grafana dashboard at deploy/llm-router-dashboard.json as “TTFT (p95) by Model” and “TPOT (p95) by Model (sec/token)”.
353353

354+
#### Streaming (SSE) notes
355+
356+
- For Server-Sent Events (SSE) responses, the router measures TTFT on the first streamed body chunk (i.e., the first token), not on response headers.
357+
- No manual change to your Envoy config is required: the ExtProc handler automatically sets a ModeOverride with `response_body_mode: STREAMED` for SSE responses so the first chunk reaches ExtProc immediately.
358+
- Prerequisite: Envoy’s ext_proc filter must have `allow_mode_override: true` (the default configs in `config/envoy.yaml` and `config/envoy-docker.yaml` already include this). Keeping `response_body_mode: BUFFERED` in the static processing mode is fine; the router will flip it to STREAMED at runtime for SSE.
359+
354360
### Pricing Configuration
355361

356362
Provide per-1M pricing for your models so the router can compute request cost and emit metrics/logs.

website/docs/overview/architecture/envoy-extproc.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ static_resources:
410410
request_header_mode: "SEND"
411411
response_header_mode: "SEND"
412412
request_body_mode: "BUFFERED" # Required for content analysis
413-
response_body_mode: "BUFFERED" # Required for caching
413+
response_body_mode: "BUFFERED" # Default: router flips to STREAMED at runtime for SSE
414414
request_trailer_mode: "SKIP"
415415
response_trailer_mode: "SKIP"
416416
@@ -419,6 +419,13 @@ static_resources:
419419
allow_mode_override: true # Allow ExtProc to change modes
420420
message_timeout: 300s # Timeout for ExtProc responses
421421
max_message_timeout: 600s # Maximum allowed timeout
422+
423+
> Note on SSE (streaming):
424+
>
425+
> When the upstream responds with `Content-Type: text/event-stream`, the router sets a per-message
426+
> `ModeOverride` with `response_body_mode: STREAMED` so the first chunk reaches ExtProc immediately.
427+
> This enables accurate TTFT measurement on the first token. No manual change to the static
428+
> `processing_mode` is required as long as `allow_mode_override: true` is set (it is in the default configs).
422429

423430
# Advanced configuration
424431
mutation_rules:

0 commit comments

Comments
 (0)