Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 10 additions & 86 deletions internal/extproc/embeddings_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,13 @@ func TestEmbeddingsProcessorRouterFilter_ProcessResponseHeaders_ProcessResponseB
}

func TestEmbeddingsProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutations(t *testing.T) {
const testModelKey = "x-ai-gateway-model-key"
t.Run("header mutations applied correctly", func(t *testing.T) {
headers := map[string]string{
":path": "/v1/embeddings",
testModelKey: "some-model",
"authorization": "bearer token123",
"x-api-key": "secret-key",
"x-custom": "custom-value",
":path": "/v1/embeddings",
internalapi.ModelNameHeaderKeyDefault: "some-model",
"authorization": "bearer token123",
"x-api-key": "secret-key",
"x-custom": "custom-value",
}
someBody := embeddingBodyFromModel(t, "some-model")
var body openai.EmbeddingRequest
Expand Down Expand Up @@ -557,86 +556,11 @@ func TestEmbeddingsProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutat
require.Equal(t, "custom-value", headers["x-custom"])
})

t.Run("header mutations restored on retry", func(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This simply re-tests the headermutator internals, so it overlaps with the tests there; hence I removed it.

headers := map[string]string{
":path": "/v1/embeddings",
testModelKey: "some-model",
// "x-custom" is not present in current headers, so it can be restored.
"x-new-header": "new-value", // Already set from previous mutation.
}
someBody := embeddingBodyFromModel(t, "some-model")
var body openai.EmbeddingRequest
require.NoError(t, json.Unmarshal(someBody, &body))

// Create header mutations that don't remove x-custom (so it can be restored).
headerMutations := &filterapi.HTTPHeaderMutation{
Remove: []string{"authorization", "x-api-key"},
Set: []filterapi.HTTPHeader{{Name: "x-new-header", Value: "updated-value"}},
}

mt := &mockEmbeddingTranslator{t: t, expRequestBody: &body}
mm := &mockEmbeddingsMetrics{}
p := &embeddingsProcessorUpstreamFilter{
config: &processorConfig{},
requestHeaders: headers,
logger: slog.Default(),
metrics: mm,
translator: mt,
originalRequestBodyRaw: someBody,
originalRequestBody: &body,
handler: &mockBackendAuthHandler{},
onRetry: true, // This is a retry request.
}

// Use the same headers map as the original headers (this simulates the router filter's requestHeaders).
originalHeaders := map[string]string{
":path": "/v1/embeddings",
testModelKey: "some-model",
"authorization": "bearer original-token", // This will be removed, so won't be restored.
"x-api-key": "original-secret", // This will be removed, so won't be restored.
"x-custom": "original-custom", // This won't be removed, so can be restored.
"x-new-header": "original-value", // This will be set, so won't be restored.
}
p.headerMutator = headermutator.NewHeaderMutator(headerMutations, originalHeaders)

resp, err := p.ProcessRequestHeaders(t.Context(), nil)
require.NoError(t, err)
require.NotNil(t, resp)

commonRes := resp.Response.(*extprocv3.ProcessingResponse_RequestHeaders).RequestHeaders.Response

// Check that header mutations were applied.
require.NotNil(t, commonRes.HeaderMutation)
// RemoveHeaders should be empty because authorization/x-api-key don't exist in current headers.
require.Empty(t, commonRes.HeaderMutation.RemoveHeaders)
require.Len(t, commonRes.HeaderMutation.SetHeaders, 2) // Updated header + restored header.

// Check that x-custom header was restored on retry (it's not being removed or set).
var restoredHeader *corev3.HeaderValueOption
var updatedHeader *corev3.HeaderValueOption
for _, h := range commonRes.HeaderMutation.SetHeaders {
switch h.Header.Key {
case "x-custom":
restoredHeader = h
case "x-new-header":
updatedHeader = h
}
}
require.NotNil(t, restoredHeader)
require.Equal(t, []byte("original-custom"), restoredHeader.Header.RawValue)
require.NotNil(t, updatedHeader)
require.Equal(t, []byte("updated-value"), updatedHeader.Header.RawValue)

// Check that headers were updated in the request headers.
require.Equal(t, "updated-value", headers["x-new-header"])
require.Equal(t, "original-custom", headers["x-custom"])
})

t.Run("no header mutations when mutator is nil", func(t *testing.T) {
headers := map[string]string{
":path": "/v1/embeddings",
testModelKey: "some-model",
"authorization": "bearer token123",
":path": "/v1/embeddings",
internalapi.ModelNameHeaderKeyDefault: "some-model",
"authorization": "bearer token123",
}
someBody := embeddingBodyFromModel(t, "some-model")
var body openai.EmbeddingRequest
Expand Down Expand Up @@ -756,7 +680,7 @@ func TestEmbeddingsProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *tes

// Test retry scenario - original headers should be restored.
testHeaders := map[string]string{
"x-existing": "current-value", // This exists, so won't be restored.
"x-existing": "previously-set-value",
}
mutation := p.headerMutator.Mutate(testHeaders, true) // onRetry = true.

Expand All @@ -776,6 +700,6 @@ func TestEmbeddingsProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *tes
require.Equal(t, []byte("original-value"), restoredHeader.Header.RawValue)
require.Equal(t, "original-value", testHeaders["x-custom"])
// x-existing should not be restored because it already exists.
require.Equal(t, "current-value", testHeaders["x-existing"])
require.Equal(t, "existing-value", testHeaders["x-existing"])
})
}
31 changes: 28 additions & 3 deletions internal/extproc/headermutator/header_mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"

