Skip to content

Commit 9455f12

Browse files
committed
more
Signed-off-by: Takeshi Yoneda <[email protected]>
1 parent 95fce20 commit 9455f12

File tree

4 files changed

+48
-55
lines changed

4 files changed

+48
-55
lines changed

internal/dynamic_module/router_filter.go

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
cohereschema "github.com/envoyproxy/ai-gateway/internal/apischema/cohere"
1212
"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
1313
"github.com/envoyproxy/ai-gateway/internal/dynamic_module/sdk"
14+
"github.com/envoyproxy/ai-gateway/internal/filterapi"
1415
"github.com/envoyproxy/ai-gateway/internal/internalapi"
1516
openaisdk "github.com/openai/openai-go/v2"
1617
)
@@ -33,8 +34,6 @@ type (
3334
originalRequestBody interface{}
3435
originalRequestBodyRaw []byte
3536
}
36-
37-
requestBodyParserFn func(body []byte) (parsed interface{}, modelName string, err error)
3837
)
3938

4039
func newRouterFilterConfig(fcr *filterConfigReceiverImpl) *routerFilterConfig {
@@ -59,6 +58,11 @@ func (f *routerFilterConfig) NewFilter() sdk.HTTPFilter {
5958
return &routerFilter{fc: f}
6059
}
6160

61+
// filterConfig is a helper that returns the filter config for the current stream.
62+
func (f *routerFilter) filterConfig() *filterapi.RuntimeConfig {
63+
return f.fc.fcr.fc
64+
}
65+
6266
// RequestHeaders implements [sdk.HTTPFilter].
6367
func (f *routerFilter) RequestHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.RequestHeadersStatus {
6468
p, _ := e.GetRequestHeader(":path") // The :path pseudo header is always present.
@@ -110,24 +114,24 @@ func (f *routerFilter) RequestBody(e sdk.EnvoyHTTPFilter, endOfStream bool) sdk.
110114
return sdk.RequestBodyStatusStopIterationAndBuffer
111115
}
112116
f.originalRequestBodyRaw = raw
113-
var parserFn requestBodyParserFn
117+
var parsed any
118+
var modelName string
114119
switch f.endpoint {
115120
case chatCompletionsEndpoint:
116-
parserFn = chatCompletionsBodyParser
121+
parsed, modelName, err = modelBodyParser(raw, func(req *openai.ChatCompletionRequest) string { return req.Model })
117122
case completionsEndpoint:
118-
parserFn = completionsBodyParser
123+
parsed, modelName, err = modelBodyParser(raw, func(req *openai.CompletionRequest) string { return req.Model })
119124
case embeddingsEndpoint:
120-
parserFn = embeddingsBodyParser
125+
parsed, modelName, err = modelBodyParser(raw, func(req *openai.EmbeddingRequest) string { return req.Model })
121126
case imagesGenerationsEndpoint:
122-
parserFn = imagesGenerationsBodyParser
127+
parsed, modelName, err = modelBodyParser(raw, func(req *openaisdk.ImageGenerateParams) string { return req.Model })
123128
case rerankEndpoint:
124-
parserFn = rerankBodyParser
129+
parsed, modelName, err = modelBodyParser(raw, func(req *cohereschema.RerankV2Request) string { return req.Model })
125130
case messagesEndpoint:
126-
parserFn = messagesBodyParser
131+
parsed, modelName, err = modelBodyParser(raw, func(req *anthropic.MessagesRequest) string { return req.GetModel() })
127132
default:
128133
e.SendLocalReply(500, nil, []byte("BUG: unsupported endpoint at body parsing: "+fmt.Sprintf("%d", f.endpoint)))
129134
}
130-
parsed, modelName, err := parserFn(raw)
131135
if err != nil {
132136
e.SendLocalReply(400, nil, []byte("failed to parse request body: "+err.Error()))
133137
return sdk.RequestBodyStatusStopIterationAndBuffer
@@ -164,50 +168,10 @@ func (f *routerFilter) handleModelsEndpoint(e sdk.EnvoyHTTPFilter) sdk.RequestHe
164168
return sdk.RequestHeadersStatusStopIteration
165169
}
166170

167-
func chatCompletionsBodyParser(body []byte) (interface{}, string, error) {
168-
var req openai.ChatCompletionRequest
169-
if err := json.Unmarshal(body, &req); err != nil {
170-
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
171-
}
172-
return req, req.Model, nil
173-
}
174-
175-
func completionsBodyParser(body []byte) (interface{}, string, error) {
176-
var req openai.CompletionRequest
171+
func modelBodyParser[T any](body []byte, modelExtractFn func(req *T) string) (interface{}, string, error) {
172+
var req T
177173
if err := json.Unmarshal(body, &req); err != nil {
178174
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
179175
}
180-
return req, req.Model, nil
181-
}
182-
183-
func embeddingsBodyParser(body []byte) (interface{}, string, error) {
184-
var req openai.EmbeddingRequest
185-
if err := json.Unmarshal(body, &req); err != nil {
186-
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
187-
}
188-
return req, req.Model, nil
189-
}
190-
191-
func imagesGenerationsBodyParser(body []byte) (interface{}, string, error) {
192-
var req openaisdk.ImageGenerateParams
193-
if err := json.Unmarshal(body, &req); err != nil {
194-
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
195-
}
196-
return req, req.Model, nil
197-
}
198-
199-
func rerankBodyParser(body []byte) (interface{}, string, error) {
200-
var req cohereschema.RerankV2Request
201-
if err := json.Unmarshal(body, &req); err != nil {
202-
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
203-
}
204-
return req, req.Model, nil
205-
}
206-
207-
func messagesBodyParser(body []byte) (interface{}, string, error) {
208-
var anthropicReq anthropic.MessagesRequest
209-
if err := json.Unmarshal(body, &anthropicReq); err != nil {
210-
return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
211-
}
212-
return anthropicReq, anthropicReq.GetModel(), nil
176+
return req, modelExtractFn(&req), nil
213177
}

internal/dynamic_module/sdk/abi_envoy_v1.36.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,17 +551,33 @@ func (e envoyFilter) GetResponseBody() (io.Reader, bool) {
551551
return &bodyReader{chunks: chunks}, true
552552
}
553553

554+
// https://github.com/envoyproxy/envoy/blob/dc2d3098ae5641555f15c71d5bb5ce0060a8015c/source/extensions/dynamic_modules/abi.h#L271-L282
555+
type metadataSource uint32
556+
557+
const (
558+
metadataSourceDynamic metadataSource = 0
559+
metadataSourceUpstreamHost metadataSource = 3
560+
)
561+
554562
// GetDynamicMetadataString implements [EnvoyHTTPFilter].
555563
func (e envoyFilter) GetDynamicMetadataString(namespace string, key string) (string, bool) {
564+
return e.getMetadataString(namespace, key, metadataSourceDynamic)
565+
}
566+
567+
// GetUpstreamHostMetadataString implements [EnvoyHTTPFilter].
568+
func (e envoyFilter) GetUpstreamHostMetadataString(namespace string, key string) (string, bool) {
569+
return e.getMetadataString(namespace, key, metadataSourceUpstreamHost)
570+
}
571+
572+
func (e envoyFilter) getMetadataString(namespace string, key string, source metadataSource) (string, bool) {
556573
namespacePtr := uintptr(unsafe.Pointer(unsafe.StringData(namespace)))
557574
keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
558575
var resultBufferPtr *byte
559576
var resultBufferLengthPtr C.size_t
560577

561578
ret := C.envoy_dynamic_module_callback_http_get_metadata_string(
562579
C.uintptr_t(e.raw),
563-
// https://github.com/envoyproxy/envoy/blob/dc2d3098ae5641555f15c71d5bb5ce0060a8015c/source/extensions/dynamic_modules/abi.h#L273
564-
C.uint32_t(0), // source
580+
C.uint32_t(source),
565581
C.uintptr_t(namespacePtr),
566582
C.size_t(len(namespace)),
567583
C.uintptr_t(keyPtr),

internal/dynamic_module/sdk/gosdk.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type EnvoyHTTPFilter interface {
5454
SendLocalReply(statusCode uint32, headers [][2]string, body []byte)
5555
// GetDynamicMetadataString gets the dynamic metadata value for the given namespace and key. Returns the value and true if the value is found.
5656
GetDynamicMetadataString(namespace string, key string) (string, bool)
57+
// GetUpstreamHostMetadataString gets the upstream host metadata value for the given namespace and key. Returns the value and true if the value is found.
58+
GetUpstreamHostMetadataString(namespace string, key string) (string, bool)
5759
// SetDynamicMetadataString sets the dynamic metadata value for the given namespace and key. Returns true if the value is set successfully.
5860
SetDynamicMetadataString(namespace string, key string, value string) bool
5961
}

internal/dynamic_module/upstream_filter.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ func (f *upstreamFilter) RequestHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.Reque
3838
return sdk.RequestHeadersStatusStopIteration
3939
}
4040
f.rf = (*routerFilter)(unsafe.Pointer(uintptr(rfPtr)))
41+
42+
backend, ok := e.GetUpstreamHostMetadataString(internalapi.AIGatewayFilterMetadataNamespace, internalapi.InternalMetadataBackendNameKey)
43+
if !ok {
44+
e.SendLocalReply(500, nil, []byte("backend name not found in upstream host metadata"))
45+
return sdk.RequestHeadersStatusStopIteration
46+
}
47+
b, ok := f.rf.filterConfig().Backends[backend]
48+
if !ok {
49+
e.SendLocalReply(500, nil, []byte(fmt.Sprintf("backend %s not found in filter config", backend)))
50+
return sdk.RequestHeadersStatusStopIteration
51+
}
4152
return sdk.RequestHeadersStatusContinue
4253
}
4354

0 commit comments

Comments
 (0)