Commit e09547c

update
Signed-off-by: yxia216 <[email protected]>
1 parent 30c3e4d commit e09547c

8 files changed: +604, -36 lines
cmd/aigw/run.go

Lines changed: 9 additions & 8 deletions
@@ -136,14 +136,15 @@ func run(ctx context.Context, c cmdRun, o *runOpts, stdout, stderr io.Writer) er
     // Do the translation of the given AI Gateway resources Yaml into Envoy Gateway resources and write them to the file.
     resourcesBuf := &bytes.Buffer{}
     runCtx := &runCmdContext{
-        isDebug:                  c.Debug,
-        envoyGatewayResourcesOut: resourcesBuf,
-        stderrLogger:             debugLogger,
-        stderr:                   stderr,
-        tmpdir:                   filepath.Dir(o.logPath), // runDir
-        udsPath:                  o.extprocUDSPath,
-        adminPort:                c.AdminPort,
-        extProcLauncher:          o.extProcLauncher,
+        isDebug:                        c.Debug,
+        envoyGatewayResourcesOut:       resourcesBuf,
+        stderrLogger:                   debugLogger,
+        stderr:                         stderr,
+        tmpdir:                         filepath.Dir(o.logPath), // runDir
+        udsPath:                        o.extprocUDSPath,
+        adminPort:                      c.AdminPort,
+        extProcLauncher:                o.extProcLauncher,
+        mcpSessionEncryptionIterations: c.MCPSessionEncryptionIterations,
     }
     // If any of the configured MCP servers is using stdio, set up the streamable HTTP proxies for them
     if err = proxyStdioMCPServers(ctx, debugLogger, c.mcpConfig); err != nil {
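
Note: the only functional change in this hunk is the new mcpSessionEncryptionIterations field; the neighboring lines changed only because gofmt realigned the struct literal around the longer name. As a rough illustration of what a session-encryption iteration count typically parameterizes, here is a hypothetical PBKDF2 key-derivation sketch. The package, function, and parameter names below are illustrative only; this commit shows nothing beyond wiring the value into runCmdContext.

    package mcpexample

    import (
        "crypto/sha256"

        "golang.org/x/crypto/pbkdf2"
    )

    // deriveSessionKey stretches a session secret into a 32-byte key.
    // Raising the iteration count makes offline brute-forcing of the secret
    // more expensive, at the cost of extra CPU per session setup.
    func deriveSessionKey(secret, salt []byte, iterations int) []byte {
        return pbkdf2.Key(secret, salt, iterations, 32, sha256.New)
    }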

internal/translator/openai_gcpanthropic.go

Lines changed: 37 additions & 8 deletions
@@ -22,6 +22,7 @@ import (
     openAIconstant "github.com/openai/openai-go/shared/constant"
     "github.com/tidwall/sjson"
 
+    "github.com/envoyproxy/ai-gateway/internal/apischema/awsbedrock"
     "github.com/envoyproxy/ai-gateway/internal/apischema/openai"
     "github.com/envoyproxy/ai-gateway/internal/internalapi"
     "github.com/envoyproxy/ai-gateway/internal/metrics"
@@ -859,15 +860,43 @@ func (o *openAIToGCPAnthropicTranslatorV1ChatCompletion) ResponseBody(_ map[stri
 
     for i := range anthropicResp.Content { // NOTE: Content structure is massive, do not range over values.
         output := &anthropicResp.Content[i]
-        if output.Type == string(constant.ValueOf[constant.ToolUse]()) && output.ID != "" {
-            toolCalls, toolErr := anthropicToolUseToOpenAICalls(output)
-            if toolErr != nil {
-                return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to convert anthropic tool use to openai tool call: %w", toolErr)
+        switch output.Type {
+        case string(constant.ValueOf[constant.ToolUse]()):
+            if output.ID != "" {
+                toolCalls, toolErr := anthropicToolUseToOpenAICalls(output)
+                if toolErr != nil {
+                    return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to convert anthropic tool use to openai tool call: %w", toolErr)
+                }
+                choice.Message.ToolCalls = append(choice.Message.ToolCalls, toolCalls...)
+            }
+        case string(constant.ValueOf[constant.Text]()):
+            if output.Text != "" {
+                if choice.Message.Content == nil {
+                    choice.Message.Content = &output.Text
+                }
             }
-            choice.Message.ToolCalls = append(choice.Message.ToolCalls, toolCalls...)
-        } else if output.Type == string(constant.ValueOf[constant.Text]()) && output.Text != "" {
-            if choice.Message.Content == nil {
-                choice.Message.Content = &output.Text
+        case string(constant.ValueOf[constant.Thinking]()):
+            if output.Thinking != "" {
+                choice.Message.ReasoningContent = &openai.ReasoningContentUnion{
+                    Value: &openai.ReasoningContent{
+                        ReasoningContent: &awsbedrock.ReasoningContentBlock{
+                            ReasoningText: &awsbedrock.ReasoningTextBlock{
+                                Text:      output.Thinking,
+                                Signature: output.Signature,
+                            },
+                        },
+                    },
+                }
+            }
+        case string(constant.ValueOf[constant.RedactedThinking]()):
+            if output.Data != "" {
+                choice.Message.ReasoningContent = &openai.ReasoningContentUnion{
+                    Value: &openai.ReasoningContent{
+                        ReasoningContent: &awsbedrock.ReasoningContentBlock{
+                            RedactedContent: []byte(output.Data),
+                        },
+                    },
+                }
             }
         }
     }
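
In short, the new switch (replacing the old if/else chain) routes each Anthropic content block by type:

- tool_use          -> choice.Message.ToolCalls (appended)
- text              -> choice.Message.Content (set only while nil, so the first text block wins)
- thinking          -> choice.Message.ReasoningContent (ReasoningText carrying Text and Signature)
- redacted_thinking -> choice.Message.ReasoningContent (opaque RedactedContent bytes)

The new awsbedrock import exists because the OpenAI response schema reuses the Bedrock-shaped reasoning-content block types. Below is a minimal sketch of the thinking mapping, factored into a helper for readability; the helper name is illustrative (the commit inlines this logic in the switch), and the types come from the internal openai and awsbedrock schema packages shown in the import hunk, so this only compiles inside the ai-gateway module.

    // reasoningFromThinking wraps an Anthropic thinking block's text and
    // signature in the reasoning-content union used by the OpenAI schema.
    func reasoningFromThinking(thinking, signature string) *openai.ReasoningContentUnion {
        return &openai.ReasoningContentUnion{
            Value: &openai.ReasoningContent{
                ReasoningContent: &awsbedrock.ReasoningContentBlock{
                    ReasoningText: &awsbedrock.ReasoningTextBlock{
                        Text:      thinking,
                        Signature: signature,
                    },
                },
            },
        }
    }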

internal/translator/openai_gcpanthropic_stream.go

Lines changed: 23 additions & 17 deletions
@@ -14,18 +14,14 @@ import (
 
     "github.com/anthropics/anthropic-sdk-go"
     "github.com/anthropics/anthropic-sdk-go/shared/constant"
-    "k8s.io/utils/ptr"
 
     "github.com/envoyproxy/ai-gateway/internal/apischema/openai"
     "github.com/envoyproxy/ai-gateway/internal/internalapi"
     "github.com/envoyproxy/ai-gateway/internal/metrics"
     tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
 )
 
-var (
-    sseEventPrefix = []byte("event:")
-    emptyStrPtr    = ptr.To("")
-)
+var sseEventPrefix = []byte("event:")
 
 // streamingToolCall holds the state for a single tool call that is being streamed.
 type streamingToolCall struct {

@@ -265,16 +261,7 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
         }
         return p.constructOpenAIChatCompletionChunk(delta, ""), nil
     }
-    if event.ContentBlock.Type == string(constant.ValueOf[constant.Thinking]()) {
-        delta := openai.ChatCompletionResponseChunkChoiceDelta{Content: emptyStrPtr}
-        return p.constructOpenAIChatCompletionChunk(delta, ""), nil
-    }
-
-    if event.ContentBlock.Type == string(constant.ValueOf[constant.RedactedThinking]()) {
-        // This is a latency-hiding event, ignore it.
-        return nil, nil
-    }
-
+    // No need to emit an empty-content chunk for a thinking content_block_start.
     return nil, nil
 
 case string(constant.ValueOf[constant.MessageDelta]()):

@@ -304,10 +291,28 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
             return nil, fmt.Errorf("unmarshal content_block_delta: %w", err)
         }
         switch event.Delta.Type {
-        case string(constant.ValueOf[constant.TextDelta]()), string(constant.ValueOf[constant.ThinkingDelta]()):
-            // Treat thinking_delta just like a text_delta.
+        case string(constant.ValueOf[constant.TextDelta]()):
             delta := openai.ChatCompletionResponseChunkChoiceDelta{Content: &event.Delta.Text}
             return p.constructOpenAIChatCompletionChunk(delta, ""), nil
+
+        case string(constant.ValueOf[constant.ThinkingDelta]()):
+            // This should already cover the redacted thinking case:
+            // https://platform.claude.com/docs/en/build-with-claude/streaming#content-block-delta-types
+            reasoningDelta := &openai.StreamReasoningContent{}
+
+            // Map all relevant fields from the Anthropic delta to our flattened OpenAI delta struct.
+            if event.Delta.Thinking != "" {
+                reasoningDelta.Text = event.Delta.Thinking
+            }
+            if event.Delta.Signature != "" {
+                reasoningDelta.Signature = event.Delta.Signature
+            }
+
+            delta := openai.ChatCompletionResponseChunkChoiceDelta{
+                ReasoningContent: reasoningDelta,
+            }
+            return p.constructOpenAIChatCompletionChunk(delta, ""), nil
+
         case string(constant.ValueOf[constant.InputJSONDelta]()):
             tool, ok := p.activeToolCalls[p.toolIndex]
             if !ok {

@@ -326,6 +331,7 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
             tool.inputJSON += event.Delta.PartialJSON
             return p.constructOpenAIChatCompletionChunk(delta, ""), nil
         }
+        // Redacted thinking is not handled as a separate delta type here; no
+        // dedicated streaming delta for it was found in the Anthropic docs.
 
     case string(constant.ValueOf[constant.ContentBlockStop]()):
         // This event is for state cleanup, no chunk is sent.
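
Behavior change in the stream path: thinking_delta events were previously coalesced into the ordinary text Content (and a thinking content_block_start produced an empty-Content chunk via the now-removed emptyStrPtr); they are now surfaced as a dedicated ReasoningContent delta. For example, an upstream event such as

    event: content_block_delta
    data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "Processing...", "signature": "sig_abc123"}}

now yields an OpenAI-format chunk whose choice delta carries the thinking text and signature in ReasoningContent rather than in Content. (The exact JSON field names on the wire depend on the openai schema package's serialization tags, which this diff does not show.)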

internal/translator/openai_gcpanthropic_stream_test.go

Lines changed: 153 additions & 3 deletions
@@ -539,7 +539,7 @@ event: content_block_start
 data: {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking", "name": "web_searcher"}}
 
 event: content_block_delta
-data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "text": "Searching for information..."}}
+data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "Searching for information..."}}
 
 event: content_block_stop
 data: {"type": "content_block_stop", "index": 0}
@@ -564,6 +564,7 @@ data: {"type": "message_stop"}
         bodyStr := string(bm)
 
         var contentDeltas []string
+        var reasoningTexts []string
         var foundToolCallWithArgs bool
         var finalFinishReason openai.ChatCompletionChoicesFinishReason
 
@@ -586,6 +587,11 @@ data: {"type": "message_stop"}
             if choice.Delta.Content != nil {
                 contentDeltas = append(contentDeltas, *choice.Delta.Content)
             }
+            if choice.Delta.ReasoningContent != nil {
+                if choice.Delta.ReasoningContent.Text != "" {
+                    reasoningTexts = append(reasoningTexts, choice.Delta.ReasoningContent.Text)
+                }
+            }
             if len(choice.Delta.ToolCalls) > 0 {
                 toolCall := choice.Delta.ToolCalls[0]
                 // Check if this is the tool chunk that contains the arguments.
@@ -607,11 +613,155 @@ data: {"type": "message_stop"}
             }
         }
 
-        fullContent := strings.Join(contentDeltas, "")
-        assert.Contains(t, fullContent, "Searching for information...")
+        fullReasoning := strings.Join(reasoningTexts, "")
+
+        assert.Contains(t, fullReasoning, "Searching for information...")
         require.True(t, foundToolCallWithArgs, "Did not find a tool call chunk with arguments to assert against")
         assert.Equal(t, openai.ChatCompletionChoicesFinishReasonToolCalls, finalFinishReason, "Final finish reason should be 'tool_calls'")
     })
+
+    t.Run("handles thinking delta stream with text only", func(t *testing.T) {
+        sseStream := `
+event: message_start
+data: {"type": "message_start", "message": {"id": "msg_thinking_1", "type": "message", "role": "assistant", "usage": {"input_tokens": 20, "output_tokens": 1}}}
+
+event: content_block_start
+data: {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}
+
+event: content_block_delta
+data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "Let me think about this problem step by step."}}
+
+event: content_block_delta
+data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": " First, I need to understand the requirements."}}
+
+event: content_block_stop
+data: {"type": "content_block_stop", "index": 0}
+
+event: message_delta
+data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 15}}
+
+event: message_stop
+data: {"type": "message_stop"}
+`
+        openAIReq := &openai.ChatCompletionRequest{Stream: true, Model: "test-model", MaxTokens: new(int64)}
+        translator := NewChatCompletionOpenAIToGCPAnthropicTranslator("", "").(*openAIToGCPAnthropicTranslatorV1ChatCompletion)
+        _, _, err := translator.RequestBody(nil, openAIReq, false)
+        require.NoError(t, err)
+
+        _, bm, _, _, err := translator.ResponseBody(map[string]string{}, strings.NewReader(sseStream), true, nil)
+        require.NoError(t, err)
+        require.NotNil(t, bm)
+        bodyStr := string(bm)
+
+        var reasoningTexts []string
+        var foundFinishReason bool
+
+        lines := strings.SplitSeq(strings.TrimSpace(bodyStr), "\n\n")
+        for line := range lines {
+            if !strings.HasPrefix(line, "data: ") || strings.Contains(line, "[DONE]") {
+                continue
+            }
+            jsonBody := strings.TrimPrefix(line, "data: ")
+
+            var chunk openai.ChatCompletionResponseChunk
+            err = json.Unmarshal([]byte(jsonBody), &chunk)
+            require.NoError(t, err, "Failed to unmarshal chunk: %s", jsonBody)
+
+            if len(chunk.Choices) == 0 {
+                continue
+            }
+            choice := chunk.Choices[0]
+            if choice.Delta != nil && choice.Delta.ReasoningContent != nil {
+                if choice.Delta.ReasoningContent.Text != "" {
+                    reasoningTexts = append(reasoningTexts, choice.Delta.ReasoningContent.Text)
+                }
+            }
+            if choice.FinishReason == openai.ChatCompletionChoicesFinishReasonStop {
+                foundFinishReason = true
+            }
+        }
+
+        fullReasoning := strings.Join(reasoningTexts, "")
+        assert.Contains(t, fullReasoning, "Let me think about this problem step by step.")
+        assert.Contains(t, fullReasoning, " First, I need to understand the requirements.")
+        require.True(t, foundFinishReason, "Should find stop finish reason")
+    })
+
+    t.Run("handles thinking delta stream with text and signature", func(t *testing.T) {
+        sseStream := `
+event: message_start
+data: {"type": "message_start", "message": {"id": "msg_thinking_2", "type": "message", "role": "assistant", "usage": {"input_tokens": 25, "output_tokens": 1}}}
+
+event: content_block_start
+data: {"type": "content_block_start", "index": 0, "content_block": {"type": "thinking"}}
+
+event: content_block_delta
+data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": "Processing request...", "signature": "sig_abc123"}}
+
+event: content_block_delta
+data: {"type": "content_block_delta", "index": 0, "delta": {"type": "thinking_delta", "thinking": " Analyzing data...", "signature": "sig_def456"}}
+
+event: content_block_stop
+data: {"type": "content_block_stop", "index": 0}
+
+event: message_delta
+data: {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 20}}
+
+event: message_stop
+data: {"type": "message_stop"}
+`
+        openAIReq := &openai.ChatCompletionRequest{Stream: true, Model: "test-model", MaxTokens: new(int64)}
+        translator := NewChatCompletionOpenAIToGCPAnthropicTranslator("", "").(*openAIToGCPAnthropicTranslatorV1ChatCompletion)
+        _, _, err := translator.RequestBody(nil, openAIReq, false)
+        require.NoError(t, err)
+
+        _, bm, _, _, err := translator.ResponseBody(map[string]string{}, strings.NewReader(sseStream), true, nil)
+        require.NoError(t, err)
+        require.NotNil(t, bm)
+        bodyStr := string(bm)
+
+        var reasoningTexts []string
+        var signatures []string
+        var foundFinishReason bool
+
+        lines := strings.SplitSeq(strings.TrimSpace(bodyStr), "\n\n")
+        for line := range lines {
+            if !strings.HasPrefix(line, "data: ") || strings.Contains(line, "[DONE]") {
+                continue
+            }
+            jsonBody := strings.TrimPrefix(line, "data: ")
+
+            var chunk openai.ChatCompletionResponseChunk
+            err = json.Unmarshal([]byte(jsonBody), &chunk)
+            require.NoError(t, err, "Failed to unmarshal chunk: %s", jsonBody)
+
+            if len(chunk.Choices) == 0 {
+                continue
+            }
+            choice := chunk.Choices[0]
+            if choice.Delta != nil && choice.Delta.ReasoningContent != nil {
+                if choice.Delta.ReasoningContent.Text != "" {
+                    reasoningTexts = append(reasoningTexts, choice.Delta.ReasoningContent.Text)
+                }
+                if choice.Delta.ReasoningContent.Signature != "" {
+                    signatures = append(signatures, choice.Delta.ReasoningContent.Signature)
+                }
+            }
+            if choice.FinishReason == openai.ChatCompletionChoicesFinishReasonStop {
+                foundFinishReason = true
+            }
+        }
+
+        fullReasoning := strings.Join(reasoningTexts, "")
+        assert.Contains(t, fullReasoning, "Processing request...")
+        assert.Contains(t, fullReasoning, " Analyzing data...")
+
+        allSignatures := strings.Join(signatures, ",")
+        assert.Contains(t, allSignatures, "sig_abc123")
+        assert.Contains(t, allSignatures, "sig_def456")
+
+        require.True(t, foundFinishReason, "Should find stop finish reason")
+    })
 }
 
 func TestAnthropicStreamParser_EventTypes(t *testing.T) {
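
The new cases can be run with standard Go tooling, for example:

    go test ./internal/translator/ -run TestAnthropicStreamParser -v

(The exact parent test name containing these t.Run blocks is not visible in this hunk, so adjust the -run pattern as needed. Note that strings.SplitSeq requires Go 1.24 or newer.)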
