Skip to content

Commit 25b1f09

Browse files
authored
Merge branch 'main' into main
2 parents a878e70 + 2990379 commit 25b1f09

File tree

8 files changed

+227
-75
lines changed

8 files changed

+227
-75
lines changed

internal/apischema/openai/openai.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,9 +1167,9 @@ type ChatCompletionResponseChoiceMessage struct {
11671167
// Audio is the audio response generated by the model, if applicable.
11681168
Audio *ChatCompletionResponseChoiceMessageAudio `json:"audio,omitempty"`
11691169

1170-
// AWSBedRockResponseVendorFields is used to hold any non-standard fields from the backend,
1170+
// AWSBedrockResponseVendorFields is used to hold any non-standard fields from the backend,
11711171
// like "reasoningContent" from AWS Bedrock.
1172-
*AWSBedRockResponseVendorFields `json:",inline,omitempty"`
1172+
*AWSBedrockResponseVendorFields `json:",inline,omitempty"`
11731173
}
11741174

11751175
// URLCitation contains citation information for web search results.
@@ -1303,7 +1303,7 @@ type ChatCompletionResponseChunkChoiceDelta struct {
13031303
Role string `json:"role,omitempty"`
13041304
ToolCalls []ChatCompletionMessageToolCallParam `json:"tool_calls,omitempty"`
13051305
Annotations *[]Annotation `json:"annotations,omitempty"`
1306-
ReasoningContent *AWSBedRockStreamReasoningContent `json:"reasoning_content,omitempty"`
1306+
ReasoningContent *AWSBedrockStreamReasoningContent `json:"reasoning_content,omitempty"`
13071307
}
13081308

13091309
// Error is described in the OpenAI API documentation
@@ -1516,20 +1516,20 @@ type AnthropicVendorFields struct {
15161516
Thinking *anthropic.ThinkingConfigParamUnion `json:"thinking,omitzero"`
15171517
}
15181518

