Skip to content

Commit d85c966

Browse files
authored
Merge branch 'main' into fix/stream-mode
2 parents 25a1433 + 9868dbb commit d85c966

File tree

9 files changed

+388
-13
lines changed

9 files changed

+388
-13
lines changed

.github/workflows/test-and-build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ jobs:
7272

7373
- name: Install HuggingFace CLI
7474
run: |
75-
pip install -U "huggingface_hub[cli]"
75+
pip install -U "huggingface_hub[cli]" hf_transfer
7676
7777
7878
- name: Download models (minimal on PRs)

src/semantic-router/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/openai/openai-go v1.12.0
2121
github.com/prometheus/client_golang v1.23.0
2222
github.com/prometheus/client_model v0.6.2
23+
github.com/stretchr/testify v1.10.0
2324
github.com/vllm-project/semantic-router/candle-binding v0.0.0-00010101000000-000000000000
2425
go.uber.org/zap v1.27.0
2526
google.golang.org/grpc v1.71.1
@@ -34,6 +35,7 @@ require (
3435
github.com/cockroachdb/errors v1.9.1 // indirect
3536
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f // indirect
3637
github.com/cockroachdb/redact v1.1.3 // indirect
38+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3739
github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect
3840
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
3941
github.com/getsentry/sentry-go v0.12.0 // indirect
@@ -54,6 +56,7 @@ require (
5456
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
5557
github.com/pkg/errors v0.9.1 // indirect
5658
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
59+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
5760
github.com/prometheus/common v0.65.0 // indirect
5861
github.com/prometheus/procfs v0.16.1 // indirect
5962
github.com/rogpeppe/go-internal v1.12.0 // indirect

src/semantic-router/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
257257
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
258258
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
259259
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
260-
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
261-
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
260+
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
261+
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
262262
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
263263
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
264264
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ type RequestContext struct {
208208
// TTFT tracking
209209
TTFTRecorded bool
210210
TTFTSeconds float64
211+
212+
// VSR decision tracking
213+
VSRSelectedCategory string // The category selected by VSR
214+
VSRReasoningMode string // "on" or "off" - whether reasoning mode was determined to be used
215+
VSRSelectedModel string // The model selected by VSR
216+
VSRCacheHit bool // Whether this request hit the cache
211217
}
212218

213219
// handleRequestHeaders processes the request headers
@@ -379,6 +385,8 @@ func (r *OpenAIRouter) handleCaching(ctx *RequestContext) (*ext_proc.ProcessingR
379385
if err != nil {
380386
observability.Errorf("Error searching cache: %v", err)
381387
} else if found {
388+
// Mark this request as a cache hit
389+
ctx.VSRCacheHit = true
382390
// Log cache hit
383391
observability.LogEvent("cache_hit", map[string]interface{}{
384392
"request_id": ctx.RequestID,
@@ -389,13 +397,13 @@ func (r *OpenAIRouter) handleCaching(ctx *RequestContext) (*ext_proc.ProcessingR
389397
response := http.CreateCacheHitResponse(cachedResponse)
390398
return response, true
391399
}
400+
}
392401

393-
// Cache miss, store the request for later
394-
err = r.Cache.AddPendingRequest(ctx.RequestID, requestModel, requestQuery, ctx.OriginalRequestBody)
395-
if err != nil {
396-
observability.Errorf("Error adding pending request to cache: %v", err)
397-
// Continue without caching
398-
}
402+
// Cache miss, store the request for later
403+
err = r.Cache.AddPendingRequest(ctx.RequestID, requestModel, requestQuery, ctx.OriginalRequestBody)
404+
if err != nil {
405+
observability.Errorf("Error adding pending request to cache: %v", err)
406+
// Continue without caching
399407
}
400408

401409
return nil, false
@@ -499,6 +507,15 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
499507
effortForMetrics := r.getReasoningEffort(categoryName)
500508
metrics.RecordReasoningDecision(categoryName, matchedModel, useReasoning, effortForMetrics)
501509

510+
// Track VSR decision information
511+
ctx.VSRSelectedCategory = categoryName
512+
ctx.VSRSelectedModel = matchedModel
513+
if useReasoning {
514+
ctx.VSRReasoningMode = "on"
515+
} else {
516+
ctx.VSRReasoningMode = "off"
517+
}
518+
502519
// Track the model routing change
503520
metrics.RecordModelRouting(originalModel, matchedModel)
504521

@@ -612,6 +629,9 @@ func (r *OpenAIRouter) handleModelRouting(openAIRequest *openai.ChatCompletionNe
612629
}
613630
} else if originalModel != "auto" {
614631
observability.Infof("Using specified model: %s", originalModel)
632+
// Track VSR decision information for non-auto models
633+
ctx.VSRSelectedModel = originalModel
634+
ctx.VSRReasoningMode = "off" // Non-auto models don't use reasoning mode by default
615635
// For non-auto models, check PII policy compliance
616636
allContent := pii.ExtractAllContent(userContent, nonUserMessages)
617637
detectedPII := r.Classifier.DetectPIIInContent(allContent)

src/semantic-router/pkg/extproc/response_handler.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ import (
1717

1818
// handleResponseHeaders processes the response headers
1919
func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_ResponseHeaders, ctx *RequestContext) (*ext_proc.ProcessingResponse, error) {
20+
var statusCode int
21+
var isSuccessful bool
22+
2023
// Detect upstream HTTP status and record non-2xx as errors
2124
if v != nil && v.ResponseHeaders != nil && v.ResponseHeaders.Headers != nil {
2225
// Determine if the response is streaming based on Content-Type
2326
ctx.IsStreamingResponse = isStreamingContentType(v.ResponseHeaders.Headers)
2427

25-
if statusCode := getStatusFromHeaders(v.ResponseHeaders.Headers); statusCode != 0 {
28+
statusCode = getStatusFromHeaders(v.ResponseHeaders.Headers)
29+
isSuccessful = statusCode >= 200 && statusCode < 300
30+
31+
if statusCode != 0 {
2632
if statusCode >= 500 {
2733
metrics.RecordRequestError(getModelFromCtx(ctx), "upstream_5xx")
2834
} else if statusCode >= 400 {
@@ -43,12 +49,58 @@ func (r *OpenAIRouter) handleResponseHeaders(v *ext_proc.ProcessingRequest_Respo
4349
}
4450
}
4551

46-
// Allow the response to continue without modification
52+
// Prepare response headers with VSR decision tracking headers if applicable
53+
var headerMutation *ext_proc.HeaderMutation
54+
55+
// Add VSR decision headers if request was successful and didn't hit cache
56+
if isSuccessful && !ctx.VSRCacheHit && ctx != nil {
57+
var setHeaders []*core.HeaderValueOption
58+
59+
// Add x-vsr-selected-category header
60+
if ctx.VSRSelectedCategory != "" {
61+
setHeaders = append(setHeaders, &core.HeaderValueOption{
62+
Header: &core.HeaderValue{
63+
Key: "x-vsr-selected-category",
64+
RawValue: []byte(ctx.VSRSelectedCategory),
65+
},
66+
})
67+
}
68+
69+
// Add x-vsr-selected-reasoning header
70+
if ctx.VSRReasoningMode != "" {
71+
setHeaders = append(setHeaders, &core.HeaderValueOption{
72+
Header: &core.HeaderValue{
73+
Key: "x-vsr-selected-reasoning",
74+
RawValue: []byte(ctx.VSRReasoningMode),
75+
},
76+
})
77+
}
78+
79+
// Add x-vsr-selected-model header
80+
if ctx.VSRSelectedModel != "" {
81+
setHeaders = append(setHeaders, &core.HeaderValueOption{
82+
Header: &core.HeaderValue{
83+
Key: "x-vsr-selected-model",
84+
RawValue: []byte(ctx.VSRSelectedModel),
85+
},
86+
})
87+
}
88+
89+
// Create header mutation if we have headers to add
90+
if len(setHeaders) > 0 {
91+
headerMutation = &ext_proc.HeaderMutation{
92+
SetHeaders: setHeaders,
93+
}
94+
}
95+
}
96+
97+
// Allow the response to continue with VSR headers if applicable
4798
response := &ext_proc.ProcessingResponse{
4899
Response: &ext_proc.ProcessingResponse_ResponseHeaders{
49100
ResponseHeaders: &ext_proc.HeadersResponse{
50101
Response: &ext_proc.CommonResponse{
51-
Status: ext_proc.CommonResponse_CONTINUE,
102+
Status: ext_proc.CommonResponse_CONTINUE,
103+
HeaderMutation: headerMutation,
52104
},
53105
},
54106
},
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package extproc
2+
3+
import (
4+
"testing"
5+
6+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
7+
ext_proc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestVSRHeadersAddedOnSuccessfulNonCachedResponse(t *testing.T) {
12+
// Create a mock router
13+
router := &OpenAIRouter{}
14+
15+
// Create request context with VSR decision information
16+
ctx := &RequestContext{
17+
VSRSelectedCategory: "math",
18+
VSRReasoningMode: "on",
19+
VSRSelectedModel: "deepseek-v31",
20+
VSRCacheHit: false, // Not a cache hit
21+
}
22+
23+
// Create response headers with successful status (200)
24+
responseHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
25+
ResponseHeaders: &ext_proc.HttpHeaders{
26+
Headers: &core.HeaderMap{
27+
Headers: []*core.HeaderValue{
28+
{Key: ":status", Value: "200"},
29+
{Key: "content-type", Value: "application/json"},
30+
},
31+
},
32+
},
33+
}
34+
35+
// Call handleResponseHeaders
36+
response, err := router.handleResponseHeaders(responseHeaders, ctx)
37+
38+
// Verify no error occurred
39+
assert.NoError(t, err)
40+
assert.NotNil(t, response)
41+
42+
// Verify response structure
43+
assert.NotNil(t, response.GetResponseHeaders())
44+
assert.NotNil(t, response.GetResponseHeaders().GetResponse())
45+
46+
// Verify VSR headers were added
47+
headerMutation := response.GetResponseHeaders().GetResponse().GetHeaderMutation()
48+
assert.NotNil(t, headerMutation, "HeaderMutation should not be nil for successful non-cached response")
49+
50+
setHeaders := headerMutation.GetSetHeaders()
51+
assert.Len(t, setHeaders, 3, "Should have 3 VSR headers")
52+
53+
// Verify each header
54+
headerMap := make(map[string]string)
55+
for _, header := range setHeaders {
56+
headerMap[header.Header.Key] = string(header.Header.RawValue)
57+
}
58+
59+
assert.Equal(t, "math", headerMap["x-vsr-selected-category"])
60+
assert.Equal(t, "on", headerMap["x-vsr-selected-reasoning"])
61+
assert.Equal(t, "deepseek-v31", headerMap["x-vsr-selected-model"])
62+
}
63+
64+
func TestVSRHeadersNotAddedOnCacheHit(t *testing.T) {
65+
// Create a mock router
66+
router := &OpenAIRouter{}
67+
68+
// Create request context with cache hit
69+
ctx := &RequestContext{
70+
VSRSelectedCategory: "math",
71+
VSRReasoningMode: "on",
72+
VSRSelectedModel: "deepseek-v31",
73+
VSRCacheHit: true, // Cache hit - headers should not be added
74+
}
75+
76+
// Create response headers with successful status (200)
77+
responseHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
78+
ResponseHeaders: &ext_proc.HttpHeaders{
79+
Headers: &core.HeaderMap{
80+
Headers: []*core.HeaderValue{
81+
{Key: ":status", Value: "200"},
82+
{Key: "content-type", Value: "application/json"},
83+
},
84+
},
85+
},
86+
}
87+
88+
// Call handleResponseHeaders
89+
response, err := router.handleResponseHeaders(responseHeaders, ctx)
90+
91+
// Verify no error occurred
92+
assert.NoError(t, err)
93+
assert.NotNil(t, response)
94+
95+
// Verify VSR headers were NOT added due to cache hit
96+
headerMutation := response.GetResponseHeaders().GetResponse().GetHeaderMutation()
97+
assert.Nil(t, headerMutation, "HeaderMutation should be nil for cache hit")
98+
}
99+
100+
func TestVSRHeadersNotAddedOnErrorResponse(t *testing.T) {
101+
// Create a mock router
102+
router := &OpenAIRouter{}
103+
104+
// Create request context with VSR decision information
105+
ctx := &RequestContext{
106+
VSRSelectedCategory: "math",
107+
VSRReasoningMode: "on",
108+
VSRSelectedModel: "deepseek-v31",
109+
VSRCacheHit: false, // Not a cache hit
110+
}
111+
112+
// Create response headers with error status (500)
113+
responseHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
114+
ResponseHeaders: &ext_proc.HttpHeaders{
115+
Headers: &core.HeaderMap{
116+
Headers: []*core.HeaderValue{
117+
{Key: ":status", Value: "500"},
118+
{Key: "content-type", Value: "application/json"},
119+
},
120+
},
121+
},
122+
}
123+
124+
// Call handleResponseHeaders
125+
response, err := router.handleResponseHeaders(responseHeaders, ctx)
126+
127+
// Verify no error occurred
128+
assert.NoError(t, err)
129+
assert.NotNil(t, response)
130+
131+
// Verify VSR headers were NOT added due to error status
132+
headerMutation := response.GetResponseHeaders().GetResponse().GetHeaderMutation()
133+
assert.Nil(t, headerMutation, "HeaderMutation should be nil for error response")
134+
}
135+
136+
func TestVSRHeadersPartialInformation(t *testing.T) {
137+
// Create a mock router
138+
router := &OpenAIRouter{}
139+
140+
// Create request context with partial VSR information
141+
ctx := &RequestContext{
142+
VSRSelectedCategory: "math",
143+
VSRReasoningMode: "", // Empty reasoning mode
144+
VSRSelectedModel: "deepseek-v31",
145+
VSRCacheHit: false,
146+
}
147+
148+
// Create response headers with successful status (200)
149+
responseHeaders := &ext_proc.ProcessingRequest_ResponseHeaders{
150+
ResponseHeaders: &ext_proc.HttpHeaders{
151+
Headers: &core.HeaderMap{
152+
Headers: []*core.HeaderValue{
153+
{Key: ":status", Value: "200"},
154+
{Key: "content-type", Value: "application/json"},
155+
},
156+
},
157+
},
158+
}
159+
160+
// Call handleResponseHeaders
161+
response, err := router.handleResponseHeaders(responseHeaders, ctx)
162+
163+
// Verify no error occurred
164+
assert.NoError(t, err)
165+
assert.NotNil(t, response)
166+
167+
// Verify only non-empty headers were added
168+
headerMutation := response.GetResponseHeaders().GetResponse().GetHeaderMutation()
169+
assert.NotNil(t, headerMutation)
170+
171+
setHeaders := headerMutation.GetSetHeaders()
172+
assert.Len(t, setHeaders, 2, "Should have 2 VSR headers (excluding empty reasoning mode)")
173+
174+
// Verify each header
175+
headerMap := make(map[string]string)
176+
for _, header := range setHeaders {
177+
headerMap[header.Header.Key] = string(header.Header.RawValue)
178+
}
179+
180+
assert.Equal(t, "math", headerMap["x-vsr-selected-category"])
181+
assert.Equal(t, "deepseek-v31", headerMap["x-vsr-selected-model"])
182+
assert.NotContains(t, headerMap, "x-vsr-selected-reasoning", "Empty reasoning mode should not be added")
183+
}

src/semantic-router/pkg/utils/http/response.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ func CreateCacheHitResponse(cachedResponse []byte) *ext_proc.ProcessingResponse
169169
},
170170
{
171171
Header: &core.HeaderValue{
172-
Key: "x-cache-hit",
172+
Key: "x-vsr-cache-hit",
173173
RawValue: []byte("true"),
174174
},
175175
},

0 commit comments

Comments
 (0)