"github.com/envoyproxy/ai-gateway/internal/filterapi"
"github.com/envoyproxy/ai-gateway/internal/internalapi"
)

type HeaderMutator struct {
Expand Down Expand Up @@ -62,8 +63,8 @@ func (h *HeaderMutator) Mutate(headers map[string]string, onRetry bool) *extproc
}
}

// Restore original headers on retry, only if not being removed, set or not already present.
if onRetry && h.originalHeaders != nil {
if onRetry {
// Restore original headers on retry, only if not being removed, set or not already present.
for h, v := range h.originalHeaders {
key := strings.ToLower(h)
_, isRemoved := removedHeadersSet[key]
Expand All @@ -76,7 +77,31 @@ func (h *HeaderMutator) Mutate(headers map[string]string, onRetry bool) *extproc
})
}
}
// 1. Remove any headers that were added in the previous attempt (not part of original headers and not being set now).
// 2. Restore any original headers that were modified in the previous attempt (and not being set now).
for key := range headers {
key = strings.ToLower(key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just TOL: should we merge this loop and the loop over the original headers? If I understand correctly, this loop (if there are no removals or additions) will make headers equal to originalHeaders. Should we start with headers = originalHeaders and then go ahead with the remaining mutations?
Initially I skipped this to avoid touching any header that was already added and was different from the original header before entering the mutation, but as we are making them the same now, I'm just wondering if the extra loop is required.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do it in a follow up later

// Skip Envoy AI Gateway headers since some of them are populated after the originalHeaders are captured.
// This should be safe since these headers are managed by Envoy AI Gateway itself, not expected to be
// modified by users via header mutation API.
if strings.HasPrefix(key, internalapi.EnvoyAIGatewayHeaderPrefix) || strings.HasPrefix(key, ":") {
continue
}
if _, set := setHeadersSet[key]; set {
continue
}
originalValue, exists := h.originalHeaders[key]
if !exists {
delete(headers, key)
headerMutation.RemoveHeaders = append(headerMutation.RemoveHeaders, key)
} else {
// Restore original value.
headers[key] = originalValue
headerMutation.SetHeaders = append(headerMutation.SetHeaders, &corev3.HeaderValueOption{
Header: &corev3.HeaderValue{Key: key, RawValue: []byte(originalValue)},
})
}
}
}

return headerMutation
}
14 changes: 10 additions & 4 deletions internal/extproc/headermutator/header_mutator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ func TestHeaderMutator_Mutate(t *testing.T) {

t.Run("restore original headers on retry", func(t *testing.T) {
originalHeaders := map[string]string{
"authorization": "secret",
"x-api-key": "key123",
"other": "value",
"authorization": "secret",
"x-api-key": "key123",
"other": "value",
"only-in-original": "original",
"in-original-too-but-previous-attempt-set": "pikachu",
}
headers := map[string]string{
"other": "value",
"authorization": "secret",
"in-original-too-but-previous-attempt-set": "charmander",
"only-set-previously": "bulbasaur",
}
mutations := &filterapi.HTTPHeaderMutation{
Remove: []string{"authorization"},
Expand All @@ -57,9 +61,11 @@ func TestHeaderMutator_Mutate(t *testing.T) {
mutation := mutator.Mutate(headers, true)

require.NotNil(t, mutation)
require.ElementsMatch(t, []string{"authorization"}, mutation.RemoveHeaders)
require.ElementsMatch(t, []string{"authorization", "only-set-previously"}, mutation.RemoveHeaders)
require.Equal(t, "key123", headers["x-api-key"])
require.Equal(t, "value", headers["other"])
require.Equal(t, "secret", headers["authorization"])
require.Equal(t, "original", headers["only-in-original"])
require.Equal(t, "pikachu", headers["in-original-too-but-previous-attempt-set"])
})
}
98 changes: 2 additions & 96 deletions internal/extproc/messages_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -769,100 +769,6 @@ func TestMessagesProcessorUpstreamFilter_ProcessRequestHeaders_WithHeaderMutatio
require.Equal(t, "custom-value", headers["x-custom"])
})

