38 changes: 38 additions & 0 deletions src/semantic-router/pkg/extproc/metrics_integration_test.go
@@ -125,4 +125,42 @@ var _ = Describe("Metrics recording", func() {
Expect(afterPrompt).To(BeNumerically(">", beforePrompt))
Expect(afterCompletion).To(BeNumerically(">", beforeCompletion))
})

It("records TTFT on first streamed body chunk for SSE responses", func() {
ctx := &RequestContext{
RequestModel: "model-stream",
ProcessingStartTime: time.Now().Add(-120 * time.Millisecond),
Headers: map[string]string{"accept": "text/event-stream"},
}

// Simulate header phase: SSE content-type indicates streaming
respHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
ResponseHeaders: &ext_proc.HttpHeaders{
Headers: &core.HeaderMap{Headers: []*core.HeaderValue{{Key: "content-type", Value: "text/event-stream"}}},
},
}

before := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel)

// Handle response headers (should NOT record TTFT for streaming)
response1, err := router.handleResponseHeaders(respHeaders, ctx)
Expect(err).NotTo(HaveOccurred())
Expect(response1.GetResponseHeaders()).NotTo(BeNil())
Expect(ctx.IsStreamingResponse).To(BeTrue())
Expect(ctx.TTFTRecorded).To(BeFalse())

// Now simulate the first streamed body chunk
respBody := &ext_proc.ProcessingRequest_ResponseBody{
ResponseBody: &ext_proc.HttpBody{Body: []byte("data: chunk-1\n")},
}

response2, err := router.handleResponseBody(respBody, ctx)
Expect(err).NotTo(HaveOccurred())
Expect(response2.GetResponseBody()).NotTo(BeNil())

after := getHistogramSampleCount("llm_model_ttft_seconds", ctx.RequestModel)
Expect(after).To(BeNumerically(">", before))
Expect(ctx.TTFTRecorded).To(BeTrue())
Expect(ctx.TTFTSeconds).To(BeNumerically(">", 0))
})
})
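Note: `getHistogramSampleCount` is an existing helper in this test file, collapsed out of the hunk above. As a point of reference, here is a minimal sketch of such a helper, assuming the metrics are registered on Prometheus's default registry — hypothetical, the real helper may differ:

```go
import "github.com/prometheus/client_golang/prometheus"

// getHistogramSampleCount returns the cumulative sample count of the named
// histogram for a given "model" label value, or 0 if no match is found.
// Hypothetical sketch; the actual helper in the repo may differ.
func getHistogramSampleCount(name, model string) uint64 {
	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		return 0
	}
	for _, mf := range families {
		if mf.GetName() != name {
			continue
		}
		for _, m := range mf.GetMetric() {
			for _, lbl := range m.GetLabel() {
				if lbl.GetName() == "model" && lbl.GetValue() == model {
					return m.GetHistogram().GetSampleCount()
				}
			}
		}
	}
	return 0
}
```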
17 changes: 16 additions & 1 deletion src/semantic-router/pkg/extproc/request_handler.go
@@ -164,6 +164,10 @@ type RequestContext struct {
StartTime time.Time
ProcessingStartTime time.Time

// Streaming detection
ExpectStreamingResponse bool // set from request Accept header
IsStreamingResponse bool // set from response Content-Type

// TTFT tracking
TTFTRecorded bool
TTFTSeconds float64
@@ -192,7 +196,14 @@ func (r *OpenAIRouter) handleRequestHeaders(v *ext_proc.ProcessingRequest_RequestHeaders
}
}

// Allow the request to continue
// Detect if the client expects a streaming response (SSE)
if accept, ok := ctx.Headers["accept"]; ok {
if strings.Contains(strings.ToLower(accept), "text/event-stream") {
ctx.ExpectStreamingResponse = true
}
}

// Prepare base response
response := &ext_proc.ProcessingResponse{
Response: &ext_proc.ProcessingResponse_RequestHeaders{
RequestHeaders: &ext_proc.HeadersResponse{
@@ -204,6 +215,10 @@
},
}

// No ModeOverride is sent at the request phase. Once the upstream confirms SSE via
// Content-Type, handleResponseHeaders sets a per-message ModeOverride with
// response_body_mode: STREAMED (requires allow_mode_override: true in Envoy).
// The Accept header is recorded on the context for downstream logic.

return response, nil
}

64 changes: 62 additions & 2 deletions src/semantic-router/pkg/extproc/response_handler.go
@@ -3,9 +3,11 @@ package extproc
import (
"encoding/json"
"strconv"
"strings"
"time"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
http_ext "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3"
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"

"github.com/openai/openai-go"
@@ -17,6 +19,9 @@
func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
// Detect upstream HTTP status and record non-2xx as errors
if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil {
// Determine if the response is streaming based on Content-Type
ctx.IsStreamingResponse = isStreamingContentType(v.ResponseHeaders.Headers)

if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 {
if statusCode >= 500 {
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx")
@@ -26,8 +31,10 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders
}
}

// Best-effort TTFT measurement: record on first response headers if we have a start time and model
if ctx != nil && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
// Best-effort TTFT measurement:
// - For non-streaming responses, record on first response headers (approx TTFB ~= TTFT)
// - For streaming responses (SSE), defer TTFT until the first response body chunk arrives
if ctx != nil && !ctx.IsStreamingResponse && !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
if ttft > 0 {
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
@@ -47,6 +54,14 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders
},
}

// If this is a streaming (SSE) response, instruct Envoy to stream the response body to ExtProc
// so we can capture TTFT on the first body chunk. Requires allow_mode_override: true in Envoy config.
if ctx != nil && ctx.IsStreamingResponse {
response.ModeOverride = &http_ext.ProcessingMode{
ResponseBodyMode: http_ext.ProcessingMode_STREAMED,
}
}

return response, nil
}

@@ -79,13 +94,58 @@ func getModelFromCtx(ctx *RequestContext) string {
return ctx.RequestModel
}

// isStreamingContentType checks if the response content-type indicates streaming (SSE)
func isStreamingContentType(headerMap *core.HeaderMap) bool {
if headerMap == nil {
return false
}
for _, hv := range headerMap.Headers {
if strings.ToLower(hv.Key) == "content-type" {
val := hv.Value
if val == "" && len(hv.RawValue) > 0 {
val = string(hv.RawValue)
}
if strings.Contains(strings.ToLower(val), "text/event-stream") {
return true
}
}
}
return false
}

// handleResponseBody processes the response body
func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_ResponseBody, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
completionLatency := time.Since(ctx.StartTime)

// Process the response for caching
responseBody := v.ResponseBody.Body

// If this is a streaming response (e.g., SSE), record TTFT on the first body chunk
// and skip JSON parsing/caching which are not applicable for SSE chunks.
if ctx != nil && ctx.IsStreamingResponse {
if !ctx.TTFTRecorded && !ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
ttft := time.Since(ctx.ProcessingStartTime).Seconds()
if ttft > 0 {
metrics.RecordModelTTFT(ctx.RequestModel, ttft)
ctx.TTFTSeconds = ttft
ctx.TTFTRecorded = true
observability.Infof("Recorded TTFT on first streamed body chunk: %.3fs", ttft)
}
}

// For streaming chunks, just continue (no token parsing or cache update)
response := &ext_proc.ProcessingResponse{
Response: &ext_proc.ProcessingResponse_ResponseBody{
ResponseBody: &ext_proc.BodyResponse{
Response: &ext_proc.CommonResponse{
Status: ext_proc.CommonResponse_CONTINUE,
},
},
},
}
return response, nil
}

// Parse tokens from the response JSON using OpenAI SDK types
var parsed openai.ChatCompletion
if err := json.Unmarshal(responseBody, &parsed); err != nil {
6 changes: 6 additions & 0 deletions website/docs/api/router.md
@@ -351,6 +351,12 @@ histogram_quantile(0.95, sum(rate(llm_model_tpot_seconds_bucket[5m])) by (le, model))

These are included in the provided Grafana dashboard (`deploy/llm-router-dashboard.json`) as “TTFT (p95) by Model” and “TPOT (p95) by Model (sec/token)”.

#### Streaming (SSE) notes

- For Server-Sent Events (SSE) responses, the router measures TTFT on the first streamed body chunk (i.e., the first token), not on response headers.
- No manual change to your Envoy config is required: the ExtProc handler automatically sets a ModeOverride with `response_body_mode: STREAMED` for SSE responses so the first chunk reaches ExtProc immediately (see the excerpt after this list).
- Prerequisite: Envoy’s ext_proc filter must have `allow_mode_override: true` (the default configs in `config/envoy.yaml` and `config/envoy-docker.yaml` already include this). Keeping `response_body_mode: BUFFERED` in the static processing mode is fine; the router will flip it to STREAMED at runtime for SSE.
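For reference, the per-message override the router sends at runtime, from `handleResponseHeaders` in `response_handler.go` of this PR:

```go
// Sent only after the upstream's Content-Type confirms SSE; takes effect
// only when Envoy's ext_proc filter has allow_mode_override: true.
if ctx != nil && ctx.IsStreamingResponse {
	response.ModeOverride = &http_ext.ProcessingMode{
		ResponseBodyMode: http_ext.ProcessingMode_STREAMED,
	}
}
```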

### Pricing Configuration

Provide per-1M pricing for your models so the router can compute request cost and emit metrics/logs.
9 changes: 8 additions & 1 deletion website/docs/overview/architecture/envoy-extproc.md
@@ -410,7 +410,7 @@ static_resources:
request_header_mode: "SEND"
response_header_mode: "SEND"
request_body_mode: "BUFFERED" # Required for content analysis
response_body_mode: "BUFFERED" # Required for caching
response_body_mode: "BUFFERED" # Default: router flips to STREAMED at runtime for SSE
request_trailer_mode: "SKIP"
response_trailer_mode: "SKIP"

@@ -419,6 +419,13 @@
allow_mode_override: true # Allow ExtProc to change modes
message_timeout: 300s # Timeout for ExtProc responses
max_message_timeout: 600s # Maximum allowed timeout

> Note on SSE (streaming):
>
> When the upstream responds with `Content-Type: text/event-stream`, the router sets a per-message
> `ModeOverride` with `response_body_mode: STREAMED` so the first chunk reaches ExtProc immediately.
> This enables accurate TTFT measurement on the first token. No manual change to the static
> `processing_mode` is required as long as `allow_mode_override: true` is set (it is in the default configs).
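With the override in place, TTFT is captured on the first streamed chunk. The relevant logic from `handleResponseBody` in this PR, condensed:

```go
// First SSE body chunk: record TTFT once, then continue without JSON
// parsing or cache updates (neither applies to SSE chunks).
if ctx != nil && ctx.IsStreamingResponse && !ctx.TTFTRecorded &&
	!ctx.ProcessingStartTime.IsZero() && ctx.RequestModel != "" {
	if ttft := time.Since(ctx.ProcessingStartTime).Seconds(); ttft > 0 {
		metrics.RecordModelTTFT(ctx.RequestModel, ttft)
		ctx.TTFTSeconds = ttft
		ctx.TTFTRecorded = true
	}
}
```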

# Advanced configuration
mutation_rules: