7 changes: 6 additions & 1 deletion api/v1alpha1/shared_types.go
@@ -15,7 +15,7 @@ package v1alpha1
type VersionedAPISchema struct {
// Name is the name of the API schema of the AIGatewayRoute or AIServiceBackend.
//
// +kubebuilder:validation:Enum=OpenAI;Cohere;AWSBedrock;AzureOpenAI;GCPVertexAI;GCPAnthropic;Anthropic;AWSAnthropic
// +kubebuilder:validation:Enum=OpenAI;Cohere;AWSBedrock;AzureOpenAI;GCPVertexAI;GCPAnthropic;Anthropic;AWSAnthropic;AWSOpenAI
Name APISchema `json:"name"`

// Version is the version of the API schema.
@@ -75,6 +75,11 @@ const (
// https://aws.amazon.com/bedrock/anthropic/
// https://docs.claude.com/en/api/claude-on-amazon-bedrock
APISchemaAWSAnthropic APISchema = "AWSAnthropic"
// APISchemaAWSOpenAI is the schema for OpenAI models hosted on AWS Bedrock.
// Uses the AWS Bedrock InvokeModel API with OpenAI format for requests and responses.
//
// https://aws.amazon.com/bedrock/
APISchemaAWSOpenAI APISchema = "AWSOpenAI"
)

const (
2 changes: 2 additions & 0 deletions internal/endpointspec/endpointspec.go
@@ -113,6 +113,8 @@ func (ChatCompletionsEndpointSpec) GetTranslator(schema filterapi.VersionedAPISc
return translator.NewChatCompletionOpenAIToOpenAITranslator(schema.Version, modelNameOverride), nil
case filterapi.APISchemaAWSBedrock:
return translator.NewChatCompletionOpenAIToAWSBedrockTranslator(modelNameOverride), nil
case filterapi.APISchemaAWSOpenAI:
return translator.NewChatCompletionOpenAIToAwsOpenAITranslator(schema.Version, modelNameOverride), nil
case filterapi.APISchemaAzureOpenAI:
return translator.NewChatCompletionOpenAIToAzureOpenAITranslator(schema.Version, modelNameOverride), nil
case filterapi.APISchemaGCPVertexAI:
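For orientation, a minimal usage sketch of the new dispatch case (compilable only in-tree, since these are internal packages; the empty model override mirrors the tests later in this diff):

package main

import (
	"log"

	"github.com/envoyproxy/ai-gateway/internal/endpointspec"
	"github.com/envoyproxy/ai-gateway/internal/filterapi"
)

func main() {
	spec := endpointspec.ChatCompletionsEndpointSpec{}
	// Selecting the new AWSOpenAI schema yields the OpenAI-to-AWS-OpenAI
	// translator; unrecognized schema names still return the
	// "unsupported API schema" error.
	tr, err := spec.GetTranslator(filterapi.VersionedAPISchema{
		Name:    filterapi.APISchemaAWSOpenAI,
		Version: "v1",
	}, "") // no model-name override
	if err != nil {
		log.Fatal(err)
	}
	_ = tr
}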
31 changes: 31 additions & 0 deletions internal/endpointspec/endpointspec_test.go
@@ -79,6 +79,7 @@ func TestChatCompletionsEndpointSpec_GetTranslator(t *testing.T) {
supported := []filterapi.VersionedAPISchema{
{Name: filterapi.APISchemaOpenAI, Version: "v1"},
{Name: filterapi.APISchemaAWSBedrock},
{Name: filterapi.APISchemaAWSOpenAI, Version: "v1"},
{Name: filterapi.APISchemaAzureOpenAI, Version: "2024-02-01"},
{Name: filterapi.APISchemaGCPVertexAI},
{Name: filterapi.APISchemaGCPAnthropic, Version: "2024-05-01"},
@@ -273,3 +274,33 @@ func TestRerankEndpointSpec_GetTranslator(t *testing.T) {
_, err = spec.GetTranslator(filterapi.VersionedAPISchema{Name: filterapi.APISchemaOpenAI}, "override")
require.ErrorContains(t, err, "unsupported API schema")
}

func TestAWSOpenAIAPISchemaIntegration(t *testing.T) {
schema := filterapi.VersionedAPISchema{
Name: filterapi.APISchemaAWSOpenAI,
Version: "v1",
}

t.Run("ChatCompletions", func(t *testing.T) {
endpointSpec := ChatCompletionsEndpointSpec{}
translator, err := endpointSpec.GetTranslator(schema, "")
require.NoError(t, err)
require.NotNil(t, translator)
})
}

func TestAWSOpenAIAPISchemaWithModelOverride(t *testing.T) {
schema := filterapi.VersionedAPISchema{
Name: filterapi.APISchemaAWSOpenAI,
Version: "v1",
}

modelOverride := "arn:aws:bedrock:us-east-1:123456789:model/gpt-4"

t.Run("ChatCompletions", func(t *testing.T) {
endpointSpec := ChatCompletionsEndpointSpec{}
translator, err := endpointSpec.GetTranslator(schema, modelOverride)
require.NoError(t, err)
require.NotNil(t, translator)
})
}
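For a quick local check, both tests can be run in isolation with standard Go tooling, e.g. go test ./internal/endpointspec -run TestAWSOpenAI from the repository root.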
3 changes: 3 additions & 0 deletions internal/filterapi/filterconfig.go
@@ -114,6 +114,9 @@ const (
// APISchemaAWSAnthropic represents the AWS Bedrock Anthropic API schema.
// Used for Claude models hosted on AWS Bedrock using the native Anthropic Messages API.
APISchemaAWSAnthropic APISchemaName = "AWSAnthropic"
// APISchemaAWSOpenAI represents the AWS Bedrock OpenAI API schema.
// Used for GPT models hosted on AWS Bedrock using the OpenAI API.
APISchemaAWSOpenAI APISchemaName = "AWSOpenAI"
)

// RouteRuleName is the name of the route rule.
276 changes: 276 additions & 0 deletions internal/translator/openai_awsopenai.go
@@ -0,0 +1,276 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package translator

import (
"encoding/json"
"fmt"
"io"
"net/url"
"strconv"
"strings"

"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
"github.com/envoyproxy/ai-gateway/internal/metrics"
tracing "github.com/envoyproxy/ai-gateway/internal/tracing/api"
)

// NewChatCompletionOpenAIToAwsOpenAITranslator implements [Factory] for OpenAI to AWS OpenAI translations.
func NewChatCompletionOpenAIToAwsOpenAITranslator(apiVersion string, modelNameOverride internalapi.ModelNameOverride) OpenAIChatCompletionTranslator {

Check failure on line 23 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): unused-parameter: parameter 'apiVersion' seems to be unused, consider removing or renaming it as _ (revive)
return &openAIToAwsOpenAITranslatorV1ChatCompletion{
modelNameOverride: modelNameOverride,
}
}

// openAIToAwsOpenAITranslatorV1ChatCompletion adapts OpenAI requests for AWS Bedrock InvokeModel API.
// This uses the InvokeModel API, which accepts model-specific request/response formats.
// For OpenAI models, this preserves the OpenAI format but uses AWS Bedrock endpoints.
type openAIToAwsOpenAITranslatorV1ChatCompletion struct {
openAIToOpenAITranslatorV1ChatCompletion
modelNameOverride internalapi.ModelNameOverride
requestModel internalapi.RequestModel
responseID string
stream bool
}

func (o *openAIToAwsOpenAITranslatorV1ChatCompletion) RequestBody(raw []byte, req *openai.ChatCompletionRequest, forceBodyMutation bool) (

Check failure on line 40 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): unused-parameter: parameter 'forceBodyMutation' seems to be unused, consider removing or renaming it as _ (revive)
newHeaders []internalapi.Header, newBody []byte, err error,
) {
// Store request model and streaming state
o.requestModel = req.Model
if o.modelNameOverride != "" {
o.requestModel = o.modelNameOverride
}

if req.Stream {
o.stream = true
}

// URL encode the model name for the path to handle special characters (e.g., ARNs)
encodedModelName := url.PathEscape(o.requestModel)

// Set the path for AWS Bedrock InvokeModel API
pathTemplate := "/model/%s/invoke"
if req.Stream {
pathTemplate = "/model/%s/invoke-with-response-stream"
}

// For the InvokeModel API, the request body should stay in OpenAI format,
// since we're invoking OpenAI models through Bedrock.
if o.modelNameOverride != "" {
// If we need to override the model in the request body
var openAIReq openai.ChatCompletionRequest
if err := json.Unmarshal(raw, &openAIReq); err != nil {

Check failure on line 67 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): shadow: declaration of "err" shadows declaration at line 41 (govet)
return nil, nil, fmt.Errorf("failed to unmarshal request: %w", err)
}
openAIReq.Model = o.modelNameOverride
newBody, err = json.Marshal(openAIReq)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal request: %w", err)
}
} else {
newBody = raw
}

newHeaders = []internalapi.Header{
{pathHeaderName, fmt.Sprintf(pathTemplate, encodedModelName)},
{contentLengthHeaderName, strconv.Itoa(len(newBody))},
}

return
}

// ResponseHeaders implements [OpenAIChatCompletionTranslator.ResponseHeaders].
func (o *openAIToAwsOpenAITranslatorV1ChatCompletion) ResponseHeaders(headers map[string]string) (
newHeaders []internalapi.Header, err error,
) {
// Store the response ID for tracking
o.responseID = headers["x-amzn-requestid"]

// For streaming responses, ensure content-type is correctly set
if o.stream {
contentType := headers["content-type"]
// AWS Bedrock may return a different content-type for streaming
if contentType == "application/vnd.amazon.eventstream" {
// Convert to the expected streaming content-type
newHeaders = []internalapi.Header{{contentTypeHeaderName, "text/event-stream"}}
}
}
return
}

// ResponseBody implements [OpenAIChatCompletionTranslator.ResponseBody].
// AWS Bedrock InvokeModel API with OpenAI models returns responses in OpenAI format.
// This function handles both streaming and non-streaming responses.
func (o *openAIToAwsOpenAITranslatorV1ChatCompletion) ResponseBody(headers map[string]string, body io.Reader, endOfStream bool, span tracing.ChatCompletionSpan) (

Check failure on line 109 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): unused-parameter: parameter 'headers' seems to be unused, consider removing or renaming it as _ (revive)
newHeaders []internalapi.Header, newBody []byte, tokenUsage metrics.TokenUsage, responseModel string, err error,
) {
responseModel = o.requestModel

if o.stream {
// Handle streaming response
var buf []byte
buf, err = io.ReadAll(body)
if err != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to read streaming body: %w", err)
}

// For InvokeModel with OpenAI models, the streaming response should already be in
// Server-Sent Events format with OpenAI chunks
newBody = buf

// Parse for token usage if available in the stream
for _, line := range strings.Split(string(buf), "\n") {
if dataStr, found := strings.CutPrefix(line, "data: "); found {
if dataStr != "[DONE]" {
var chunk openai.ChatCompletionResponseChunk
if json.Unmarshal([]byte(dataStr), &chunk) == nil {
if chunk.Usage != nil {
tokenUsage.SetInputTokens(uint32(chunk.Usage.PromptTokens))
Check failure on line 133 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
tokenUsage.SetOutputTokens(uint32(chunk.Usage.CompletionTokens))
Check failure on line 134 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
tokenUsage.SetTotalTokens(uint32(chunk.Usage.TotalTokens))
Check failure on line 135 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
}
if span != nil {
span.RecordResponseChunk(&chunk)
}
}
}
}
}

if endOfStream && !strings.HasSuffix(string(newBody), "data: [DONE]\n") {
newBody = append(newBody, []byte("data: [DONE]\n")...)
}
} else {
// Handle non-streaming response
var buf []byte
buf, err = io.ReadAll(body)
if err != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to read body: %w", err)
}

// For InvokeModel with OpenAI models, response should already be in OpenAI format
var openAIResp openai.ChatCompletionResponse
if err = json.Unmarshal(buf, &openAIResp); err != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to unmarshal response: %w", err)
}

// Use response model if available, otherwise use request model
if openAIResp.Model != "" {
responseModel = openAIResp.Model
}

// Extract token usage
if openAIResp.Usage.TotalTokens > 0 {
tokenUsage.SetInputTokens(uint32(openAIResp.Usage.PromptTokens))
Check failure on line 169 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
tokenUsage.SetOutputTokens(uint32(openAIResp.Usage.CompletionTokens))
Check failure on line 170 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
tokenUsage.SetTotalTokens(uint32(openAIResp.Usage.TotalTokens))
Check failure on line 171 in internal/translator/openai_awsopenai.go, GitHub Actions / Check (ubuntu-latest, macos-latest): G115: integer overflow conversion int -> uint32 (gosec)
}

// Override the ID with AWS request ID if available
if o.responseID != "" {
openAIResp.ID = o.responseID
}

newBody, err = json.Marshal(openAIResp)
if err != nil {
return nil, nil, metrics.TokenUsage{}, "", fmt.Errorf("failed to marshal response: %w", err)
}

if span != nil {
span.RecordResponse(&openAIResp)
}
}

if len(newBody) > 0 {
newHeaders = []internalapi.Header{{contentLengthHeaderName, strconv.Itoa(len(newBody))}}
}
return
}

// ResponseError implements [OpenAIChatCompletionTranslator.ResponseError].
// Translates AWS Bedrock InvokeModel exceptions to OpenAI error format.
// The error type is typically stored in the "x-amzn-errortype" HTTP header for AWS error responses.
func (o *openAIToAwsOpenAITranslatorV1ChatCompletion) ResponseError(respHeaders map[string]string, body io.Reader) (
newHeaders []internalapi.Header, newBody []byte, err error,
) {
statusCode := respHeaders[statusHeaderName]
var openaiError openai.Error

// Check if we have a JSON error response
if v, ok := respHeaders[contentTypeHeaderName]; ok && strings.Contains(v, jsonContentType) {
// Try to parse as AWS Bedrock error
var buf []byte
buf, err = io.ReadAll(body)
if err != nil {
return nil, nil, fmt.Errorf("failed to read error body: %w", err)
}

// Check if it's already an OpenAI error format
var existingOpenAIError openai.Error
if json.Unmarshal(buf, &existingOpenAIError) == nil && existingOpenAIError.Error.Message != "" {
// Already in OpenAI format, return as-is
newBody = buf
} else {
// Try to parse as AWS error and convert to OpenAI format
var awsError struct {
Type string `json:"__type,omitempty"`
Message string `json:"message"`
Code string `json:"code,omitempty"`
}
if json.Unmarshal(buf, &awsError) == nil && awsError.Message != "" {
openaiError = openai.Error{
Type: "error",
Error: openai.ErrorType{
Type: respHeaders[awsErrorTypeHeaderName],
Message: awsError.Message,
Code: &statusCode,
},
}
} else {
// Generic AWS error format
openaiError = openai.Error{
Type: "error",
Error: openai.ErrorType{
Type: awsInvokeModelBackendError,
Message: string(buf),
Code: &statusCode,
},
}
}
newBody, err = json.Marshal(openaiError)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal error body: %w", err)
}
}
} else {
// Non-JSON error response
var buf []byte
buf, err = io.ReadAll(body)
if err != nil {
return nil, nil, fmt.Errorf("failed to read error body: %w", err)
}
openaiError = openai.Error{
Type: "error",
Error: openai.ErrorType{
Type: awsInvokeModelBackendError,
Message: string(buf),
Code: &statusCode,
},
}
newBody, err = json.Marshal(openaiError)
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal error body: %w", err)
}
}

newHeaders = []internalapi.Header{
{contentTypeHeaderName, jsonContentType},
{contentLengthHeaderName, strconv.Itoa(len(newBody))},
}
return
}
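To make the path construction in RequestBody concrete, a standalone sketch using only the standard library (the ARN is the hypothetical one from the tests above):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// '/' inside an ARN is percent-encoded so the whole model name fits in a
	// single path segment; ':' is a valid segment character and passes through.
	model := "arn:aws:bedrock:us-east-1:123456789:model/gpt-4"
	fmt.Printf("/model/%s/invoke\n", url.PathEscape(model))
	// Output: /model/arn:aws:bedrock:us-east-1:123456789:model%2Fgpt-4/invoke
}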
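One way to clear the G115 findings flagged above is a clamping conversion; a minimal sketch (the helper name is hypothetical, not part of this PR):

package main

import (
	"fmt"
	"math"
)

// clampToUint32 converts an int to uint32 without the silent wraparound gosec
// G115 warns about: negative values clamp to 0, oversized values to MaxUint32.
func clampToUint32(v int) uint32 {
	if v < 0 {
		return 0
	}
	if uint64(v) > math.MaxUint32 {
		return math.MaxUint32
	}
	return uint32(v)
}

func main() {
	fmt.Println(clampToUint32(-1), clampToUint32(42)) // 0 42
}

Call sites such as tokenUsage.SetInputTokens(clampToUint32(chunk.Usage.PromptTokens)) would then satisfy the linter.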