Skip to content

Commit be38a4b

Browse files
authored
fix: resolve semantic cache hit streaming response format issue (#378)
* fix: resolve semantic cache hit streaming response format issue

When semantic cache hits occur with streaming requests, the cached response (in chat.completion JSON format) was being returned directly without being converted to SSE format (chat.completion.chunk), causing streaming clients to receive malformed responses.

This fix:
- Updates CreateCacheHitResponse() to accept an isStreaming parameter
- Converts cached chat.completion to chat.completion.chunk format for streaming
- Sets the appropriate content-type header (text/event-stream vs application/json)
- Maintains backward compatibility for non-streaming requests
- Adds comprehensive unit tests for both streaming and non-streaming cases

Similar to the fix in a0f0581 for jailbreak/PII violations, this ensures consistent response format handling across all direct response scenarios. Resolves streaming client hanging issues when cache hits occur.

Signed-off-by: bitliu <[email protected]>

* fix lint

Signed-off-by: bitliu <[email protected]>

---------

Signed-off-by: bitliu <[email protected]>
1 parent ab92fac commit be38a4b

File tree

3 files changed

+303
-4
lines changed

3 files changed

+303
-4
lines changed

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func (r *OpenAIRouter) handleCaching(ctx *RequestContext) (*ext_proc.ProcessingR
516516
"query": requestQuery,
517517
})
518518
// Return immediate response from cache
519-
response := http.CreateCacheHitResponse(cachedResponse)
519+
response := http.CreateCacheHitResponse(cachedResponse, ctx.ExpectStreamingResponse)
520520
ctx.TraceContext = spanCtx
521521
return response, true
522522
}

src/semantic-router/pkg/utils/http/response.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,56 @@ func CreateJailbreakViolationResponse(jailbreakType string, confidence float32,
232232
}
233233

234234
// CreateCacheHitResponse creates an immediate response from cache
235-
func CreateCacheHitResponse(cachedResponse []byte) *ext_proc.ProcessingResponse {
235+
func CreateCacheHitResponse(cachedResponse []byte, isStreaming bool) *ext_proc.ProcessingResponse {
236+
var responseBody []byte
237+
var contentType string
238+
239+
if isStreaming {
240+
// For streaming responses, convert cached JSON to SSE format
241+
contentType = "text/event-stream"
242+
243+
// Parse the cached JSON response
244+
var cachedCompletion openai.ChatCompletion
245+
if err := json.Unmarshal(cachedResponse, &cachedCompletion); err != nil {
246+
observability.Errorf("Error parsing cached response for streaming conversion: %v", err)
247+
responseBody = []byte("data: {\"error\": \"Failed to convert cached response\"}\n\ndata: [DONE]\n\n")
248+
} else {
249+
// Convert chat.completion to chat.completion.chunk format
250+
streamChunk := map[string]interface{}{
251+
"id": cachedCompletion.ID,
252+
"object": "chat.completion.chunk",
253+
"created": cachedCompletion.Created,
254+
"model": cachedCompletion.Model,
255+
"choices": []map[string]interface{}{},
256+
}
257+
258+
// Convert choices from message format to delta format
259+
for _, choice := range cachedCompletion.Choices {
260+
streamChoice := map[string]interface{}{
261+
"index": choice.Index,
262+
"delta": map[string]interface{}{
263+
"role": choice.Message.Role,
264+
"content": choice.Message.Content,
265+
},
266+
"finish_reason": choice.FinishReason,
267+
}
268+
streamChunk["choices"] = append(streamChunk["choices"].([]map[string]interface{}), streamChoice)
269+
}
270+
271+
chunkJSON, err := json.Marshal(streamChunk)
272+
if err != nil {
273+
observability.Errorf("Error marshaling streaming cache response: %v", err)
274+
responseBody = []byte("data: {\"error\": \"Failed to generate response\"}\n\ndata: [DONE]\n\n")
275+
} else {
276+
responseBody = []byte(fmt.Sprintf("data: %s\n\ndata: [DONE]\n\n", chunkJSON))
277+
}
278+
}
279+
} else {
280+
// For non-streaming responses, use cached JSON directly
281+
contentType = "application/json"
282+
responseBody = cachedResponse
283+
}
284+
236285
immediateResponse := &ext_proc.ImmediateResponse{
237286
Status: &typev3.HttpStatus{
238287
Code: typev3.StatusCode_OK,
@@ -242,7 +291,7 @@ func CreateCacheHitResponse(cachedResponse []byte) *ext_proc.ProcessingResponse
242291
{
243292
Header: &core.HeaderValue{
244293
Key: "content-type",
245-
RawValue: []byte("application/json"),
294+
RawValue: []byte(contentType),
246295
},
247296
},
248297
{
@@ -253,7 +302,7 @@ func CreateCacheHitResponse(cachedResponse []byte) *ext_proc.ProcessingResponse
253302
},
254303
},
255304
},
256-
Body: cachedResponse,
305+
Body: responseBody,
257306
}
258307

259308
return &ext_proc.ProcessingResponse{
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package http
2+
3+
import (
4+
"encoding/json"
5+
"strings"
6+
"testing"
7+
8+
"github.com/openai/openai-go"
9+
)
10+
11+
func TestCreateCacheHitResponse_NonStreaming(t *testing.T) {
12+
// Create a sample cached response
13+
cachedCompletion := openai.ChatCompletion{
14+
ID: "chatcmpl-test-123",
15+
Object: "chat.completion",
16+
Created: 1234567890,
17+
Model: "test-model",
18+
Choices: []openai.ChatCompletionChoice{
19+
{
20+
Index: 0,
21+
Message: openai.ChatCompletionMessage{
22+
Role: "assistant",
23+
Content: "This is a cached response.",
24+
},
25+
FinishReason: "stop",
26+
},
27+
},
28+
Usage: openai.CompletionUsage{
29+
PromptTokens: 10,
30+
CompletionTokens: 5,
31+
TotalTokens: 15,
32+
},
33+
}
34+
35+
cachedResponse, err := json.Marshal(cachedCompletion)
36+
if err != nil {
37+
t.Fatalf("Failed to marshal cached response: %v", err)
38+
}
39+
40+
// Test non-streaming response
41+
response := CreateCacheHitResponse(cachedResponse, false)
42+
43+
// Verify response structure
44+
if response == nil {
45+
t.Fatal("Response is nil")
46+
}
47+
48+
immediateResp := response.GetImmediateResponse()
49+
if immediateResp == nil {
50+
t.Fatal("ImmediateResponse is nil")
51+
}
52+
53+
// Verify status code
54+
if immediateResp.Status.Code.String() != "OK" {
55+
t.Errorf("Expected status OK, got %s", immediateResp.Status.Code.String())
56+
}
57+
58+
// Verify content-type header
59+
var contentType string
60+
var cacheHit string
61+
for _, header := range immediateResp.Headers.SetHeaders {
62+
if header.Header.Key == "content-type" {
63+
contentType = string(header.Header.RawValue)
64+
}
65+
if header.Header.Key == "x-vsr-cache-hit" {
66+
cacheHit = string(header.Header.RawValue)
67+
}
68+
}
69+
70+
if contentType != "application/json" {
71+
t.Errorf("Expected content-type application/json, got %s", contentType)
72+
}
73+
74+
if cacheHit != "true" {
75+
t.Errorf("Expected x-vsr-cache-hit true, got %s", cacheHit)
76+
}
77+
78+
// Verify body is unchanged
79+
if string(immediateResp.Body) != string(cachedResponse) {
80+
t.Error("Body was modified for non-streaming response")
81+
}
82+
83+
// Verify body can be parsed as JSON
84+
var parsedResponse openai.ChatCompletion
85+
if err := json.Unmarshal(immediateResp.Body, &parsedResponse); err != nil {
86+
t.Errorf("Failed to parse response body as JSON: %v", err)
87+
}
88+
89+
if parsedResponse.Object != "chat.completion" {
90+
t.Errorf("Expected object chat.completion, got %s", parsedResponse.Object)
91+
}
92+
}
93+
94+
func TestCreateCacheHitResponse_Streaming(t *testing.T) {
95+
// Create a sample cached response
96+
cachedCompletion := openai.ChatCompletion{
97+
ID: "chatcmpl-test-456",
98+
Object: "chat.completion",
99+
Created: 1234567890,
100+
Model: "test-model",
101+
Choices: []openai.ChatCompletionChoice{
102+
{
103+
Index: 0,
104+
Message: openai.ChatCompletionMessage{
105+
Role: "assistant",
106+
Content: "This is a cached streaming response.",
107+
},
108+
FinishReason: "stop",
109+
},
110+
},
111+
Usage: openai.CompletionUsage{
112+
PromptTokens: 10,
113+
CompletionTokens: 5,
114+
TotalTokens: 15,
115+
},
116+
}
117+
118+
cachedResponse, err := json.Marshal(cachedCompletion)
119+
if err != nil {
120+
t.Fatalf("Failed to marshal cached response: %v", err)
121+
}
122+
123+
// Test streaming response
124+
response := CreateCacheHitResponse(cachedResponse, true)
125+
126+
// Verify response structure
127+
if response == nil {
128+
t.Fatal("Response is nil")
129+
}
130+
131+
immediateResp := response.GetImmediateResponse()
132+
if immediateResp == nil {
133+
t.Fatal("ImmediateResponse is nil")
134+
}
135+
136+
// Verify status code
137+
if immediateResp.Status.Code.String() != "OK" {
138+
t.Errorf("Expected status OK, got %s", immediateResp.Status.Code.String())
139+
}
140+
141+
// Verify content-type header
142+
var contentType string
143+
var cacheHit string
144+
for _, header := range immediateResp.Headers.SetHeaders {
145+
if header.Header.Key == "content-type" {
146+
contentType = string(header.Header.RawValue)
147+
}
148+
if header.Header.Key == "x-vsr-cache-hit" {
149+
cacheHit = string(header.Header.RawValue)
150+
}
151+
}
152+
153+
if contentType != "text/event-stream" {
154+
t.Errorf("Expected content-type text/event-stream, got %s", contentType)
155+
}
156+
157+
if cacheHit != "true" {
158+
t.Errorf("Expected x-vsr-cache-hit true, got %s", cacheHit)
159+
}
160+
161+
// Verify body is in SSE format
162+
bodyStr := string(immediateResp.Body)
163+
if !strings.HasPrefix(bodyStr, "data: ") {
164+
t.Error("Body does not start with 'data: ' prefix")
165+
}
166+
167+
if !strings.Contains(bodyStr, "data: [DONE]") {
168+
t.Error("Body does not contain 'data: [DONE]' terminator")
169+
}
170+
171+
// Parse the SSE data
172+
lines := strings.Split(bodyStr, "\n")
173+
var dataLine string
174+
for _, line := range lines {
175+
if strings.HasPrefix(line, "data: ") && !strings.Contains(line, "[DONE]") {
176+
dataLine = strings.TrimPrefix(line, "data: ")
177+
break
178+
}
179+
}
180+
181+
if dataLine == "" {
182+
t.Fatal("No data line found in SSE response")
183+
}
184+
185+
// Parse the JSON chunk
186+
var chunk map[string]interface{}
187+
if err := json.Unmarshal([]byte(dataLine), &chunk); err != nil {
188+
t.Fatalf("Failed to parse SSE data as JSON: %v", err)
189+
}
190+
191+
// Verify chunk structure
192+
if chunk["object"] != "chat.completion.chunk" {
193+
t.Errorf("Expected object chat.completion.chunk, got %v", chunk["object"])
194+
}
195+
196+
if chunk["id"] != "chatcmpl-test-456" {
197+
t.Errorf("Expected id chatcmpl-test-456, got %v", chunk["id"])
198+
}
199+
200+
// Verify choices structure
201+
choices, ok := chunk["choices"].([]interface{})
202+
if !ok || len(choices) == 0 {
203+
t.Fatal("Choices not found or empty")
204+
}
205+
206+
choice := choices[0].(map[string]interface{})
207+
delta, ok := choice["delta"].(map[string]interface{})
208+
if !ok {
209+
t.Fatal("Delta not found in choice")
210+
}
211+
212+
if delta["role"] != "assistant" {
213+
t.Errorf("Expected role assistant, got %v", delta["role"])
214+
}
215+
216+
if delta["content"] != "This is a cached streaming response." {
217+
t.Errorf("Expected content 'This is a cached streaming response.', got %v", delta["content"])
218+
}
219+
220+
if choice["finish_reason"] != "stop" {
221+
t.Errorf("Expected finish_reason stop, got %v", choice["finish_reason"])
222+
}
223+
}
224+
225+
func TestCreateCacheHitResponse_StreamingWithInvalidJSON(t *testing.T) {
226+
// Test with invalid JSON
227+
invalidJSON := []byte("invalid json")
228+
229+
response := CreateCacheHitResponse(invalidJSON, true)
230+
231+
// Verify response structure
232+
if response == nil {
233+
t.Fatal("Response is nil")
234+
}
235+
236+
immediateResp := response.GetImmediateResponse()
237+
if immediateResp == nil {
238+
t.Fatal("ImmediateResponse is nil")
239+
}
240+
241+
// Verify error response
242+
bodyStr := string(immediateResp.Body)
243+
if !strings.Contains(bodyStr, "error") {
244+
t.Error("Expected error message in response body")
245+
}
246+
247+
if !strings.Contains(bodyStr, "data: [DONE]") {
248+
t.Error("Expected SSE terminator even in error case")
249+
}
250+
}

0 commit comments

Comments
 (0)