Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions internal/apischema/openai/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package openai

import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -515,8 +516,8 @@ type ChatCompletionAssistantMessageParamContent struct {
Text *string `json:"text,omitempty"`

// The signature for a thinking block.
Signature *string `json:"signature,omitempty"`
RedactedContent []byte `json:"redactedContent,omitempty"`
Signature *string `json:"signature,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra spaces ?

RedactedContent *RedactedContentUnion `json:"redactedContent,omitempty"`
*AnthropicContentFields `json:",inline,omitempty"`
}

Expand Down Expand Up @@ -1583,6 +1584,43 @@ func (e EmbeddingUnion) MarshalJSON() ([]byte, error) {
return json.Marshal(e.Value)
}

// RedactedContentUnion is a union type for the "redactedContent" field, whose
// wire format differs by provider: AWS Bedrock carries raw bytes (which
// encoding/json serializes as a base64 string), while GCP Anthropic carries a
// plain string.
type RedactedContentUnion struct {
	// Value holds either a []byte (AWS Bedrock) or a string (GCP Anthropic).
	Value any
}

// UnmarshalJSON implements json.Unmarshaler. The JSON value must be a string;
// if that string is valid standard base64 it is decoded and stored as []byte,
// otherwise it is kept verbatim as a string.
//
// NOTE(review): a plain string that happens to be valid base64 is
// indistinguishable from base64-encoded bytes here and will be decoded to
// []byte — confirm GCP Anthropic payloads can never collide with valid
// base64, or disambiguate by provider instead of guessing. TODO confirm.
func (r *RedactedContentUnion) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		// Not a JSON string at all (number, object, array, ...).
		return errors.New("redactedContent must be either []byte (base64 encoded) or string")
	}
	// encoding/json represents []byte as a base64 string, so try that first.
	if decoded, err := base64.StdEncoding.DecodeString(s); err == nil {
		r.Value = decoded
		return nil
	}
	// Not base64: treat as a plain string.
	r.Value = s
	return nil
}

// MarshalJSON implements json.Marshaler, mirroring UnmarshalJSON: []byte is
// emitted as a base64 string, string is emitted as-is, and any other value is
// delegated to encoding/json's default behavior.
func (r RedactedContentUnion) MarshalJSON() ([]byte, error) {
	switch v := r.Value.(type) {
	case []byte:
		// Encode []byte as a base64 string, matching encoding/json's own
		// convention for byte slices.
		return json.Marshal(base64.StdEncoding.EncodeToString(v))
	case string:
		return json.Marshal(v)
	default:
		return json.Marshal(r.Value)
	}
}

// EmbeddingUsage represents the usage information for an embeddings request.
// https://platform.openai.com/docs/api-reference/embeddings/object#embeddings/object-usage
type EmbeddingUsage struct {
Expand Down
17 changes: 12 additions & 5 deletions internal/translator/openai_awsbedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,18 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) openAIMessageToBedrockMes
}
case openai.ChatCompletionAssistantMessageParamContentTypeRedactedThinking:
if content.RedactedContent != nil {
contentBlocks = append(contentBlocks, &awsbedrock.ContentBlock{
ReasoningContent: &awsbedrock.ReasoningContentBlock{
RedactedContent: content.RedactedContent,
},
})
switch v := content.RedactedContent.Value.(type) {
case []byte:
contentBlocks = append(contentBlocks, &awsbedrock.ContentBlock{
ReasoningContent: &awsbedrock.ReasoningContentBlock{
RedactedContent: v,
},
})
case string:
return nil, fmt.Errorf("AWS Bedrock does not support string format for RedactedContent, expected []byte")
default:
return nil, fmt.Errorf("unsupported RedactedContent type: %T, expected []byte", v)
}
}
case openai.ChatCompletionAssistantMessageParamContentTypeRefusal:
if content.Refusal != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/translator/openai_awsbedrock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_RequestBody(t *testing.T)
Value: []openai.ChatCompletionAssistantMessageParamContent{
{
Type: openai.ChatCompletionAssistantMessageParamContentTypeRedactedThinking,
RedactedContent: []byte{104, 101, 108, 108, 111},
RedactedContent: &openai.RedactedContentUnion{Value: []byte{104, 101, 108, 108, 111}},
},
},
},
Expand Down
117 changes: 94 additions & 23 deletions internal/translator/openai_gcpanthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
openAIconstant "github.com/openai/openai-go/shared/constant"
"github.com/tidwall/sjson"

"github.com/envoyproxy/ai-gateway/internal/apischema/awsbedrock"
"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
"github.com/envoyproxy/ai-gateway/internal/metrics"
Expand Down Expand Up @@ -375,29 +376,71 @@ func anthropicRoleToOpenAIRole(role anthropic.MessageParamRole) (string, error)
}
}

// processAssistantContent converts one OpenAI assistant message content part
// into the equivalent Anthropic content block. It returns (nil, nil) when the
// part carries no payload for its declared type, and an error for unsupported
// types or unsupported RedactedContent payload formats.
func processAssistantContent(content openai.ChatCompletionAssistantMessageParamContent) (*anthropic.ContentBlockParamUnion, error) {
	switch content.Type {
	case openai.ChatCompletionAssistantMessageParamContentTypeRefusal:
		if content.Refusal == nil {
			return nil, nil
		}
		refusal := anthropic.NewTextBlock(*content.Refusal)
		return &refusal, nil
	case openai.ChatCompletionAssistantMessageParamContentTypeText:
		if content.Text == nil {
			return nil, nil
		}
		text := anthropic.NewTextBlock(*content.Text)
		if isCacheEnabled(content.AnthropicContentFields) {
			text.OfText.CacheControl = content.CacheControl
		}
		return &text, nil
	case openai.ChatCompletionAssistantMessageParamContentTypeThinking:
		// thinking can not be cached: https://platform.claude.com/docs/en/build-with-claude/prompt-caching
		if content.Text == nil || content.Signature == nil {
			return nil, nil
		}
		thinking := anthropic.NewThinkingBlock(*content.Text, *content.Signature)
		return &thinking, nil
	case openai.ChatCompletionAssistantMessageParamContentTypeRedactedThinking:
		if content.RedactedContent == nil {
			return nil, nil
		}
		switch v := content.RedactedContent.Value.(type) {
		case string:
			redacted := anthropic.NewRedactedThinkingBlock(v)
			return &redacted, nil
		case []byte:
			return nil, fmt.Errorf("GCP Anthropic does not support []byte format for RedactedContent, expected string")
		default:
			return nil, fmt.Errorf("unsupported RedactedContent type: %T, expected string", v)
		}
	default:
		return nil, fmt.Errorf("content type not supported: %v", content.Type)
	}
}

// openAIMessageToAnthropicMessageRoleAssistant converts an OpenAI assistant message to Anthropic content blocks.
// The tool_use content is appended to the Anthropic message content list if tool_calls are present.
func openAIMessageToAnthropicMessageRoleAssistant(openAiMessage *openai.ChatCompletionAssistantMessageParam) (anthropicMsg anthropic.MessageParam, err error) {
contentBlocks := make([]anthropic.ContentBlockParamUnion, 0)
if v, ok := openAiMessage.Content.Value.(string); ok && len(v) > 0 {
contentBlocks = append(contentBlocks, anthropic.NewTextBlock(v))
} else if content, ok := openAiMessage.Content.Value.(openai.ChatCompletionAssistantMessageParamContent); ok {
switch content.Type {
case openai.ChatCompletionAssistantMessageParamContentTypeRefusal:
if content.Refusal != nil {
contentBlocks = append(contentBlocks, anthropic.NewTextBlock(*content.Refusal))
}
case openai.ChatCompletionAssistantMessageParamContentTypeText:
if content.Text != nil {
textBlock := anthropic.NewTextBlock(*content.Text)
if isCacheEnabled(content.AnthropicContentFields) {
textBlock.OfText.CacheControl = content.CacheControl
}
contentBlocks = append(contentBlocks, textBlock)
// Handle single content object
var block *anthropic.ContentBlockParamUnion
block, err = processAssistantContent(content)
if err != nil {
return anthropicMsg, err
} else if block != nil {
contentBlocks = append(contentBlocks, *block)
}
} else if contents, ok := openAiMessage.Content.Value.([]openai.ChatCompletionAssistantMessageParamContent); ok {
// Handle array of content objects
for _, content := range contents {
var block *anthropic.ContentBlockParamUnion
block, err = processAssistantContent(content)
if err != nil {
return anthropicMsg, err
} else if block != nil {
contentBlocks = append(contentBlocks, *block)
}
default:
err = fmt.Errorf("content type not supported: %v", content.Type)
return
}
}

Expand Down Expand Up @@ -823,15 +866,43 @@ func (o *openAIToGCPAnthropicTranslatorV1ChatCompletion) ResponseBody(_ map[stri

for i := range anthropicResp.Content { // NOTE: Content structure is massive, do not range over values.
output := &anthropicResp.Content[i]
if output.Type == string(constant.ValueOf[constant.ToolUse]()) && output.ID != "" {
toolCalls, toolErr := anthropicToolUseToOpenAICalls(output)
if toolErr != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to convert anthropic tool use to openai tool call: %w", toolErr)
switch output.Type {
case string(constant.ValueOf[constant.ToolUse]()):
if output.ID != "" {
toolCalls, toolErr := anthropicToolUseToOpenAICalls(output)
if toolErr != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to convert anthropic tool use to openai tool call: %w", toolErr)
}
choice.Message.ToolCalls = append(choice.Message.ToolCalls, toolCalls...)
}
case string(constant.ValueOf[constant.Text]()):
if output.Text != "" {
if choice.Message.Content == nil {
choice.Message.Content = &output.Text
}
}
choice.Message.ToolCalls = append(choice.Message.ToolCalls, toolCalls...)
} else if output.Type == string(constant.ValueOf[constant.Text]()) && output.Text != "" {
if choice.Message.Content == nil {
choice.Message.Content = &output.Text
case string(constant.ValueOf[constant.Thinking]()):
if output.Thinking != "" {
choice.Message.ReasoningContent = &openai.ReasoningContentUnion{
Value: &openai.ReasoningContent{
ReasoningContent: &awsbedrock.ReasoningContentBlock{
ReasoningText: &awsbedrock.ReasoningTextBlock{
Text: output.Thinking,
Signature: output.Signature,
},
},
},
}
}
case string(constant.ValueOf[constant.RedactedThinking]()):
if output.Data != "" {
choice.Message.ReasoningContent = &openai.ReasoningContentUnion{
Value: &openai.ReasoningContent{
ReasoningContent: &awsbedrock.ReasoningContentBlock{
RedactedContent: []byte(output.Data),
},
},
}
}
}
}
Expand Down
40 changes: 23 additions & 17 deletions internal/translator/openai_gcpanthropic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,14 @@ import (

"github.com/anthropics/anthropic-sdk-go"
"github.com/anthropics/anthropic-sdk-go/shared/constant"
"k8s.io/utils/ptr"

"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
"github.com/envoyproxy/ai-gateway/internal/metrics"
tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
)

var (
sseEventPrefix = []byte("event:")
emptyStrPtr = ptr.To("")
)
var sseEventPrefix = []byte("event:")

// streamingToolCall holds the state for a single tool call that is being streamed.
type streamingToolCall struct {
Expand Down Expand Up @@ -271,16 +267,7 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
}
return p.constructOpenAIChatCompletionChunk(delta, ""), nil
}
if event.ContentBlock.Type == string(constant.ValueOf[constant.Thinking]()) {
delta := openai.ChatCompletionResponseChunkChoiceDelta{Content: emptyStrPtr}
return p.constructOpenAIChatCompletionChunk(delta, ""), nil
}

if event.ContentBlock.Type == string(constant.ValueOf[constant.RedactedThinking]()) {
// This is a latency-hiding event, ignore it.
return nil, nil
}

// do not need to return an empty str for thinking start block
return nil, nil

case string(constant.ValueOf[constant.MessageDelta]()):
Expand Down Expand Up @@ -316,10 +303,28 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
return nil, fmt.Errorf("unmarshal content_block_delta: %w", err)
}
switch event.Delta.Type {
case string(constant.ValueOf[constant.TextDelta]()), string(constant.ValueOf[constant.ThinkingDelta]()):
// Treat thinking_delta just like a text_delta.
case string(constant.ValueOf[constant.TextDelta]()):
delta := openai.ChatCompletionResponseChunkChoiceDelta{Content: &event.Delta.Text}
return p.constructOpenAIChatCompletionChunk(delta, ""), nil

case string(constant.ValueOf[constant.ThinkingDelta]()):
// this should already include the case for redacted thinking: https://platform.claude.com/docs/en/build-with-claude/streaming#content-block-delta-types

reasoningDelta := &openai.StreamReasoningContent{}

// Map all relevant fields from the Bedrock delta to our flattened OpenAI delta struct.
if event.Delta.Thinking != "" {
reasoningDelta.Text = event.Delta.Thinking
}
if event.Delta.Signature != "" {
reasoningDelta.Signature = event.Delta.Signature
}

delta := openai.ChatCompletionResponseChunkChoiceDelta{
ReasoningContent: reasoningDelta,
}
return p.constructOpenAIChatCompletionChunk(delta, ""), nil

case string(constant.ValueOf[constant.InputJSONDelta]()):
tool, ok := p.activeToolCalls[p.toolIndex]
if !ok {
Expand All @@ -338,6 +343,7 @@ func (p *anthropicStreamParser) handleAnthropicStreamEvent(eventType []byte, dat
tool.inputJSON += event.Delta.PartialJSON
return p.constructOpenAIChatCompletionChunk(delta, ""), nil
}
// Do not process redacted thinking stream? Did not find the source

case string(constant.ValueOf[constant.ContentBlockStop]()):
// This event is for state cleanup, no chunk is sent.
Expand Down
Loading