t.Run("header mutations restored on retry", func(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

headers := map[string]string{
":path": "/anthropic/v1/messages",
"x-ai-eg-model": "claude-3-sonnet",
// "x-custom" is not present in current headers, so it can be restored.
"x-new-header": "new-value", // Already set from previous mutation.
}

// Create request body.
requestBody := &anthropicschema.MessagesRequest{
"model": "claude-3-sonnet",
"max_tokens": 1000,
"messages": []any{map[string]any{"role": "user", "content": "Hello"}},
}
requestBodyRaw := []byte(`{"model": "claude-3-sonnet", "max_tokens": 1000, "messages": [{"role": "user", "content": "Hello"}]}`)

// Create header mutations that don't remove x-custom (so it can be restored).
headerMutations := &filterapi.HTTPHeaderMutation{
Remove: []string{"authorization", "x-api-key"},
Set: []filterapi.HTTPHeader{{Name: "x-new-header", Value: "updated-value"}},
}

// Create mock translator.
mockTranslator := mockAnthropicTranslator{
t: t,
expRequestBody: requestBody,
expForceRequestBodyMutation: true, // This is a retry request.
retHeaderMutation: &extprocv3.HeaderMutation{},
retBodyMutation: &extprocv3.BodyMutation{},
retErr: nil,
}

// Create mock metrics.
chatMetrics := metrics.NewMessagesFactory(noop.NewMeterProvider().Meter("test"), map[string]string{})()

// Create processor.
processor := &messagesProcessorUpstreamFilter{
config: &processorConfig{},
requestHeaders: headers,
logger: slog.Default(),
metrics: chatMetrics,
translator: mockTranslator,
originalRequestBody: requestBody,
originalRequestBodyRaw: requestBodyRaw,
handler: &mockBackendAuthHandler{},
onRetry: true, // This is a retry request.
}

// Use the same headers map as the original headers (this simulates the router filter's requestHeaders).
originalHeaders := map[string]string{
":path": "/anthropic/v1/messages",
"x-ai-eg-model": "claude-3-sonnet",
"authorization": "bearer original-token", // This will be removed, so won't be restored.
"x-api-key": "original-secret", // This will be removed, so won't be restored.
"x-custom": "original-custom", // This won't be removed, so can be restored.
"x-new-header": "original-value", // This will be set, so won't be restored.
}
processor.headerMutator = headermutator.NewHeaderMutator(headerMutations, originalHeaders)

ctx := context.Background()
response, err := processor.ProcessRequestHeaders(ctx, nil)

require.NoError(t, err)
require.NotNil(t, response)

commonRes := response.Response.(*extprocv3.ProcessingResponse_RequestHeaders).RequestHeaders.Response

// Check that header mutations were applied.
require.NotNil(t, commonRes.HeaderMutation)
// RemoveHeaders should be empty because authorization/x-api-key don't exist in current headers.
require.Empty(t, commonRes.HeaderMutation.RemoveHeaders)
require.Len(t, commonRes.HeaderMutation.SetHeaders, 2) // Updated header + restored header.

// Check that x-custom header was restored on retry (it's not being removed or set).
var restoredHeader *corev3.HeaderValueOption
var updatedHeader *corev3.HeaderValueOption
for _, h := range commonRes.HeaderMutation.SetHeaders {
switch h.Header.Key {
case "x-custom":
restoredHeader = h
case "x-new-header":
updatedHeader = h
}
}
require.NotNil(t, restoredHeader)
require.Equal(t, []byte("original-custom"), restoredHeader.Header.RawValue)
require.NotNil(t, updatedHeader)
require.Equal(t, []byte("updated-value"), updatedHeader.Header.RawValue)

// Check that headers were updated in the request headers.
require.Equal(t, "updated-value", headers["x-new-header"])
require.Equal(t, "original-custom", headers["x-custom"])
})