1519-
// AWSBedRockResponseVendorFields contains extra vendor-specific fields to add to the openai response.
1520-
type AWSBedRockResponseVendorFields struct {
1519+
// AWSBedrockResponseVendorFields contains extra vendor-specific fields to add to the openai response.
1520+
type AWSBedrockResponseVendorFields struct {
15211521
// Contains content regarding the reasoning that is carried out by the model. Reasoning refers to a Chain of Thought (CoT) that the model generates to enhance the accuracy of its final response.
15221522
// Note: This object is a Union. Only one member of this object can be specified or returned.
15231523
// Required: No
15241524
// See https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ReasoningContentBlock.html for more information.
1525-
ReasoningContent *AWSBedRockReasoningContent `json:"reasoning_content,omitzero"`
1525+
ReasoningContent *AWSBedrockReasoningContent `json:"reasoning_content,omitzero"`
15261526
}
15271527

1528-
type AWSBedRockReasoningContent struct {
1528+
type AWSBedrockReasoningContent struct {
15291529
ReasoningContent *awsbedrock.ReasoningContentBlock `json:"reasoningContent,omitzero"`
15301530
}
15311531

1532-
type AWSBedRockStreamReasoningContent struct {
1532+
type AWSBedrockStreamReasoningContent struct {
15331533
Text string `json:"text,omitzero"`
15341534
Signature string `json:"signature,omitzero"`
15351535
RedactedContent []byte `json:"redactedContent,omitzero"`

internal/extproc/chatcompletion_processor.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
package extproc
77

88
import (
9-
"bytes"
10-
"compress/gzip"
119
"context"
1210
"encoding/json"
1311
"fmt"
14-
"io"
1512
"log/slog"
1613
"strconv"
1714

@@ -326,24 +323,18 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
326323
c.metrics.RecordRequestCompletion(ctx, true, c.requestHeaders)
327324
}
328325
}()
329-
var br io.Reader
330-
var isGzip bool
331-
switch c.responseEncoding {
332-
case "gzip":
333-
br, err = gzip.NewReader(bytes.NewReader(body.Body))
334-
if err != nil {
335-
return nil, fmt.Errorf("failed to decode gzip: %w", err)
336-
}
337-
isGzip = true
338-
default:
339-
br = bytes.NewReader(body.Body)
326+
327+
// Decompress the body if needed using common utility.
328+
decodingResult, err := decodeContentIfNeeded(body.Body, c.responseEncoding)
329+
if err != nil {
330+
return nil, err
340331
}
341332

342333
// Assume all responses have a valid status code header.
343334
if code, _ := strconv.Atoi(c.responseHeaders[":status"]); !isGoodStatusCode(code) {
344335
var headerMutation *extprocv3.HeaderMutation
345336
var bodyMutation *extprocv3.BodyMutation
346-
headerMutation, bodyMutation, err = c.translator.ResponseError(c.responseHeaders, br)
337+
headerMutation, bodyMutation, err = c.translator.ResponseError(c.responseHeaders, decodingResult.reader)
347338
if err != nil {
348339
return nil, fmt.Errorf("failed to transform response error: %w", err)
349340
}
@@ -368,22 +359,13 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
368359
}, nil
369360
}
370361

371-
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream, c.span)
362+
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, decodingResult.reader, body.EndOfStream, c.span)
372363
if err != nil {
373364
return nil, fmt.Errorf("failed to transform response: %w", err)
374365
}
375-
if bodyMutation != nil && isGzip {
376-
if headerMutation == nil {
377-
headerMutation = &extprocv3.HeaderMutation{}
378-
}
379-
// TODO: this is a hotfix, we should update this to recompress since its in the header
380-
// If the response was gzipped, ensure we remove the content-encoding header.
381-
//
382-
// This is only needed when the transformation is actually modifying the body. When the backend
383-
// is in OpenAI format (and it's the first try before any retry), the response body is not modified,
384-
// so we don't need to remove the header in that case.
385-
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
386-
}
366+
367+
// Remove content-encoding header if original body encoded but was mutated in the processor.
368+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
387369

388370
resp := &extprocv3.ProcessingResponse{
389371
Response: &extprocv3.ProcessingResponse_ResponseBody{

internal/extproc/embeddings_processor.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
package extproc
77

88
import (
9-
"bytes"
10-
"compress/gzip"
119
"context"
1210
"encoding/json"
1311
"fmt"
14-
"io"
1512
"log/slog"
1613
"strconv"
1714

@@ -250,24 +247,18 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
250247
e.metrics.RecordRequestCompletion(ctx, true, e.requestHeaders)
251248
}
252249
}()
253-
var br io.Reader
254-
var isGzip bool
255-
switch e.responseEncoding {
256-
case "gzip":
257-
br, err = gzip.NewReader(bytes.NewReader(body.Body))
258-
if err != nil {
259-
return nil, fmt.Errorf("failed to decode gzip: %w", err)
260-
}
261-
isGzip = true
262-
default:
263-
br = bytes.NewReader(body.Body)
250+
251+
// Decompress the body if needed using common utility.
252+
decodingResult, err := decodeContentIfNeeded(body.Body, e.responseEncoding)
253+
if err != nil {
254+
return nil, err
264255
}
265256

266257
// Assume all responses have a valid status code header.
267258
if code, _ := strconv.Atoi(e.responseHeaders[":status"]); !isGoodStatusCode(code) {
268259
var headerMutation *extprocv3.HeaderMutation
269260
var bodyMutation *extprocv3.BodyMutation
270-
headerMutation, bodyMutation, err = e.translator.ResponseError(e.responseHeaders, br)
261+
headerMutation, bodyMutation, err = e.translator.ResponseError(e.responseHeaders, decodingResult.reader)
271262
if err != nil {
272263
return nil, fmt.Errorf("failed to transform response error: %w", err)
273264
}
@@ -285,22 +276,13 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
285276
}, nil
286277
}
287278

288-
headerMutation, bodyMutation, tokenUsage, err := e.translator.ResponseBody(e.responseHeaders, br, body.EndOfStream)
279+
headerMutation, bodyMutation, tokenUsage, err := e.translator.ResponseBody(e.responseHeaders, decodingResult.reader, body.EndOfStream)
289280
if err != nil {
290281
return nil, fmt.Errorf("failed to transform response: %w", err)
291282
}
292-
if bodyMutation != nil && isGzip {
293-
if headerMutation == nil {
294-
headerMutation = &extprocv3.HeaderMutation{}
295-
}
296-
// TODO: this is a hotfix, we should update this to recompress since its in the header
297-
// If the response was gzipped, ensure we remove the content-encoding header.
298-
//
299-
// This is only needed when the transformation is actually modifying the body. When the backend
300-
// is in OpenAI format (and it's the first try before any retry), the response body is not modified,
301-
// so we don't need to remove the header in that case.
302-
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
303-
}
283+
284+
// Remove content-encoding header if original body encoded but was mutated in the processor.
285+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
304286

305287
resp := &extprocv3.ProcessingResponse{
306288
Response: &extprocv3.ProcessingResponse_ResponseBody{

internal/extproc/messages_processor.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package extproc
77

88
import (
9-
"bytes"
109
"context"
1110
"encoding/json"
1211
"fmt"
@@ -134,6 +133,7 @@ type messagesProcessorUpstreamFilter struct {
134133
config *processorConfig
135134
requestHeaders map[string]string
136135
responseHeaders map[string]string
136+
responseEncoding string
137137
modelNameOverride string
138138
backendName string
139139
handler backendauth.Handler
@@ -222,6 +222,9 @@ func (c *messagesProcessorUpstreamFilter) ProcessResponseHeaders(ctx context.Con
222222
}()
223223

224224
c.responseHeaders = headersToMap(headers)
225+
if enc := c.responseHeaders["content-encoding"]; enc != "" {
226+
c.responseEncoding = enc
227+
}
225228
headerMutation, err := c.translator.ResponseHeaders(c.responseHeaders)
226229
if err != nil {
227230
return nil, fmt.Errorf("failed to transform response headers: %w", err)
@@ -250,14 +253,21 @@ func (c *messagesProcessorUpstreamFilter) ProcessResponseBody(ctx context.Contex
250253
}
251254
}()
252255

253-
// Simple passthrough: just pass the body as-is without any complex handling.
254-
br := bytes.NewReader(body.Body)
256+
// Decompress the body if needed using common utility.
257+
decodingResult, err := decodeContentIfNeeded(body.Body, c.responseEncoding)
258+
if err != nil {
259+
return nil, err
260+
}
255261

256-
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream)
262+
// headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream).
263+
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, decodingResult.reader, body.EndOfStream)
257264
if err != nil {
258265
return nil, fmt.Errorf("failed to transform response: %w", err)
259266
}
260267

268+
// Remove content-encoding header if original body encoded but was mutated in the processor.
269+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
270+
261271
resp := &extprocv3.ProcessingResponse{
262272
Response: &extprocv3.ProcessingResponse_ResponseBody{
263273
ResponseBody: &extprocv3.BodyResponse{

internal/extproc/translator/openai_awsbedrock.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -668,10 +668,10 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseBody(_ map[string
668668
choice.Message.Content = output.Text
669669
}
670670
case output.ReasoningContent != nil:
671-
if choice.Message.AWSBedRockResponseVendorFields == nil {
672-
choice.Message.AWSBedRockResponseVendorFields = &openai.AWSBedRockResponseVendorFields{}
671+
if choice.Message.AWSBedrockResponseVendorFields == nil {
672+
choice.Message.AWSBedrockResponseVendorFields = &openai.AWSBedrockResponseVendorFields{}
673673
}
674-
choice.Message.ReasoningContent = &openai.AWSBedRockReasoningContent{ReasoningContent: output.ReasoningContent}
674+
choice.Message.ReasoningContent = &openai.AWSBedrockReasoningContent{ReasoningContent: output.ReasoningContent}
675675
}
676676
}
677677
openAIResp.Choices = append(openAIResp.Choices, choice)
@@ -761,7 +761,7 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) convertEvent(event *awsbe
761761
},
762762
})
763763
case event.Delta.ReasoningContent != nil:
764-
reasoningDelta := &openai.AWSBedRockStreamReasoningContent{}
764+
reasoningDelta := &openai.AWSBedrockStreamReasoningContent{}
765765

766766
// Map all relevant fields from the Bedrock delta to our flattened OpenAI delta struct.
767767
if event.Delta.ReasoningContent.ReasoningText != nil {

internal/extproc/translator/openai_awsbedrock_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1603,8 +1603,8 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_ResponseBody(t *testing.T)
16031603
Message: openai.ChatCompletionResponseChoiceMessage{
16041604
Role: awsbedrock.ConversationRoleAssistant,
16051605
Content: ptr.To("This is the final answer."),
1606-
AWSBedRockResponseVendorFields: &openai.AWSBedRockResponseVendorFields{
1607-
ReasoningContent: &openai.AWSBedRockReasoningContent{
1606+
AWSBedrockResponseVendorFields: &openai.AWSBedrockResponseVendorFields{
1607+
ReasoningContent: &openai.AWSBedrockReasoningContent{
16081608
ReasoningContent: &awsbedrock.ReasoningContentBlock{
16091609
ReasoningText: &awsbedrock.ReasoningTextBlock{
16101610
Text: "This is the model's thought process.",
@@ -1807,7 +1807,7 @@ func TestOpenAIToAWSBedrockTranslator_convertEvent(t *testing.T) {
18071807
Choices: []openai.ChatCompletionResponseChunkChoice{
18081808
{
18091809
Delta: &openai.ChatCompletionResponseChunkChoiceDelta{
1810-
ReasoningContent: &openai.AWSBedRockStreamReasoningContent{
1810+
ReasoningContent: &openai.AWSBedrockStreamReasoningContent{
18111811
Text: "thinking...",
18121812
},
18131813
},

internal/extproc/util.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,60 @@
55

66
package extproc
77

8+
import (
9+
"bytes"
10+
"compress/gzip"
11+
"fmt"
12+
"io"
13+
14+
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
15+
)
16+
17+
// contentDecodingResult contains the result of content decoding operation.
18+
type contentDecodingResult struct {
19+
reader io.Reader
20+
isEncoded bool
21+
}
22+
23+
// decodeContentIfNeeded decompresses the response body based on the content-encoding header.
24+
// Currently supports gzip encoding, but can be extended to support other encodings in the future.
25+
// Returns a reader for the (potentially decompressed) body and metadata about the encoding.
26+
func decodeContentIfNeeded(body []byte, contentEncoding string) (contentDecodingResult, error) {
27+
switch contentEncoding {
28+
case "gzip":
29+
reader, err := gzip.NewReader(bytes.NewReader(body))
30+
if err != nil {
31+
return contentDecodingResult{}, fmt.Errorf("failed to decode gzip: %w", err)
32+
}
33+
return contentDecodingResult{
34+
reader: reader,
35+
isEncoded: true,
36+
}, nil
37+
default:
38+
return contentDecodingResult{
39+
reader: bytes.NewReader(body),
40+
isEncoded: false,
41+
}, nil
42+
}
43+
}
44+
45+
// removeContentEncodingIfNeeded removes the content-encoding header if the body was modified and was encoded.
46+
// This is needed when the transformation modifies the body content but the response was originally compressed.
47+
func removeContentEncodingIfNeeded(headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, isEncoded bool) *extprocv3.HeaderMutation {
48+
if bodyMutation != nil && isEncoded {
49+
if headerMutation == nil {
50+
headerMutation = &extprocv3.HeaderMutation{}
51+
}
52+
// TODO: this is a hotfix, we should update this to recompress since its in the header
53+
// If the upstream response was compressed and we decompressed it,
54+
// ensure we remove the content-encoding header.
55+
//
56+
// This is only needed when the transformation is actually modifying the body.
57+
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
58+
}
59+
return headerMutation
60+
}
61+
862
// isGoodStatusCode checks if the HTTP status code of the upstream response is successful.
963
// The 2xx - Successful: The request is received by upstream and processed successfully.
1064
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Status#successful_responses

0 commit comments

Comments
 (0)