Skip to content

Commit 884d344

Browse files
fix: streamline gzip handling in response processing (envoyproxy#1189)
**Description** Bug: Token usage is not extracted for gzipped HTTP responses from upstream when the endpoint is `v1/messages`. Root cause: `messages_processor.go` doesn't decompress the response body (if gzipped) before extracting token usage; see [1]. The corresponding code in `chatcompletion_processor.go` (see [2]) does handle gzip decompression. This PR introduces utility functions for handling gzipped HTTP responses and uses them in `chatcompletion_processor.go`, `embeddings_processor.go` and `messages_processor.go`. [1]: https://github.com/envoyproxy/ai-gateway/blob/e767da5559fadf89a01d2168eea262b48a1ab6a4/internal/extproc/messages_processor.go#L254 [2]: https://github.com/envoyproxy/ai-gateway/blob/e767da5559fadf89a01d2168eea262b48a1ab6a4/internal/extproc/chatcompletion_processor.go#L330-L340 --------- Signed-off-by: Sukumar Gaonkar <[email protected]>
1 parent 8a739ab commit 884d344

File tree

5 files changed

+212
-60
lines changed

5 files changed

+212
-60
lines changed

internal/extproc/chatcompletion_processor.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
package extproc
77

88
import (
9-
"bytes"
10-
"compress/gzip"
119
"context"
1210
"encoding/json"
1311
"fmt"
14-
"io"
1512
"log/slog"
1613
"strconv"
1714

@@ -326,24 +323,18 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
326323
c.metrics.RecordRequestCompletion(ctx, true, c.requestHeaders)
327324
}
328325
}()
329-
var br io.Reader
330-
var isGzip bool
331-
switch c.responseEncoding {
332-
case "gzip":
333-
br, err = gzip.NewReader(bytes.NewReader(body.Body))
334-
if err != nil {
335-
return nil, fmt.Errorf("failed to decode gzip: %w", err)
336-
}
337-
isGzip = true
338-
default:
339-
br = bytes.NewReader(body.Body)
326+
327+
// Decompress the body if needed using common utility.
328+
decodingResult, err := decodeContentIfNeeded(body.Body, c.responseEncoding)
329+
if err != nil {
330+
return nil, err
340331
}
341332

342333
// Assume all responses have a valid status code header.
343334
if code, _ := strconv.Atoi(c.responseHeaders[":status"]); !isGoodStatusCode(code) {
344335
var headerMutation *extprocv3.HeaderMutation
345336
var bodyMutation *extprocv3.BodyMutation
346-
headerMutation, bodyMutation, err = c.translator.ResponseError(c.responseHeaders, br)
337+
headerMutation, bodyMutation, err = c.translator.ResponseError(c.responseHeaders, decodingResult.reader)
347338
if err != nil {
348339
return nil, fmt.Errorf("failed to transform response error: %w", err)
349340
}
@@ -368,22 +359,13 @@ func (c *chatCompletionProcessorUpstreamFilter) ProcessResponseBody(ctx context.
368359
}, nil
369360
}
370361

371-
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream, c.span)
362+
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, decodingResult.reader, body.EndOfStream, c.span)
372363
if err != nil {
373364
return nil, fmt.Errorf("failed to transform response: %w", err)
374365
}
375-
if bodyMutation != nil && isGzip {
376-
if headerMutation == nil {
377-
headerMutation = &extprocv3.HeaderMutation{}
378-
}
379-
// TODO: this is a hotfix, we should update this to recompress since its in the header
380-
// If the response was gzipped, ensure we remove the content-encoding header.
381-
//
382-
// This is only needed when the transformation is actually modifying the body. When the backend
383-
// is in OpenAI format (and it's the first try before any retry), the response body is not modified,
384-
// so we don't need to remove the header in that case.
385-
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
386-
}
366+
367+
// Remove content-encoding header if original body encoded but was mutated in the processor.
368+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
387369