t.Run("no header mutations when mutator is nil", func(t *testing.T) {
headers := map[string]string{
":path": "/anthropic/v1/messages",
Expand Down Expand Up @@ -1023,7 +929,7 @@ func TestMessagesProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *testi

// Test retry scenario - original headers should be restored.
testHeaders := map[string]string{
"x-existing": "current-value", // This exists, so won't be restored.
"x-existing": "current-value",
}
mutation := p.headerMutator.Mutate(testHeaders, true) // onRetry = true.

Expand All @@ -1043,6 +949,6 @@ func TestMessagesProcessorUpstreamFilter_SetBackend_WithHeaderMutations(t *testi
require.Equal(t, []byte("original-value"), restoredHeader.Header.RawValue)
require.Equal(t, "original-value", testHeaders["x-custom"])
// x-existing should not be restored because it already exists.
require.Equal(t, "current-value", testHeaders["x-existing"])
require.Equal(t, "existing-value", testHeaders["x-existing"])
})
}
4 changes: 2 additions & 2 deletions internal/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ func (s *Server) processorForPath(requestHeaders map[string]string, isUpstreamFi

// originalPathHeader is the header used to pass the original path to the processor.
// This is used in the upstream filter level to determine the original path of the request on retry.
const originalPathHeader = "x-ai-eg-original-path"
const originalPathHeader = internalapi.EnvoyAIGatewayHeaderPrefix + "original-path"

// internalReqIDHeader is the header used to pass the unique internal request ID to the upstream filter.
// This ensures that the upstream filter uses the same unique ID as the router filter to avoid race conditions.
const internalReqIDHeader = "x-ai-eg-internal-req-id"
const internalReqIDHeader = internalapi.EnvoyAIGatewayHeaderPrefix + "internal-req-id"

// Process implements [extprocv3.ExternalProcessorServer].
func (s *Server) Process(stream extprocv3.ExternalProcessor_ProcessServer) error {
Expand Down
6 changes: 4 additions & 2 deletions internal/internalapi/internalapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ import (
)

const (
// EnvoyAIGatewayHeaderPrefix is the prefix for special headers used by AI Gateway, either for internal or external use.
EnvoyAIGatewayHeaderPrefix = "x-ai-eg-"
// InternalEndpointMetadataNamespace is the namespace used for the dynamic metadata for internal use.
InternalEndpointMetadataNamespace = "aigateway.envoy.io"
// InternalMetadataBackendNameKey is the key used to store the backend name
InternalMetadataBackendNameKey = "per_route_rule_backend_name"
// MCPBackendHeader is the special header key used to specify the target backend name.
MCPBackendHeader = "x-ai-eg-mcp-backend"
MCPBackendHeader = EnvoyAIGatewayHeaderPrefix + "mcp-backend"
// MCPRouteHeader is the special header key used to identify the mcp route.
MCPRouteHeader = "x-ai-eg-mcp-route"
MCPRouteHeader = EnvoyAIGatewayHeaderPrefix + "mcp-route"
// MCPBackendListenerPort is the port for the MCP backend listener.
MCPBackendListenerPort = 10088
// MCPProxyPort is the port where the MCP proxy listens.
Expand Down
2 changes: 1 addition & 1 deletion tests/extproc/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ static_resources:
metadata:
filter_metadata:
aigateway.envoy.io:
per_route_rule_backend_name: "testupstream-openai-always-200"
per_route_rule_backend_name: "testupstream-openai"
priority: 1 # Secondary.
- name: testupstream-modelname-override
connect_timeout: 0.25s
Expand Down
4 changes: 0 additions & 4 deletions tests/extproc/testupstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ func TestWithTestUpstream(t *testing.T) {
testUpstreamAzureBackend,
testUpstreamGCPVertexAIBackend,
testUpstreamGCPAnthropicAIBackend,
// TODO: this shouldn't be needed. The previous per-backend headers shouldn't affect the subsequent retries.
Copy link
Member Author

@mathetake mathetake Oct 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO summarizes what was happening with the bug being fixed here

{Name: "testupstream-openai-always-200", Schema: openAISchema, HeaderMutation: &filterapi.HTTPHeaderMutation{
Set: []filterapi.HTTPHeader{{Name: testupstreamlib.ResponseStatusKey, Value: "200"}},
}},
{
Name: "testupstream-openai-5xx", Schema: openAISchema, HeaderMutation: &filterapi.HTTPHeaderMutation{
Set: []filterapi.HTTPHeader{{Name: testupstreamlib.ResponseStatusKey, Value: "500"}},
Expand Down
4 changes: 2 additions & 2 deletions tests/internal/testenvironment/test_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ func requireEnvoy(t testing.TB,
"--concurrency", strconv.Itoa(max(runtime.NumCPU(), 2)),
// This allows multiple Envoy instances to run in parallel.
"--base-id", strconv.Itoa(time.Now().Nanosecond()),
// Add debug logging for extproc.
"--component-log-level", "ext_proc:trace,http:debug,connection:debug",
// Add debug logging for http.
"--component-log-level", "http:debug",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ext_proc trace logging is just noise, and so is the connection logging.

)

// wait for the ready message or exit.
Expand Down
Loading