diff --git a/src/semantic-router/pkg/extproc/metrics_integration_test.go b/src/semantic-router/pkg/extproc/metrics_integration_test.go index 964e714b1..0604022de 100644 --- a/src/semantic-router/pkg/extproc/metrics_integration_test.go +++ b/src/semantic-router/pkg/extproc/metrics_integration_test.go @@ -125,4 +125,42 @@ var _ = Describe("Metrics recording", func() { Expect(afterPrompt).To(BeNumerically(">", beforePrompt)) Expect(afterCompletion).To(BeNumerically(">", beforeCompletion)) }) + + It("records TTFT on first streamed body chunk for SSE responses", func() { + ctx := &RequestContext{ + RequestModel: "model-stream", + ProcessingStartTime: time.Now().Add(-120 * time.Millisecond), + Headers: map[string]string{"accept": "text/event-stream"}, + } + + // Simulate header phase: SSE content-type indicates streaming + respHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{ + ResponseHeaders: &ext_proc.HttpHeaders{ + Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: "content-type", Value: "text/event-stream"}}}, + }, + } + + before := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel) + + // Handle response headers (should NOT record TTFT for streaming) + response1, err := router.handleResponseHeaders(respHeaders, ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(response1.GetResponseHeaders()).NotTo(BeNil()) + Expect(ctx.IsStreamingResponse).To(BeTrue()) + Expect(ctx.TTFTRecorded).To(BeFalse()) + + // Now simulate the first streamed body chunk + respBody := &ext_proc.ProcessingRequest_ResponseBody{ + ResponseBody: &ext_proc.HttpBody{Body: []byte("data: chunk-1\n")}, + } + + response2, err := router.handleResponseBody(respBody, ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(response2.GetResponseBody()).NotTo(BeNil()) + + after := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel) + Expect(after).To(BeNumerically(">", before)) + Expect(ctx.TTFTRecorded).To(BeTrue()) + Expect(ctx.TTFTSeconds).To(BeNumerically(">", 0)) 
+ }) }) diff --git a/src/semantic-router/pkg/extproc/request_handler.go b/src/semantic-router/pkg/extproc/request_handler.go index dcefd55a4..2f6e47fcb 100644 --- a/src/semantic-router/pkg/extproc/request_handler.go +++ b/src/semantic-router/pkg/extproc/request_handler.go @@ -164,6 +164,10 @@ type RequestContext struct { StartTime time.Time ProcessingStartTime time.Time + // Streaming detection + ExpectStreamingResponse bool // set from request Accept header + IsStreamingResponse bool // set from response Content-Type + // TTFT tracking TTFTRecorded bool TTFTSeconds float64 @@ -192,7 +196,14 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_Reques } } - // Allow the request to continue + // Detect if the client expects a streaming response (SSE) + if accept, ok := ctx.Headers["accept"]; ok { + if strings.Contains(strings.ToLower(accept), "text/event-stream") { + ctx.ExpectStreamingResponse = true + } + } + + // Prepare base response response := &ext_proc.ProcessingResponse{ Response: &ext_proc.ProcessingResponse_RequestHeaders{ RequestHeaders: &ext_proc.HeadersResponse{ @@ -204,6 +215,10 @@ }, } + // Streaming intent (Accept: text/event-stream) is only recorded on the context here. + // The actual per-message ModeOverride to response_body_mode: STREAMED is applied in + // handleResponseHeaders once the upstream Content-Type confirms SSE (requires allow_mode_override: true in Envoy).
+ return response, nil } diff --git a/src/semantic-router/pkg/extproc/response_handler.go b/src/semantic-router/pkg/extproc/response_handler.go index 5fbe97112..b5648c988 100644 --- a/src/semantic-router/pkg/extproc/response_handler.go +++ b/src/semantic-router/pkg/extproc/response_handler.go @@ -3,9 +3,11 @@ package extproc import ( "encoding/json" "strconv" + "strings" "time" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + http_ext "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/openai/openai-go" @@ -17,6 +19,9 @@ import ( func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) { // Detect upstream HTTP status and record non-2xx as errors if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil { + // Determine if the response is streaming based on Content-Type + ctx.IsStreamingResponse = isStreamingContentType(v.ResponseHeaders.Headers) + if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 { if statusCode >= 500 { metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx") @@ -26,8 +31,10 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo } } - // Best-effort TTFT measurement: record on first response headers if we have a start time and model - if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" { + // Best-effort TTFT measurement: + // - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT) + // - For streaming responses (SSE), defer TTFT until the first response body chunk arrives + if ctx != nil && !ctx.IsStreamingResponse && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" { ttft := time.Since(ctx.ProcessingStartTime).Seconds() if 
ttft > 0 { metrics.RecordModelTTFT(ctx.RequestModel, ttft) @@ -47,6 +54,14 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo }, } + // If this is a streaming (SSE) response, instruct Envoy to stream the response body to ExtProc + // so we can capture TTFT on the first body chunk. Requires allow_mode_override: true in Envoy config. + if ctx != nil && ctx.IsStreamingResponse { + response.ModeOverride = &http_ext.ProcessingMode{ + ResponseBodyMode: http_ext.ProcessingMode_STREAMED, + } + } + return response, nil } @@ -79,6 +94,25 @@ func getModelFromCtx(ctx *RequestContext) string { return ctx.RequestModel } +// isStreamingContentType checks if the response content-type indicates streaming (SSE) +func isStreamingContentType(headerMap *core.HeaderMap) bool { + if headerMap == nil { + return false + } + for _, hv := range headerMap.Headers { + if strings.ToLower(hv.Key) == "content-type" { + val := hv.Value + if val == "" && len(hv.RawValue) > 0 { + val = string(hv.RawValue) + } + if strings.Contains(strings.ToLower(val), "text/event-stream") { + return true + } + } + } + return false +} + // handleResponseBody processes the response body func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) { completionLatency := time.Since(ctx.StartTime) @@ -86,6 +120,32 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response // Process the response for caching responseBody := v.ResponseBody.Body + // If this is a streaming response (e.g., SSE), record TTFT on the first body chunk + // and skip JSON parsing/caching which are not applicable for SSE chunks. 
+ if ctx.IsStreamingResponse { + if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" { + ttft := time.Since(ctx.ProcessingStartTime).Seconds() + if ttft > 0 { + metrics.RecordModelTTFT(ctx.RequestModel, ttft) + ctx.TTFTSeconds = ttft + ctx.TTFTRecorded = true + observability.Infof("Recorded TTFT on first streamed body chunk: %.3fs", ttft) + } + } + + // For streaming chunks, just continue (no token parsing or cache update) + response := &ext_proc.ProcessingResponse{ + Response: &ext_proc.ProcessingResponse_ResponseBody{ + ResponseBody: &ext_proc.BodyResponse{ + Response: &ext_proc.CommonResponse{ + Status: ext_proc.CommonResponse_CONTINUE, + }, + }, + }, + } + return response, nil + } + // Parse tokens from the response JSON using OpenAI SDK types var parsed openai.ChatCompletion if err := json.Unmarshal(responseBody, &parsed); err != nil { diff --git a/website/docs/api/router.md b/website/docs/api/router.md index 21df4f5d1..8aba5899e 100644 --- a/website/docs/api/router.md +++ b/website/docs/api/router.md @@ -351,6 +351,12 @@ histogram_quantile(0.95, sum(rate(llm_model_tpot_seconds_bucket[5m])) by (le, mo These are included in the provided Grafana dashboard at deploy/llm-router-dashboard.json as “TTFT (p95) by Model” and “TPOT (p95) by Model (sec/token)”. +#### Streaming (SSE) notes + +- For Server-Sent Events (SSE) responses, the router measures TTFT on the first streamed body chunk (i.e., the first token), not on response headers. +- No manual change to your Envoy config is required: the ExtProc handler automatically sets a ModeOverride with `response_body_mode: STREAMED` for SSE responses so the first chunk reaches ExtProc immediately. +- Prerequisite: Envoy’s ext_proc filter must have `allow_mode_override: true` (the default configs in `config/envoy.yaml` and `config/envoy-docker.yaml` already include this). 
Keeping `response_body_mode: BUFFERED` in the static processing mode is fine; the router will flip it to STREAMED at runtime for SSE. + ### Pricing Configuration Provide per-1M pricing for your models so the router can compute request cost and emit metrics/logs. diff --git a/website/docs/overview/architecture/envoy-extproc.md b/website/docs/overview/architecture/envoy-extproc.md index 954af8274..d9186fdd6 100644 --- a/website/docs/overview/architecture/envoy-extproc.md +++ b/website/docs/overview/architecture/envoy-extproc.md @@ -410,7 +410,7 @@ static_resources: request_header_mode: "SEND" response_header_mode: "SEND" request_body_mode: "BUFFERED" # Required for content analysis - response_body_mode: "BUFFERED" # Required for caching + response_body_mode: "BUFFERED" # Default: router flips to STREAMED at runtime for SSE request_trailer_mode: "SKIP" response_trailer_mode: "SKIP" @@ -419,6 +419,13 @@ static_resources: allow_mode_override: true # Allow ExtProc to change modes message_timeout: 300s # Timeout for ExtProc responses max_message_timeout: 600s # Maximum allowed timeout + +> Note on SSE (streaming): +> +> When the upstream responds with `Content-Type: text/event-stream`, the router sets a per-message +> `ModeOverride` with `response_body_mode: STREAMED` so the first chunk reaches ExtProc immediately. +> This enables accurate TTFT measurement on the first token. No manual change to the static +> `processing_mode` is required as long as `allow_mode_override: true` is set (it is in the default configs). # Advanced configuration mutation_rules: