Skip to content

Commit 9cca0e2

Browse files
committed
streaming support
Signed-off-by: JaredforReal <[email protected]>
1 parent b774c48 commit 9cca0e2

File tree

4 files changed

+118
-1
lines changed

4 files changed

+118
-1
lines changed

src/semantic-router/pkg/extproc/mapping_responses.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,62 @@ func mapChatCompletionToResponses(chatCompletionJSON []byte) ([]byte, error) {
144144

145145
return json.Marshal(out)
146146
}
147+
148+
// translateSSEChunkToResponses converts a single OpenAI chat.completion.chunk SSE payload
149+
// (the JSON after "data: ") into Responses SSE events (delta/stop). Returns empty when not applicable.
150+
func translateSSEChunkToResponses(chunk []byte) ([][]byte, bool) {
151+
// Expect chunk JSON like {"id":"...","object":"chat.completion.chunk","created":...,"model":"...","choices":[{"index":0,"delta":{"role":"assistant","content":"..."},"finish_reason":null}]}
152+
var parsed map[string]interface{}
153+
if err := json.Unmarshal(chunk, &parsed); err != nil {
154+
return nil, false
155+
}
156+
if parsed["object"] != "chat.completion.chunk" {
157+
return nil, false
158+
}
159+
160+
created, _ := parsed["created"].(float64)
161+
// Emit a created event only once per stream (handled by caller)
162+
163+
// Extract content delta and finish_reason
164+
var deltaText string
165+
var finish string
166+
if arr, ok := parsed["choices"].([]interface{}); ok && len(arr) > 0 {
167+
if ch, ok := arr[0].(map[string]interface{}); ok {
168+
if fr, ok := ch["finish_reason"].(string); ok && fr != "" {
169+
finish = fr
170+
}
171+
if d, ok := ch["delta"].(map[string]interface{}); ok {
172+
if c, ok := d["content"].(string); ok {
173+
deltaText = c
174+
}
175+
}
176+
}
177+
}
178+
179+
var events [][]byte
180+
if deltaText != "" {
181+
ev := map[string]interface{}{
182+
"type": "response.output_text.delta",
183+
"delta": deltaText,
184+
}
185+
if created > 0 {
186+
ev["created"] = int64(created)
187+
}
188+
b, _ := json.Marshal(ev)
189+
events = append(events, b)
190+
}
191+
192+
if finish != "" {
193+
ev := map[string]interface{}{
194+
"type": "response.completed",
195+
"stop_reason": finish,
196+
}
197+
b, _ := json.Marshal(ev)
198+
events = append(events, b)
199+
}
200+
201+
if len(events) == 0 {
202+
return nil, false
203+
}
204+
return events, true
205+
}

src/semantic-router/pkg/extproc/mapping_responses_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,11 @@ func TestMapChatCompletionToResponses_Minimal(t *testing.T) {
4444
t.Fatalf("stop_reason missing")
4545
}
4646
}
47+
48+
func TestTranslateSSEChunkToResponses(t *testing.T) {
49+
chunk := []byte(`{"id":"c1","object":"chat.completion.chunk","created":1,"model":"m","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}`)
50+
evs, ok := translateSSEChunkToResponses(chunk)
51+
if !ok || len(evs) == 0 {
52+
t.Fatalf("expected events")
53+
}
54+
}

src/semantic-router/pkg/extproc/processor_res_header.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,54 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
205205
}
206206
}
207207

208-
// For streaming chunks, just continue (no token parsing or cache update)
208+
// If Responses adapter is active for this request, translate SSE chunks
209+
if r.Config != nil && r.Config.EnableResponsesAdapter {
210+
if p, ok := ctx.Headers[":path"]; ok && strings.HasPrefix(p, "/v1/responses") {
211+
body := v.ResponseBody.Body
212+
// Envoy provides raw chunk bytes, typically like: "data: {json}\n\n" or "data: [DONE]\n\n"
213+
b := string(body)
214+
if strings.Contains(b, "[DONE]") {
215+
// Emit a final response.completed if not already concluded
216+
response := &ext_proc.ProcessingResponse{
217+
Response: &ext_proc.ProcessingResponse_ResponseBody{
218+
ResponseBody: &ext_proc.BodyResponse{
219+
Response: &ext_proc.CommonResponse{Status: ext_proc.CommonResponse_CONTINUE},
220+
},
221+
},
222+
}
223+
return response, nil
224+
}
225+
226+
// Extract JSON after "data: " prefix if present
227+
idx := strings.Index(b, "data:")
228+
var payload []byte
229+
if idx >= 0 {
230+
payload = []byte(strings.TrimSpace(b[idx+5:]))
231+
} else {
232+
payload = v.ResponseBody.Body
233+
}
234+
235+
if len(payload) > 0 && payload[0] == '{' {
236+
if !ctx.ResponsesStreamInit {
237+
// Emit an initial created event on first chunk
238+
ctx.ResponsesStreamInit = true
239+
// We don't inject a new chunk here; clients will see deltas below
240+
}
241+
events, ok := translateSSEChunkToResponses(payload)
242+
if ok && len(events) > 0 {
243+
// Rebuild body as multiple SSE events in Responses format
244+
var sb strings.Builder
245+
for _, ev := range events {
246+
sb.WriteString("data: ")
247+
sb.Write(ev)
248+
sb.WriteString("\n\n")
249+
}
250+
v.ResponseBody.Body = []byte(sb.String())
251+
}
252+
}
253+
}
254+
}
255+
209256
response := &ext_proc.ProcessingResponse{
210257
Response: &ext_proc.ProcessingResponse_ResponseBody{
211258
ResponseBody: &ext_proc.BodyResponse{

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ type RequestContext struct {
249249
TTFTRecorded bool
250250
TTFTSeconds float64
251251

252+
// Responses SSE translation state
253+
ResponsesStreamInit bool
254+
252255
// VSR decision tracking
253256
VSRSelectedCategory string // The category selected by VSR
254257
VSRReasoningMode string // "on" or "off" - whether reasoning mode was determined to be used

0 commit comments

Comments
 (0)