388370
resp := &extprocv3.ProcessingResponse{
389371
Response: &extprocv3.ProcessingResponse_ResponseBody{

internal/extproc/embeddings_processor.go

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
package extproc
77

88
import (
9-
"bytes"
10-
"compress/gzip"
119
"context"
1210
"encoding/json"
1311
"fmt"
14-
"io"
1512
"log/slog"
1613
"strconv"
1714

@@ -250,24 +247,18 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
250247
e.metrics.RecordRequestCompletion(ctx, true, e.requestHeaders)
251248
}
252249
}()
253-
var br io.Reader
254-
var isGzip bool
255-
switch e.responseEncoding {
256-
case "gzip":
257-
br, err = gzip.NewReader(bytes.NewReader(body.Body))
258-
if err != nil {
259-
return nil, fmt.Errorf("failed to decode gzip: %w", err)
260-
}
261-
isGzip = true
262-
default:
263-
br = bytes.NewReader(body.Body)
250+
251+
// Decompress the body if needed using common utility.
252+
decodingResult, err := decodeContentIfNeeded(body.Body, e.responseEncoding)
253+
if err != nil {
254+
return nil, err
264255
}
265256

266257
// Assume all responses have a valid status code header.
267258
if code, _ := strconv.Atoi(e.responseHeaders[":status"]); !isGoodStatusCode(code) {
268259
var headerMutation *extprocv3.HeaderMutation
269260
var bodyMutation *extprocv3.BodyMutation
270-
headerMutation, bodyMutation, err = e.translator.ResponseError(e.responseHeaders, br)
261+
headerMutation, bodyMutation, err = e.translator.ResponseError(e.responseHeaders, decodingResult.reader)
271262
if err != nil {
272263
return nil, fmt.Errorf("failed to transform response error: %w", err)
273264
}
@@ -285,22 +276,13 @@ func (e *embeddingsProcessorUpstreamFilter) ProcessResponseBody(ctx context.Cont
285276
}, nil
286277
}
287278

288-
headerMutation, bodyMutation, tokenUsage, err := e.translator.ResponseBody(e.responseHeaders, br, body.EndOfStream)
279+
headerMutation, bodyMutation, tokenUsage, err := e.translator.ResponseBody(e.responseHeaders, decodingResult.reader, body.EndOfStream)
289280
if err != nil {
290281
return nil, fmt.Errorf("failed to transform response: %w", err)
291282
}
292-
if bodyMutation != nil && isGzip {
293-
if headerMutation == nil {
294-
headerMutation = &extprocv3.HeaderMutation{}
295-
}
296-
// TODO: this is a hotfix, we should update this to recompress since its in the header
297-
// If the response was gzipped, ensure we remove the content-encoding header.
298-
//
299-
// This is only needed when the transformation is actually modifying the body. When the backend
300-
// is in OpenAI format (and it's the first try before any retry), the response body is not modified,
301-
// so we don't need to remove the header in that case.
302-
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
303-
}
283+
284+
// Remove content-encoding header if original body encoded but was mutated in the processor.
285+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
304286

305287
resp := &extprocv3.ProcessingResponse{
306288
Response: &extprocv3.ProcessingResponse_ResponseBody{

internal/extproc/messages_processor.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package extproc
77

88
import (
9-
"bytes"
109
"context"
1110
"encoding/json"
1211
"fmt"
@@ -134,6 +133,7 @@ type messagesProcessorUpstreamFilter struct {
134133
config *processorConfig
135134
requestHeaders map[string]string
136135
responseHeaders map[string]string
136+
responseEncoding string
137137
modelNameOverride string
138138
backendName string
139139
handler backendauth.Handler
@@ -222,6 +222,9 @@ func (c *messagesProcessorUpstreamFilter) ProcessResponseHeaders(ctx context.Con
222222
}()
223223

224224
c.responseHeaders = headersToMap(headers)
225+
if enc := c.responseHeaders["content-encoding"]; enc != "" {
226+
c.responseEncoding = enc
227+
}
225228
headerMutation, err := c.translator.ResponseHeaders(c.responseHeaders)
226229
if err != nil {
227230
return nil, fmt.Errorf("failed to transform response headers: %w", err)
@@ -250,14 +253,21 @@ func (c *messagesProcessorUpstreamFilter) ProcessResponseBody(ctx context.Contex
250253
}
251254
}()
252255

253-
// Simple passthrough: just pass the body as-is without any complex handling.
254-
br := bytes.NewReader(body.Body)
256+
// Decompress the body if needed using common utility.
257+
decodingResult, err := decodeContentIfNeeded(body.Body, c.responseEncoding)
258+
if err != nil {
259+
return nil, err
260+
}
255261

256-
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream)
262+
// headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, br, body.EndOfStream).
263+
headerMutation, bodyMutation, tokenUsage, err := c.translator.ResponseBody(c.responseHeaders, decodingResult.reader, body.EndOfStream)
257264
if err != nil {
258265
return nil, fmt.Errorf("failed to transform response: %w", err)
259266
}
260267

268+
// Remove content-encoding header if original body encoded but was mutated in the processor.
269+
headerMutation = removeContentEncodingIfNeeded(headerMutation, bodyMutation, decodingResult.isEncoded)
270+
261271
resp := &extprocv3.ProcessingResponse{
262272
Response: &extprocv3.ProcessingResponse_ResponseBody{
263273
ResponseBody: &extprocv3.BodyResponse{

internal/extproc/util.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,60 @@
55

66
package extproc
77

8+
import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"strings"

	extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)
16+
17+
// contentDecodingResult contains the result of a content decoding operation.
type contentDecodingResult struct {
	// reader yields the body bytes, decompressed when isEncoded is true.
	reader io.Reader
	// isEncoded reports whether the body was treated as gzip-encoded.
	isEncoded bool
}

// decodeContentIfNeeded decompresses the response body based on the content-encoding header value.
// Currently supports gzip encoding, but can be extended to support other encodings in the future.
//
// The encoding name is matched case-insensitively and with surrounding whitespace ignored, because
// HTTP content codings are case-insensitive. Any other encoding (including the empty string) is
// passed through untouched with isEncoded set to false.
//
// A zero-length body with gzip encoding yields an empty reader instead of an error: it cannot hold
// a gzip header, so there is nothing to decompress.
//
// Returns a reader for the (potentially decompressed) body and metadata about the encoding, or an
// error when a non-empty body is declared as gzip but does not start with a valid gzip header.
func decodeContentIfNeeded(body []byte, contentEncoding string) (contentDecodingResult, error) {
	if !strings.EqualFold(strings.TrimSpace(contentEncoding), "gzip") {
		return contentDecodingResult{reader: bytes.NewReader(body)}, nil
	}

	// gzip.NewReader would fail with io.EOF on an empty payload; treat it as an empty decoded body.
	if len(body) == 0 {
		return contentDecodingResult{reader: bytes.NewReader(nil), isEncoded: true}, nil
	}

	reader, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		return contentDecodingResult{}, fmt.Errorf("failed to decode gzip: %w", err)
	}
	return contentDecodingResult{reader: reader, isEncoded: true}, nil
}
44+
45+
// removeContentEncodingIfNeeded removes the content-encoding header if the body was modified and was encoded.
46+
// This is needed when the transformation modifies the body content but the response was originally compressed.
47+
func removeContentEncodingIfNeeded(headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, isEncoded bool) *extprocv3.HeaderMutation {
48+
if bodyMutation != nil && isEncoded {
49+
if headerMutation == nil {
50+
headerMutation = &extprocv3.HeaderMutation{}
51+
}
52+
// TODO: this is a hotfix, we should update this to recompress since its in the header
53+
// If the upstream response was compressed and we decompressed it,
54+
// ensure we remove the content-encoding header.
55+
//
56+
// This is only needed when the transformation is actually modifying the body.
57+
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, "content-encoding")
58+
}
59+
return headerMutation
60+
}
61+
862
// isGoodStatusCode checks if the HTTP status code of the upstream response is successful.
963
// The 2xx - Successful: The request is received by upstream and processed successfully.
1064
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Status#successful_responses

internal/extproc/util_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
package extproc
77

88
import (
9+
"bytes"
10+
"compress/gzip"
11+
"io"
912
"testing"
1013

14+
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1115
"github.com/stretchr/testify/require"
1216
)
1317

@@ -19,3 +23,123 @@ func TestIsGoodStatusCode(t *testing.T) {
1923
require.False(t, isGoodStatusCode(s))
2024
}
2125
}
26+
27+
func TestDecodeContentIfNeeded(t *testing.T) {
28+
tests := []struct {
29+
name string
30+
body []byte
31+
encoding string
32+
wantEncoded bool
33+
wantEncoding string
34+
wantErr bool
35+
}{
36+
{
37+
name: "plain body",
38+
body: []byte("hello world"),
39+
encoding: "",
40+
wantEncoded: false,
41+
wantEncoding: "",
42+
wantErr: false,
43+
},
44+
{
45+
name: "unsupported encoding",
46+
body: []byte("hello world"),
47+
encoding: "deflate",
48+
wantEncoded: false,
49+
wantEncoding: "",
50+
wantErr: false,
51+
},
52+
{
53+
name: "valid gzip",
54+
body: func() []byte {
55+
var b bytes.Buffer
56+
w := gzip.NewWriter(&b)
57+
_, err := w.Write([]byte("abc"))
58+
if err != nil {
59+
panic(err)
60+
}
61+
w.Close()
62+
return b.Bytes()
63+
}(),
64+
encoding: "gzip",
65+
wantEncoded: true,
66+
wantEncoding: "gzip",
67+
wantErr: false,
68+
},
69+
{
70+
name: "invalid gzip",
71+
body: []byte("not a gzip"),
72+
encoding: "gzip",
73+
wantEncoded: false,
74+
wantEncoding: "",
75+
wantErr: true,
76+
},
77+
}
78+
for _, tt := range tests {
79+
t.Run(tt.name, func(t *testing.T) {
80+
res, err := decodeContentIfNeeded(tt.body, tt.encoding)
81+
if tt.wantErr {
82+
require.Error(t, err)
83+
return
84+
}
85+
require.NoError(t, err)
86+
require.Equal(t, tt.wantEncoded, res.isEncoded)
87+
if !tt.wantEncoded {
88+
out, _ := io.ReadAll(res.reader)
89+
require.Equal(t, tt.body, out)
90+
} else if tt.encoding == "gzip" && !tt.wantErr {
91+
out, _ := io.ReadAll(res.reader)
92+
require.Equal(t, []byte("abc"), out)
93+
}
94+
})
95+
}
96+
}
97+
98+
func TestRemoveContentEncodingIfNeeded(t *testing.T) {
99+
tests := []struct {
100+
name string
101+
hm *extprocv3.HeaderMutation
102+
bm *extprocv3.BodyMutation
103+
isEncoded bool
104+
wantRemoved bool
105+
}{
106+
{
107+
name: "no body mutation, not encoded",
108+
hm: nil,
109+
bm: nil,
110+
isEncoded: false,
111+
wantRemoved: false,
112+
},
113+
{
114+
name: "body mutation, not encoded",
115+
hm: nil,
116+
bm: &extprocv3.BodyMutation{},
117+
isEncoded: false,
118+
wantRemoved: false,
119+
},
120+
{
121+
name: "body mutation, encoded",
122+
hm: nil,
123+
bm: &extprocv3.BodyMutation{},
124+
isEncoded: true,
125+
wantRemoved: true,
126+
},
127+
{
128+
name: "existing header mutation, body mutation, encoded",
129+
hm: &extprocv3.HeaderMutation{RemoveHeaders: []string{"foo"}},
130+
bm: &extprocv3.BodyMutation{},
131+
isEncoded: true,
132+
wantRemoved: true,
133+
},
134+
}
135+
for _, tt := range tests {
136+
t.Run(tt.name, func(t *testing.T) {
137+
res := removeContentEncodingIfNeeded(tt.hm, tt.bm, tt.isEncoded)
138+
if tt.wantRemoved {
139+
require.Contains(t, res.RemoveHeaders, "content-encoding")
140+
} else if res != nil {
141+
require.NotContains(t, res.RemoveHeaders, "content-encoding")
142+
}
143+
})
144+
}
145+
}

0 commit comments

Comments
 (0)