diff --git a/.github/workflows/pr_style_check.yaml b/.github/workflows/pr_style_check.yaml
index 955245dc1..9eef6659c 100644
--- a/.github/workflows/pr_style_check.yaml
+++ b/.github/workflows/pr_style_check.yaml
@@ -87,6 +87,7 @@ jobs:
             refactor
             mcp
             aigw
+            dynamic_module
           subjectPattern: ^(?![A-Z]).+$
           subjectPatternError: |
             The subject "{subject}" found in the pull request title "{title}"
diff --git a/Makefile b/Makefile
index 3ba5c44da..b1b834cc1 100644
--- a/Makefile
+++ b/Makefile
@@ -270,6 +270,11 @@ build.%: ## Build a binary for the given command under the internal/cmd director
 build: ## Build all binaries under cmd/ directory.
 	@$(foreach COMMAND_NAME,$(COMMANDS),$(MAKE) build.$(COMMAND_NAME);)
 
+# This builds the dynamic module filter for Envoy. This is the shared library that can be loaded by Envoy to run the AI Gateway filter.
+.PHONY: build-dm
+build-dm: ## Build the dynamic module for Envoy.
+	CGO_ENABLED=1 go build -tags "envoy_1.36" -buildmode=c-shared -o $(OUTPUT_DIR)/libaigateway.so ./cmd/dynamic_module
+
 # This builds the docker images for the controller, extproc and testupstream for the e2e tests.
 .PHONY: build-e2e
 build-e2e: ## Build the docker images for the controller, extproc and testupstream for the e2e tests.
diff --git a/cmd/dynamic_module/main.go b/cmd/dynamic_module/main.go
new file mode 100644
index 000000000..d2e5f77af
--- /dev/null
+++ b/cmd/dynamic_module/main.go
@@ -0,0 +1,114 @@
+// Copyright Envoy AI Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
+
+	"github.com/envoyproxy/ai-gateway/internal/backendauth"
+	"github.com/envoyproxy/ai-gateway/internal/dynamicmodule"
+	"github.com/envoyproxy/ai-gateway/internal/dynamicmodule/sdk"
+	"github.com/envoyproxy/ai-gateway/internal/filterapi"
+	"github.com/envoyproxy/ai-gateway/internal/internalapi"
+	"github.com/envoyproxy/ai-gateway/internal/metrics"
+)
+
+func main() {} // This must be present to make a shared library.
+
+// init builds the process-wide state, starts the filter config watcher, and
+// registers sdk.NewHTTPFilterConfig so that the SDK can create filter configs.
+//
+// Any failure here panics because the module cannot work without its environment
+// or its filter configuration.
+func init() {
+	g := &globalState{}
+	if err := g.initializeEnv(); err != nil {
+		panic("failed to create env config: " + err.Error())
+	}
+
+	// TODO: use a writer implemented with the Logger ABI of Envoy.
+	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
+		Level: slog.LevelDebug, // Adjust log level from environment variable if needed.
+	}))
+	// The watcher delivers configuration to g through [globalState.LoadConfig].
+	// The config file path comes from the environment; the last argument is a 5-second duration.
+	if err := filterapi.StartConfigWatcher(context.Background(),
+		os.Getenv("AI_GATEWAY_DYNAMIC_MODULE_FILTER_CONFIG_PATH"), g, logger, time.Second*5); err != nil {
+		panic("failed to start filter config watcher: " + err.Error())
+	}
+	sdk.NewHTTPFilterConfig = g.newHTTPFilterConfig
+}
+
+// globalState implements [filterapi.ConfigReceiver] to load filter configuration.
+type globalState struct {
+	// fc is the latest runtime config. It is replaced by LoadConfig, and its address
+	// is handed to the router filter config so that new filters see the latest value.
+	fc *filterapi.RuntimeConfig
+	// env is the process-wide environment populated once by initializeEnv.
+	env *dynamicmodule.Env
+}
+
+// newHTTPFilterConfig creates a new http filter config for the filter with the given name.
+//
+// The second argument is the configuration bytes specified in the Envoy configuration; it is currently unused.
+func (g *globalState) newHTTPFilterConfig(name string, _ []byte) sdk.HTTPFilterConfig {
+	switch name {
+	case "ai_gateway.router":
+		return dynamicmodule.NewRouterFilterConfig(g.env, &g.fc)
+	case "ai_gateway.upstream":
+		return dynamicmodule.NewUpstreamFilterConfig(g.env)
+	default:
+		panic("unknown filter: " + name)
+	}
+}
+
+// LoadConfig implements [filterapi.ConfigReceiver.LoadConfig].
+//
+// It builds a new runtime config from the given filter config and publishes it via g.fc.
+func (g *globalState) LoadConfig(ctx context.Context, config *filterapi.Config) error {
+	newConfig, err := filterapi.NewRuntimeConfig(ctx, config, backendauth.NewHandler)
+	if err != nil {
+		return fmt.Errorf("cannot create runtime filter config: %w", err)
+	}
+	// This is racy but we don't care: the router filter config reads this field through
+	// the &g.fc pointer without synchronization, and each filter snapshots the value once.
+	g.fc = newConfig
+	return nil
+}
+
+// initializeEnv populates g.env from environment variables and creates the metrics
+// factories that are shared by all filters in this process.
+func (g *globalState) initializeEnv() error {
+	ctx := context.Background()
+	promRegistry := prometheus.NewRegistry()
+	promReader, err := otelprom.New(otelprom.WithRegisterer(promRegistry))
+	if err != nil {
+		return fmt.Errorf("failed to create prometheus reader: %w", err)
+	}
+
+	meter, _, err := metrics.NewMeterFromEnv(ctx, os.Stdout, promReader)
+	if err != nil {
+		return fmt.Errorf("failed to create metrics: %w", err)
+	}
+
+	endpointPrefixes, err := internalapi.ParseEndpointPrefixes(os.Getenv(
+		"AI_GATEWAY_DYNAMIC_MODULE_FILTER_ENDPOINT_PREFIXES",
+	))
+	if err != nil {
+		return fmt.Errorf("failed to parse endpoint prefixes: %w", err)
+	}
+
+	metricsRequestHeaderAttributes, err := internalapi.ParseRequestHeaderAttributeMapping(os.Getenv(
+		"AI_GATEWAY_DYNAMIC_MODULE_FILTER_METRICS_REQUEST_HEADER_ATTRIBUTES",
+	))
+	if err != nil {
+		return fmt.Errorf("failed to parse metrics header mapping: %w", err)
+	}
+
+	g.env = &dynamicmodule.Env{
+		RootPrefix:                    os.Getenv("AI_GATEWAY_DYNAMIC_MODULE_ROOT_PREFIX"),
+		EndpointPrefixes:              endpointPrefixes,
+		ChatCompletionMetricsFactory:  metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationChat),
+		MessagesMetricsFactory:        metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationMessages),
+		CompletionMetricsFactory:      metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationCompletion),
+		EmbeddingsMetricsFactory:      metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationEmbedding),
+		ImageGenerationMetricsFactory: metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationImageGeneration),
+		RerankMetricsFactory:          metrics.NewMetricsFactory(meter, metricsRequestHeaderAttributes, metrics.GenAIOperationRerank),
+	}
+	return nil
+}
diff --git a/internal/dynamicmodule/dynamic_module.go b/internal/dynamicmodule/dynamic_module.go
new file mode 100644
index 000000000..650d0d99b
--- /dev/null
+++ b/internal/dynamicmodule/dynamic_module.go
@@ -0,0 +1,43 @@
+// Copyright Envoy AI Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+package dynamicmodule
+
+import (
+	"github.com/envoyproxy/ai-gateway/internal/internalapi"
+	"github.com/envoyproxy/ai-gateway/internal/metrics"
+)
+
+// endpoint represents the type of the endpoint that the request is targeting.
+type endpoint int
+
+const (
+	// chatCompletionsEndpoint represents the /v1/chat/completions endpoint.
+	chatCompletionsEndpoint endpoint = iota
+	// completionsEndpoint represents the /v1/completions endpoint.
+	completionsEndpoint
+	// embeddingsEndpoint represents the /v1/embeddings endpoint.
+	embeddingsEndpoint
+	// imagesGenerationsEndpoint represents the /v1/images/generations endpoint.
+	imagesGenerationsEndpoint
+	// rerankEndpoint represents the /v2/rerank endpoint of cohere.
+	rerankEndpoint
+	// messagesEndpoint represents the /v1/messages endpoint of anthropic.
+	messagesEndpoint
+	// modelsEndpoint represents the /v1/models endpoint.
+	modelsEndpoint
+)
+
+// Env holds the environment configuration for the dynamic module that is process-wide.
+type Env struct {
+	RootPrefix       string
+	EndpointPrefixes internalapi.EndpointPrefixes
+	ChatCompletionMetricsFactory,
+	MessagesMetricsFactory,
+	CompletionMetricsFactory,
+	EmbeddingsMetricsFactory,
+	ImageGenerationMetricsFactory,
+	RerankMetricsFactory metrics.Factory
+}
diff --git a/internal/dynamicmodule/router_filter.go b/internal/dynamicmodule/router_filter.go
new file mode 100644
index 000000000..a34b97fe4
--- /dev/null
+++ b/internal/dynamicmodule/router_filter.go
@@ -0,0 +1,194 @@
+// Copyright Envoy AI Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+package dynamicmodule
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"path"
+	"strings"
+	"unsafe"
+
+	openaisdk "github.com/openai/openai-go/v2"
+
+	"github.com/envoyproxy/ai-gateway/internal/apischema/anthropic"
+	cohereschema "github.com/envoyproxy/ai-gateway/internal/apischema/cohere"
+	"github.com/envoyproxy/ai-gateway/internal/apischema/openai"
+	"github.com/envoyproxy/ai-gateway/internal/dynamicmodule/sdk"
+	"github.com/envoyproxy/ai-gateway/internal/filterapi"
+	"github.com/envoyproxy/ai-gateway/internal/internalapi"
+)
+
+// routerFilterPointerDynamicMetadataKey is the dynamic metadata key under which the
+// router filter stores its own address (formatted as a decimal integer).
+const routerFilterPointerDynamicMetadataKey = "router_filter_pointer"
+
+type (
+	// routerFilterConfig implements [sdk.HTTPFilterConfig].
+	//
+	// It maps request paths to endpoints and gives each new filter a snapshot of
+	// the current runtime configuration.
+	routerFilterConfig struct {
+		// fcr points at the variable holding the latest runtime config; it is
+		// dereferenced every time a new filter is created.
+		fcr **filterapi.RuntimeConfig
+		// prefixToEndpoint maps an exact request path (without query) to its endpoint.
+		prefixToEndpoint map[string]endpoint
+	}
+	// routerFilter implements [sdk.HTTPFilter].
+	routerFilter struct {
+		routerFilterConfig     *routerFilterConfig
+		runtimeFilterConfig    *filterapi.RuntimeConfig
+		endpoint               endpoint
+		originalHeaders        map[string]string
+		originalRequestBody    any
+		originalRequestBodyRaw []byte
+		span                   any
+		attemptCount           int
+	}
+)
+
+// NewRouterFilterConfig creates a new instance of an implementation of [sdk.HTTPFilterConfig] for the router filter.
+func NewRouterFilterConfig(env *Env, fcr **filterapi.RuntimeConfig) sdk.HTTPFilterConfig {
+	prefixToEndpoint := map[string]endpoint{
+		path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/chat/completions"):   chatCompletionsEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/completions"):        completionsEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/embeddings"):         embeddingsEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/images/generations"): imagesGenerationsEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.Cohere, "/v2/rerank"):             rerankEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.OpenAI, "/v1/models"):             modelsEndpoint,
+		path.Join(env.RootPrefix, env.EndpointPrefixes.Anthropic, "/v1/messages"):        messagesEndpoint,
+	}
+	return &routerFilterConfig{
+		fcr:              fcr,
+		prefixToEndpoint: prefixToEndpoint,
+	}
+}
+
+// NewFilter implements [sdk.HTTPFilterConfig].
+//
+// The runtime config is snapshotted here so that a single request sees one consistent config.
+func (f *routerFilterConfig) NewFilter() sdk.HTTPFilter {
+	return &routerFilter{routerFilterConfig: f, runtimeFilterConfig: *f.fcr}
+}
+
+// RequestHeaders implements [sdk.HTTPFilter].
+//
+// It resolves the endpoint from the request path, replies with 404 for unknown paths,
+// and serves the models endpoint directly.
+func (f *routerFilter) RequestHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.RequestHeadersStatus {
+	p, _ := e.GetRequestHeader(":path") // The :path pseudo header is always present.
+	// Strip query parameters for processor lookup.
+	p, _, _ = strings.Cut(p, "?")
+	ep, ok := f.routerFilterConfig.prefixToEndpoint[p]
+	if !ok {
+		e.SendLocalReply(404, nil, []byte("unsupported path: "+p))
+		return sdk.RequestHeadersStatusStopIteration
+	}
+	f.endpoint = ep
+	if f.endpoint == modelsEndpoint {
+		return f.handleModelsEndpoint(e)
+	}
+	return sdk.RequestHeadersStatusContinue
+}
+
+// RequestBody implements [sdk.HTTPFilter].
+//
+// It buffers until the end of the stream, parses the body according to the endpoint,
+// sets the model name header, and records the filter pointer in dynamic metadata.
+// Every failure path sends a local reply and stops iteration.
+func (f *routerFilter) RequestBody(e sdk.EnvoyHTTPFilter, endOfStream bool) sdk.RequestBodyStatus {
+	if !endOfStream {
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	b, ok := e.GetRequestBody()
+	if !ok {
+		e.SendLocalReply(400, nil, []byte("failed to read request body"))
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	raw, err := io.ReadAll(b)
+	if err != nil {
+		e.SendLocalReply(400, nil, []byte("failed to read request body: "+err.Error()))
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	f.originalRequestBodyRaw = raw
+	var parsed any
+	var modelName string
+	switch f.endpoint {
+	case chatCompletionsEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.ChatCompletionRequest) string { return req.Model })
+	case completionsEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.CompletionRequest) string { return req.Model })
+	case embeddingsEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *openai.EmbeddingRequest) string { return req.Model })
+	case imagesGenerationsEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *openaisdk.ImageGenerateParams) string { return req.Model })
+	case rerankEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *cohereschema.RerankV2Request) string { return req.Model })
+	case messagesEndpoint:
+		parsed, modelName, err = parseBodyWithModel(raw, func(req *anthropic.MessagesRequest) string { return req.GetModel() })
+	default:
+		e.SendLocalReply(500, nil, []byte("BUG: unsupported endpoint at body parsing: "+fmt.Sprintf("%d", f.endpoint)))
+		// Stop here: without this return the request would continue with an empty model name.
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	if err != nil {
+		e.SendLocalReply(400, nil, []byte("failed to parse request body: "+err.Error()))
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	f.originalRequestBody = parsed
+	if !e.SetRequestHeader(internalapi.ModelNameHeaderKeyDefault, []byte(modelName)) {
+		e.SendLocalReply(500, nil, []byte("failed to set model name header"))
+		return sdk.RequestBodyStatusStopIterationAndBuffer
+	}
+	// Store the pointer to the filter in dynamic metadata for later retrieval in the upstream filter.
+	e.SetDynamicMetadataString(internalapi.AIGatewayFilterMetadataNamespace, routerFilterPointerDynamicMetadataKey,
+		fmt.Sprintf("%d", uintptr(unsafe.Pointer(f))))
+
+	f.originalHeaders = multiValueHeadersToSingleValue(e.GetRequestHeaders())
+	return sdk.RequestBodyStatusContinue
+}
+
+// ResponseHeaders implements [sdk.HTTPFilter].
+func (f *routerFilter) ResponseHeaders(sdk.EnvoyHTTPFilter, bool) sdk.ResponseHeadersStatus {
+	return sdk.ResponseHeadersStatusContinue
+}
+
+// ResponseBody implements [sdk.HTTPFilter].
+func (f *routerFilter) ResponseBody(sdk.EnvoyHTTPFilter, bool) sdk.ResponseBodyStatus {
+	return sdk.ResponseBodyStatusContinue
+}
+
+// handleModelsEndpoint handles the /v1/models endpoint by returning the list of declared models in the filter configuration.
+//
+// This is called on request headers phase. It always ends the request with a local reply:
+// 200 with the JSON model list, or 500 when the configuration is missing or cannot be serialized.
+func (f *routerFilter) handleModelsEndpoint(e sdk.EnvoyHTTPFilter) sdk.RequestHeadersStatus {
+	config := f.runtimeFilterConfig
+	if config == nil {
+		// The snapshot taken in NewFilter can be nil if no configuration has been loaded yet.
+		e.SendLocalReply(500, nil, []byte("filter configuration is not loaded"))
+		return sdk.RequestHeadersStatusStopIteration
+	}
+	models := openai.ModelList{
+		Object: "list",
+		Data:   make([]openai.Model, 0, len(config.DeclaredModels)),
+	}
+	for _, m := range config.DeclaredModels {
+		models.Data = append(models.Data, openai.Model{
+			ID:      m.Name,
+			Object:  "model",
+			OwnedBy: m.OwnedBy,
+			Created: openai.JSONUNIXTime(m.CreatedAt),
+		})
+	}
+
+	body, err := json.Marshal(models)
+	if err != nil {
+		e.SendLocalReply(500, nil, []byte("failed to marshal models: "+err.Error()))
+		return sdk.RequestHeadersStatusStopIteration
+	}
+	e.SendLocalReply(200, [][2]string{
+		{"content-type", "application/json"},
+	}, body)
+	return sdk.RequestHeadersStatusStopIteration
+}
+
+// parseBodyWithModel unmarshals body as JSON into a value of type T and returns that value
+// together with the model name extracted from it by modelExtractFn.
+func parseBodyWithModel[T any](body []byte, modelExtractFn func(req *T) string) (any, string, error) {
+	var req T
+	if err := json.Unmarshal(body, &req); err != nil {
+		return nil, "", fmt.Errorf("failed to unmarshal body: %w", err)
+	}
+	return req, modelExtractFn(&req), nil
+}
+
+// multiValueHeadersToSingleValue converts a map of headers with multiple values to a map of headers with single values by taking the first value for each header.
+// Headers that carry no value at all are skipped.
+//
+// TODO: this is purely for feature parity with the old filter where we ignore the case of multiple header values.
+func multiValueHeadersToSingleValue(headers map[string][]string) map[string]string {
+	singleValueHeaders := make(map[string]string, len(headers))
+	for k, v := range headers {
+		if len(v) == 0 {
+			continue
+		}
+		singleValueHeaders[k] = v[0]
+	}
+	return singleValueHeaders
+}
diff --git a/internal/dynamicmodule/sdk/abi_envoy_v1.36.go b/internal/dynamicmodule/sdk/abi_envoy_v1.36.go
new file mode 100644
index 000000000..78afbf1fd
--- /dev/null
+++ b/internal/dynamicmodule/sdk/abi_envoy_v1.36.go
@@ -0,0 +1,642 @@
+// Copyright Envoy AI Gateway Authors
+// SPDX-License-Identifier: Apache-2.0
+// The full text of the Apache license is available in the LICENSE file at
+// the root of the repo.
+
+package sdk
+
+// Following is a distillation of the Envoy ABI for dynamic modules:
+// https://github.com/envoyproxy/envoy/blob/dc2d3098ae5641555f15c71d5bb5ce0060a8015c/source/extensions/dynamic_modules/abi.h
+//
+// Why not using the header file directly? That is because Go runtime complains
+// about passing pointers to C code on the boundary. In the following code, we replace
+// all the pointers with uintptr_t instead of *char. At the end of the day, what we
+// need from the header is declarations of callbacks, not event hooks, so it won't be that hard to maintain.
+
+/*
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#cgo noescape envoy_dynamic_module_callback_http_get_request_header
+#cgo nocallback envoy_dynamic_module_callback_http_get_request_header
+size_t envoy_dynamic_module_callback_http_get_request_header(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t key, size_t key_length,
+    uintptr_t* result_buffer_ptr, size_t* result_buffer_length_ptr,
+    size_t index);
+
+#cgo noescape envoy_dynamic_module_callback_http_set_request_header
+#cgo nocallback envoy_dynamic_module_callback_http_set_request_header
+bool envoy_dynamic_module_callback_http_set_request_header(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t key, size_t key_length,
+    uintptr_t value, size_t value_length);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_response_header
+#cgo nocallback envoy_dynamic_module_callback_http_get_response_header
+size_t envoy_dynamic_module_callback_http_get_response_header(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t key, size_t key_length,
+    uintptr_t* result_buffer_ptr, size_t* result_buffer_length_ptr,
+    size_t index);
+
+#cgo noescape envoy_dynamic_module_callback_http_set_response_header
+#cgo nocallback envoy_dynamic_module_callback_http_set_response_header
+bool envoy_dynamic_module_callback_http_set_response_header(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t key, size_t key_length,
+    uintptr_t value, size_t value_length);
+
+#cgo noescape envoy_dynamic_module_callback_http_append_request_body
+#cgo nocallback envoy_dynamic_module_callback_http_append_request_body
+bool envoy_dynamic_module_callback_http_append_request_body(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t data, size_t length);
+
+#cgo noescape envoy_dynamic_module_callback_http_drain_request_body
+#cgo nocallback envoy_dynamic_module_callback_http_drain_request_body
+bool envoy_dynamic_module_callback_http_drain_request_body(
+    uintptr_t filter_envoy_ptr,
+    size_t length);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_request_body_vector
+#cgo nocallback envoy_dynamic_module_callback_http_get_request_body_vector
+bool envoy_dynamic_module_callback_http_get_request_body_vector(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t* result_buffer_vector);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_request_body_vector_size
+#cgo nocallback envoy_dynamic_module_callback_http_get_request_body_vector_size
+bool envoy_dynamic_module_callback_http_get_request_body_vector_size(
+    uintptr_t filter_envoy_ptr, size_t* size);
+
+#cgo noescape envoy_dynamic_module_callback_http_append_response_body
+#cgo nocallback envoy_dynamic_module_callback_http_append_response_body
+bool envoy_dynamic_module_callback_http_append_response_body(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t data, size_t length);
+
+#cgo noescape envoy_dynamic_module_callback_http_drain_response_body
+#cgo nocallback envoy_dynamic_module_callback_http_drain_response_body
+bool envoy_dynamic_module_callback_http_drain_response_body(
+    uintptr_t filter_envoy_ptr,
+    size_t length);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_response_body_vector
+#cgo nocallback envoy_dynamic_module_callback_http_get_response_body_vector
+bool envoy_dynamic_module_callback_http_get_response_body_vector(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t* result_buffer_vector);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_response_body_vector_size
+#cgo nocallback envoy_dynamic_module_callback_http_get_response_body_vector_size
+bool envoy_dynamic_module_callback_http_get_response_body_vector_size(
+    uintptr_t filter_envoy_ptr, size_t* size);
+
+#cgo noescape envoy_dynamic_module_callback_http_send_response
+// Uncomment once https://github.com/envoyproxy/envoy/pull/39206 is merged.
+// #cgo nocallback envoy_dynamic_module_callback_http_send_response
+void envoy_dynamic_module_callback_http_send_response(
+    uintptr_t filter_envoy_ptr, uint32_t status_code,
+    uintptr_t headers_vector, size_t headers_vector_size,
+    uintptr_t body, size_t body_length);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_request_headers_count
+#cgo nocallback envoy_dynamic_module_callback_http_get_request_headers_count
+size_t envoy_dynamic_module_callback_http_get_request_headers_count(
+    uintptr_t filter_envoy_ptr);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_request_headers
+#cgo nocallback envoy_dynamic_module_callback_http_get_request_headers
+bool envoy_dynamic_module_callback_http_get_request_headers(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t* result_headers);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_response_headers_count
+#cgo nocallback envoy_dynamic_module_callback_http_get_response_headers_count
+size_t envoy_dynamic_module_callback_http_get_response_headers_count(
+    uintptr_t filter_envoy_ptr);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_response_headers
+#cgo nocallback envoy_dynamic_module_callback_http_get_response_headers
+bool envoy_dynamic_module_callback_http_get_response_headers(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t* result_headers);
+
+#cgo noescape envoy_dynamic_module_callback_http_set_dynamic_metadata_string
+#cgo nocallback envoy_dynamic_module_callback_http_set_dynamic_metadata_string
+bool envoy_dynamic_module_callback_http_set_dynamic_metadata_string(
+    uintptr_t filter_envoy_ptr,
+    uintptr_t namespace_ptr, size_t namespace_size,
+    uintptr_t key_ptr, size_t key_size,
+    uintptr_t value_ptr, size_t value_size);
+
+#cgo noescape envoy_dynamic_module_callback_http_get_metadata_string
+#cgo nocallback envoy_dynamic_module_callback_http_get_metadata_string
+bool envoy_dynamic_module_callback_http_get_metadata_string(
+    uintptr_t filter_envoy_ptr,
+    uint32_t source,
+    uintptr_t namespace_ptr, size_t namespace_size,
+    uintptr_t key_ptr, size_t key_size,
+    uintptr_t* result_ptr, size_t* result_size);
+*/
+import "C"
+
+import (
+	"io"
+	"runtime"
+	"unsafe"
+)
+
+// version is the NUL-terminated ABI version string handed to Envoy at program init.
+//
+// https://github.com/envoyproxy/envoy/blob/dc2d3098ae5641555f15c71d5bb5ce0060a8015c/source/extensions/dynamic_modules/abi_version.h
+var version = append([]byte("ca2be3b80954d2a0e22b41d033b18eff9390c30261c8ec9ffe6e6bf971f41c27"), 0)
+
+// envoy_dynamic_module_on_program_init returns the address of the ABI version string.
+//
+//export envoy_dynamic_module_on_program_init
+func envoy_dynamic_module_on_program_init() uintptr {
+	return uintptr(unsafe.Pointer(&version[0]))
+}
+
+// envoy_dynamic_module_on_http_filter_config_new copies the filter name and config out of
+// the given buffers, creates the filter config through NewHTTPFilterConfig, and returns the
+// address of the pinned config. It returns 0 when no filter config is created.
+//
+//export envoy_dynamic_module_on_http_filter_config_new
+func envoy_dynamic_module_on_http_filter_config_new(
+	_ uintptr,
+	namePtr *C.char,
+	nameSize C.size_t,
+	configPtr *C.char,
+	configSize C.size_t,
+) uintptr {
+	name := C.GoStringN(namePtr, C.int(nameSize))
+	config := C.GoBytes(unsafe.Pointer(configPtr), C.int(configSize))
+	filterConfig := NewHTTPFilterConfig(name, config)
+	if filterConfig == nil {
+		return 0
+	}
+	// Pin the filter config to the memory manager.
+	pinnedFilterConfig := memManager.pinHTTPFilterConfig(filterConfig)
+	return uintptr(unsafe.Pointer(pinnedFilterConfig))
+}
+
+// envoy_dynamic_module_on_http_filter_config_destroy unpins the filter config at ptr.
+//
+//export envoy_dynamic_module_on_http_filter_config_destroy
+func envoy_dynamic_module_on_http_filter_config_destroy(ptr uintptr) {
+	pinnedFilterConfig := unwrapPinnedHTTPFilterConfig(ptr)
+	memManager.unpinHTTPFilterConfig(pinnedFilterConfig)
+}
+
+// envoy_dynamic_module_on_http_filter_new creates a filter from the pinned filter config
+// and returns the address of the pinned filter, or 0 when no filter is created.
+//
+//export envoy_dynamic_module_on_http_filter_new
+func envoy_dynamic_module_on_http_filter_new(
+	filterConfigPtr uintptr,
+	_ uintptr,
+) uintptr {
+	pinnedFilterConfig := unwrapPinnedHTTPFilterConfig(filterConfigPtr)
+	filterConfig := pinnedFilterConfig.obj
+	filter := filterConfig.NewFilter()
+	if filter == nil {
+		return 0
+	}
+	// Pin the filter to the memory manager.
+	pinned := memManager.pinHTTPFilter(filter)
+	// Return the pinned filter.
+	return uintptr(unsafe.Pointer(pinned))
+}
+
+// envoy_dynamic_module_on_http_filter_destroy unpins the filter at filterPtr.
+//
+//export envoy_dynamic_module_on_http_filter_destroy
+func envoy_dynamic_module_on_http_filter_destroy(
+	filterPtr uintptr,
+) {
+	pinned := unwrapPinnedHTTPFilter(filterPtr)
+	// Unpin the filter from the memory manager.
+	memManager.unpinHTTPFilter(pinned)
+}
+
+// envoy_dynamic_module_on_http_filter_request_headers forwards the event to the filter's
+// RequestHeaders method and returns its status as an integer.
+//
+//export envoy_dynamic_module_on_http_filter_request_headers
+func envoy_dynamic_module_on_http_filter_request_headers(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	endOfStream bool,
+) uintptr {
+	pinned := unwrapPinnedHTTPFilter(filterModulePtr)
+	status := pinned.obj.RequestHeaders(envoyFilter{raw: filterEnvoyPtr}, endOfStream)
+	return uintptr(status)
+}
+
+// envoy_dynamic_module_on_http_filter_request_body forwards the event to the filter's
+// RequestBody method and returns its status as an integer.
+//
+//export envoy_dynamic_module_on_http_filter_request_body
+func envoy_dynamic_module_on_http_filter_request_body(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	endOfStream bool,
+) uintptr {
+	pinned := unwrapPinnedHTTPFilter(filterModulePtr)
+	status := pinned.obj.RequestBody(envoyFilter{raw: filterEnvoyPtr}, endOfStream)
+	return uintptr(status)
+}
+
+// envoy_dynamic_module_on_http_filter_request_trailers is not forwarded to the filter; it always returns 0.
+//
+//export envoy_dynamic_module_on_http_filter_request_trailers
+func envoy_dynamic_module_on_http_filter_request_trailers(uintptr, uintptr) uintptr {
+	return 0
+}
+
+// envoy_dynamic_module_on_http_filter_response_headers forwards the event to the filter's
+// ResponseHeaders method and returns its status as an integer.
+//
+//export envoy_dynamic_module_on_http_filter_response_headers
+func envoy_dynamic_module_on_http_filter_response_headers(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	endOfStream bool,
+) uintptr {
+	pinned := unwrapPinnedHTTPFilter(filterModulePtr)
+	status := pinned.obj.ResponseHeaders(envoyFilter{raw: filterEnvoyPtr}, endOfStream)
+	return uintptr(status)
+}
+
+// envoy_dynamic_module_on_http_filter_response_body forwards the event to the filter's
+// ResponseBody method and returns its status as an integer.
+//
+//export envoy_dynamic_module_on_http_filter_response_body
+func envoy_dynamic_module_on_http_filter_response_body(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	endOfStream bool,
+) uintptr {
+	pinned := unwrapPinnedHTTPFilter(filterModulePtr)
+	status := pinned.obj.ResponseBody(envoyFilter{raw: filterEnvoyPtr}, endOfStream)
+	return uintptr(status)
+}
+
+// envoy_dynamic_module_on_http_filter_response_trailers is not forwarded to the filter; it always returns 0.
+//
+//export envoy_dynamic_module_on_http_filter_response_trailers
+func envoy_dynamic_module_on_http_filter_response_trailers(uintptr, uintptr) uintptr {
+	return 0
+}
+
+// envoy_dynamic_module_on_http_filter_stream_complete is a no-op.
+//
+//export envoy_dynamic_module_on_http_filter_stream_complete
+func envoy_dynamic_module_on_http_filter_stream_complete(uintptr, uintptr) {
+}
+
+// envoy_dynamic_module_on_http_filter_http_callout_done is a no-op; all arguments are ignored.
+//
+//export envoy_dynamic_module_on_http_filter_http_callout_done
+func envoy_dynamic_module_on_http_filter_http_callout_done(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	calloutID C.uint32_t,
+	result C.uint32_t,
+	headersPtr uintptr,
+	headersSize C.size_t,
+	bodyVectorPtr uintptr,
+	bodyVectorSize C.size_t,
+) {
+	_ = filterEnvoyPtr
+	_ = filterModulePtr
+	_ = calloutID
+	_ = result
+	_ = headersPtr
+	_ = headersSize
+	_ = bodyVectorPtr
+	_ = bodyVectorSize
+}
+
+// envoy_dynamic_module_on_http_filter_scheduled is a no-op; all arguments are ignored.
+//
+//export envoy_dynamic_module_on_http_filter_scheduled
+func envoy_dynamic_module_on_http_filter_scheduled(
+	filterEnvoyPtr uintptr,
+	filterModulePtr uintptr,
+	eventID C.uint64_t,
+) {
+	_ = filterEnvoyPtr
+	_ = filterModulePtr
+	_ = eventID
+}
+
+// GetRequestHeader implements [EnvoyHTTPFilter].
+//
+// It asks for the value at index 0 of the header named key and returns a copy of it.
+// The boolean is false when the callback reports zero.
+func (e envoyFilter) GetRequestHeader(key string) (string, bool) {
+	keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
+	var resultBufferPtr *byte
+	var resultBufferLengthPtr C.size_t
+
+	ret := C.envoy_dynamic_module_callback_http_get_request_header(
+		C.uintptr_t(e.raw),
+		C.uintptr_t(keyPtr),
+		C.size_t(len(key)),
+		(*C.uintptr_t)(unsafe.Pointer(&resultBufferPtr)),
+		(*C.size_t)(unsafe.Pointer(&resultBufferLengthPtr)),
+		0,
+	)
+
+	if ret == 0 {
+		return "", false
+	}
+
+	result := unsafe.Slice(resultBufferPtr, resultBufferLengthPtr)
+	// key is passed to C as a plain integer, so keep it reachable explicitly.
+	runtime.KeepAlive(key)
+	return string(result), true
+}
+
+// GetResponseHeader implements [EnvoyHTTPFilter].
+func (e envoyFilter) GetResponseHeader(key string) (string, bool) {
+	keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
+	var resultBufferPtr *byte
+	var resultBufferLengthPtr C.size_t
+
+	// The last argument is the value index: only the value at index 0 is requested.
+	ret := C.envoy_dynamic_module_callback_http_get_response_header(
+		C.uintptr_t(e.raw),
+		C.uintptr_t(keyPtr),
+		C.size_t(len(key)),
+		(*C.uintptr_t)(unsafe.Pointer(&resultBufferPtr)),
+		(*C.size_t)(unsafe.Pointer(&resultBufferLengthPtr)),
+		0,
+	)
+
+	// A zero return is treated as "header not found".
+	if ret == 0 {
+		return "", false
+	}
+
+	result := unsafe.Slice(resultBufferPtr, resultBufferLengthPtr)
+	// key is passed to C as a plain integer, so keep it reachable explicitly.
+	runtime.KeepAlive(key)
+	// The string conversion copies the bytes into Go-managed memory.
+	return string(result), true
+}
+
+// SetRequestHeader implements [EnvoyHTTPFilter].
+//
+// It passes the key and value buffers to Envoy and returns the callback's result.
+func (e envoyFilter) SetRequestHeader(key string, value []byte) bool {
+	keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
+	valuePtr := uintptr(unsafe.Pointer(unsafe.SliceData(value)))
+
+	ret := C.envoy_dynamic_module_callback_http_set_request_header(
+		C.uintptr_t(e.raw),
+		C.uintptr_t(keyPtr),
+		C.size_t(len(key)),
+		C.uintptr_t(valuePtr),
+		C.size_t(len(value)),
+	)
+
+	// Both buffers are passed as plain integers, so keep them reachable until the call returns.
+	runtime.KeepAlive(key)
+	runtime.KeepAlive(value)
+	return bool(ret)
+}
+
+// SetResponseHeader implements [EnvoyHTTPFilter].
+//
+// It passes the key and value buffers to Envoy and returns the callback's result.
+func (e envoyFilter) SetResponseHeader(key string, value []byte) bool {
+	keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
+	valuePtr := uintptr(unsafe.Pointer(unsafe.SliceData(value)))
+
+	ret := C.envoy_dynamic_module_callback_http_set_response_header(
+		C.uintptr_t(e.raw),
+		C.uintptr_t(keyPtr),
+		C.size_t(len(key)),
+		C.uintptr_t(valuePtr),
+		C.size_t(len(value)),
+	)
+
+	// Both buffers are passed as plain integers, so keep them reachable until the call returns.
+	runtime.KeepAlive(key)
+	runtime.KeepAlive(value)
+	return bool(ret)
+}
+
+// bodyReader implements [io.Reader] for the request or response body.
+type bodyReader struct {
+	// chunks are the body buffers, read in order.
+	chunks []envoySlice
+	// index is the chunk currently being read; offset is the read position inside it.
+	index, offset int
+}
+
+// Read implements [io.Reader].
+func (b *bodyReader) Read(p []byte) (n int, err error) {
+	// Skip chunks that are empty or already fully consumed, so that a non-empty p
+	// never gets (0, nil) while data remains in a later chunk.
+	for b.index < len(b.chunks) && b.offset >= int(b.chunks[b.index].length) {
+		b.index++
+		b.offset = 0
+	}
+	if b.index >= len(b.chunks) {
+		return 0, io.EOF
+	}
+
+	// Copy at most the rest of the current chunk; the next call moves on to the next chunk.
+	chunk := b.chunks[b.index]
+	n = copy(p, unsafe.Slice((*byte)(unsafe.Pointer(chunk.data)), chunk.length)[b.offset:])
+	b.offset += n
+	return n, nil
+}
+
+// Len implements [BodyReader].
+//
+// This returns the length of the body in bytes, regardless of how much has been read.
+func (b *bodyReader) Len() int {
+	total := 0
+	for _, chunk := range b.chunks {
+		total += int(chunk.length)
+	}
+	return total
+}
+
+// envoySlice is a (data address, length) pair describing a buffer filled in by Envoy callbacks.
+type envoySlice struct {
+	data   uintptr
+	length C.size_t
+}
+
+// envoyFilter implements [EnvoyHTTPFilter].
+type envoyFilter struct{ raw uintptr }
+
+// GetRequestHeaders implements EnvoyHTTPFilter.
+//
+// It returns a copy of all request headers keyed by name, an empty map when there are
+// no headers, and nil when the callback fails.
+func (e envoyFilter) GetRequestHeaders() map[string][]string {
+	count := C.envoy_dynamic_module_callback_http_get_request_headers_count(C.uintptr_t(e.raw))
+	if count == 0 {
+		// Nothing to fetch; taking &raw[0] of an empty slice below would panic.
+		return map[string][]string{}
+	}
+	raw := make([][2]envoySlice, count)
+	ret := C.envoy_dynamic_module_callback_http_get_request_headers(
+		C.uintptr_t(e.raw),
+		(*C.uintptr_t)(unsafe.Pointer(&raw[0])),
+	)
+	if !ret {
+		return nil
+	}
+	// Copy the headers to a Go slice.
+	headers := make(map[string][]string, count) // The count is the number of (key, value) pairs, so this might be larger than the number of unique names.
+	for i := range count {
+		// Copy the Envoy owner data to a Go string.
+		key := string(unsafe.Slice((*byte)(unsafe.Pointer(raw[i][0].data)), raw[i][0].length))
+		value := string(unsafe.Slice((*byte)(unsafe.Pointer(raw[i][1].data)), raw[i][1].length))
+		headers[key] = append(headers[key], value)
+	}
+	return headers
+}
+
+// GetResponseHeaders implements [EnvoyHTTPFilter].
+func (e envoyFilter) GetResponseHeaders() map[string][]string {
+	count := C.envoy_dynamic_module_callback_http_get_response_headers_count(C.uintptr_t(e.raw))
+	if count == 0 {
+		// Nothing to fetch; taking &raw[0] of an empty slice below would panic.
+		return map[string][]string{}
+	}
+	raw := make([][2]envoySlice, count)
+	ret := C.envoy_dynamic_module_callback_http_get_response_headers(
+		C.uintptr_t(e.raw),
+		(*C.uintptr_t)(unsafe.Pointer(&raw[0])),
+	)
+	if !ret {
+		return nil
+	}
+	// Copy the headers to a Go slice.
+	headers := make(map[string][]string, count) // The count is the number of (key, value) pairs, so this might be larger than the number of unique names.
+	for i := range count {
+		// Copy the Envoy owner data to a Go string.
+		key := string(unsafe.Slice((*byte)(unsafe.Pointer(raw[i][0].data)), raw[i][0].length))
+		value := string(unsafe.Slice((*byte)(unsafe.Pointer(raw[i][1].data)), raw[i][1].length))
+		headers[key] = append(headers[key], value)
+	}
+	return headers
+}
+
+// SendLocalReply implements EnvoyHTTPFilter.
+//
+// It passes the status code, the header pairs, and the body to Envoy's send-response callback.
+func (e envoyFilter) SendLocalReply(statusCode uint32, headers [][2]string, body []byte) {
+	headersVecPtr := uintptr(unsafe.Pointer(unsafe.SliceData(headers)))
+	headersVecSize := len(headers)
+	bodyPtr := uintptr(unsafe.Pointer(unsafe.SliceData(body)))
+	bodySize := len(body)
+	C.envoy_dynamic_module_callback_http_send_response(
+		C.uintptr_t(e.raw),
+		C.uint32_t(statusCode),
+		C.uintptr_t(headersVecPtr),
+		C.size_t(headersVecSize),
+		C.uintptr_t(bodyPtr),
+		C.size_t(bodySize),
+	)
+	// Both buffers are passed as plain integers, so keep them reachable until the call returns.
+	runtime.KeepAlive(headers)
+	runtime.KeepAlive(body)
+}
+
+// AppendRequestBody implements [EnvoyHTTPFilter].
+//
+// It passes data to Envoy's append-request-body callback and returns the callback's result.
+func (e envoyFilter) AppendRequestBody(data []byte) bool {
+	dataPtr := uintptr(unsafe.Pointer(unsafe.SliceData(data)))
+	ret := C.envoy_dynamic_module_callback_http_append_request_body(
+		C.uintptr_t(e.raw),
+		C.uintptr_t(dataPtr),
+		C.size_t(len(data)),
+	)
+	runtime.KeepAlive(data)
+	return bool(ret)
+}
+
+// DrainRequestBody implements [EnvoyHTTPFilter].
+func (e envoyFilter) DrainRequestBody(n int) bool { + ret := C.envoy_dynamic_module_callback_http_drain_request_body( + C.uintptr_t(e.raw), + C.size_t(n), + ) + return bool(ret) +} + +// GetRequestBody implements [EnvoyHTTPFilter]. +func (e envoyFilter) GetRequestBody() (BodyReader, bool) { + var vectorSize int + ret := C.envoy_dynamic_module_callback_http_get_request_body_vector_size( + C.uintptr_t(e.raw), + (*C.size_t)(unsafe.Pointer(&vectorSize)), + ) + if !ret { + return nil, false + } + + chunks := make([]envoySlice, vectorSize) + ret = C.envoy_dynamic_module_callback_http_get_request_body_vector( + C.uintptr_t(e.raw), + (*C.uintptr_t)(unsafe.Pointer(&chunks[0])), + ) + if !ret { + return nil, false + } + return &bodyReader{chunks: chunks}, true +} + +// AppendResponseBody implements [EnvoyHTTPFilter]. +func (e envoyFilter) AppendResponseBody(data []byte) bool { + dataPtr := uintptr(unsafe.Pointer(unsafe.SliceData(data))) + ret := C.envoy_dynamic_module_callback_http_append_response_body( + C.uintptr_t(e.raw), + C.uintptr_t(dataPtr), + C.size_t(len(data)), + ) + runtime.KeepAlive(data) + return bool(ret) +} + +// DrainResponseBody implements [EnvoyHTTPFilter]. +func (e envoyFilter) DrainResponseBody(n int) bool { + ret := C.envoy_dynamic_module_callback_http_drain_response_body( + C.uintptr_t(e.raw), + C.size_t(n), + ) + return bool(ret) +} + +// GetResponseBody implements [EnvoyHTTPFilter]. 
+func (e envoyFilter) GetResponseBody() (BodyReader, bool) { + var vectorSize int + ret := C.envoy_dynamic_module_callback_http_get_response_body_vector_size( + C.uintptr_t(e.raw), + (*C.size_t)(unsafe.Pointer(&vectorSize)), + ) + if !ret { + return nil, false + } + chunks := make([]envoySlice, vectorSize) + ret = C.envoy_dynamic_module_callback_http_get_response_body_vector( + C.uintptr_t(e.raw), + (*C.uintptr_t)(unsafe.Pointer(&chunks[0])), + ) + if !ret { + return nil, false + } + return &bodyReader{chunks: chunks}, true +} + +// https://github.com/envoyproxy/envoy/blob/dc2d3098ae5641555f15c71d5bb5ce0060a8015c/source/extensions/dynamic_modules/abi.h#L271-L282 +type metadataSource uint32 + +const ( + metadataSourceDynamic metadataSource = 0 + metadataSourceUpstreamHost metadataSource = 3 +) + +// GetDynamicMetadataString implements [EnvoyHTTPFilter]. +func (e envoyFilter) GetDynamicMetadataString(namespace string, key string) (string, bool) { + return e.getMetadataString(namespace, key, metadataSourceDynamic) +} + +// GetUpstreamHostMetadataString implements [EnvoyHTTPFilter]. 
+func (e envoyFilter) GetUpstreamHostMetadataString(namespace string, key string) (string, bool) { + return e.getMetadataString(namespace, key, metadataSourceUpstreamHost) +} + +func (e envoyFilter) getMetadataString(namespace string, key string, source metadataSource) (string, bool) { + namespacePtr := uintptr(unsafe.Pointer(unsafe.StringData(namespace))) + keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key))) + var resultBufferPtr *byte + var resultBufferLengthPtr C.size_t + + ret := C.envoy_dynamic_module_callback_http_get_metadata_string( + C.uintptr_t(e.raw), + C.uint32_t(source), + C.uintptr_t(namespacePtr), + C.size_t(len(namespace)), + C.uintptr_t(keyPtr), + C.size_t(len(key)), + (*C.uintptr_t)(unsafe.Pointer(&resultBufferPtr)), + (*C.size_t)(unsafe.Pointer(&resultBufferLengthPtr)), + ) + + if !ret { + return "", false + } + + result := unsafe.Slice(resultBufferPtr, resultBufferLengthPtr) + runtime.KeepAlive(namespace) + runtime.KeepAlive(key) + return string(result), true +} + +// SetDynamicMetadataString implements [EnvoyHTTPFilter]. 
func (e envoyFilter) SetDynamicMetadataString(namespace string, key string, value string) bool {
	// Take the raw addresses of the string bytes; they are passed to Envoy as plain integers
	// together with the corresponding lengths.
	namespacePtr := uintptr(unsafe.Pointer(unsafe.StringData(namespace)))
	keyPtr := uintptr(unsafe.Pointer(unsafe.StringData(key)))
	valuePtr := uintptr(unsafe.Pointer(unsafe.StringData(value)))

	ret := C.envoy_dynamic_module_callback_http_set_dynamic_metadata_string(
		C.uintptr_t(e.raw),
		C.uintptr_t(namespacePtr),
		C.size_t(len(namespace)),
		C.uintptr_t(keyPtr),
		C.size_t(len(key)),
		C.uintptr_t(valuePtr),
		C.size_t(len(value)),
	)
	// uintptr values are not tracked by the garbage collector, so keep the strings alive
	// until the callback has returned.
	runtime.KeepAlive(namespace)
	runtime.KeepAlive(key)
	runtime.KeepAlive(value)
	// The boolean result of the callback is returned unchanged.
	return bool(ret)
}
diff --git a/internal/dynamicmodule/sdk/cgoflags_darwin.go b/internal/dynamicmodule/sdk/cgoflags_darwin.go
new file mode 100644
index 000000000..b12083942
--- /dev/null
+++ b/internal/dynamicmodule/sdk/cgoflags_darwin.go
@@ -0,0 +1,11 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

//go:build darwin && cgo

package sdk

// #cgo LDFLAGS: -Wl,-undefined,dynamic_lookup
import "C"
diff --git a/internal/dynamicmodule/sdk/gosdk.go b/internal/dynamicmodule/sdk/gosdk.go
new file mode 100644
index 000000000..2d9313586
--- /dev/null
+++ b/internal/dynamicmodule/sdk/gosdk.go
@@ -0,0 +1,121 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package sdk

import "io"

// NewHTTPFilterConfig is a function that creates a new HTTPFilterConfig that corresponds to each filter configuration in the Envoy filter chain.
// This is a global variable that should be set once in the init function of the program.
//
// The function is called once per filter configuration. It is only called by the main thread,
// so it does not need to be thread-safe.
var NewHTTPFilterConfig func(name string, config []byte) HTTPFilterConfig

// HTTPFilterConfig is an interface that represents a single http filter in the Envoy filter chain.
// It is used to create HTTPFilter(s) that correspond to each Http request.
//
// This is only created once per module configuration via the NewHTTPFilterConfig function.
type HTTPFilterConfig interface {
	// NewFilter is called for each new Http request.
	// Note that this must be concurrency-safe as it can be called concurrently for multiple requests.
	NewFilter() HTTPFilter
}

// EnvoyHTTPFilter is an interface that represents the underlying Envoy filter.
// This is passed to each event hook of the HTTPFilter.
//
// **WARNING**: This must not outlive each event hook since there's no guarantee that the EnvoyHTTPFilter will be valid after the event hook has returned.
// To perform asynchronous operations, use [EnvoyHTTPFilter.NewScheduler] to create a [Scheduler] and perform the operations in a separate Goroutine.
// Then, use the [Scheduler.Commit] method to commit the event to the Envoy filter on the correct worker thread to continue processing the request.
//
// NOTE(review): NewScheduler and Scheduler are not declared in this interface — confirm they exist elsewhere or update the warning above.
type EnvoyHTTPFilter interface {
	// GetRequestHeader gets the first value of the request header. Returns the value and true if the header is found.
	GetRequestHeader(key string) (string, bool)
	// GetRequestHeaders gets all the request headers, keyed by header name with every value of that name.
	GetRequestHeaders() map[string][]string
	// SetRequestHeader sets the request header. Returns true if the header is set successfully.
	SetRequestHeader(key string, value []byte) bool
	// GetResponseHeader gets the first value of the response header. Returns the value and true if the header is found.
	GetResponseHeader(key string) (string, bool)
	// GetResponseHeaders gets all the response headers, keyed by header name with every value of that name.
	GetResponseHeaders() map[string][]string
	// SetResponseHeader sets the response header. Returns true if the header is set successfully.
	SetResponseHeader(key string, value []byte) bool
	// GetRequestBody gets the request body. Returns the BodyReader and true if the body is found.
	GetRequestBody() (BodyReader, bool)
	// DrainRequestBody drains n bytes from the request body. This will invalidate any BodyReader returned by GetRequestBody before this is called.
	DrainRequestBody(n int) bool
	// AppendRequestBody appends the data to the request body. This will invalidate any BodyReader returned by GetRequestBody before this is called.
	AppendRequestBody(data []byte) bool
	// GetResponseBody gets the response body. Returns the BodyReader and true if the body is found.
	GetResponseBody() (BodyReader, bool)
	// DrainResponseBody drains n bytes from the response body. This will invalidate any BodyReader returned by GetResponseBody before this is called.
	DrainResponseBody(n int) bool
	// AppendResponseBody appends the data to the response body. This will invalidate any BodyReader returned by GetResponseBody before this is called.
	AppendResponseBody(data []byte) bool
	// SendLocalReply sends a local reply to the client. This must not be used after returning continue from the response headers phase.
	SendLocalReply(statusCode uint32, headers [][2]string, body []byte)
	// GetDynamicMetadataString gets the dynamic metadata value for the given namespace and key. Returns the value and true if the value is found.
	GetDynamicMetadataString(namespace string, key string) (string, bool)
	// GetUpstreamHostMetadataString gets the upstream host metadata value for the given namespace and key. Returns the value and true if the value is found.
	GetUpstreamHostMetadataString(namespace string, key string) (string, bool)
	// SetDynamicMetadataString sets the dynamic metadata value for the given namespace and key. Returns true if the value is set successfully.
	SetDynamicMetadataString(namespace string, key string, value string) bool
}

// BodyReader is an io.Reader over an HTTP body that additionally reports the total body size.
type BodyReader interface {
	// Len returns the length of the body in bytes, regardless of how much has been read.
	Len() int
	io.Reader
}

// HTTPFilter is an interface that represents each Http request.
//
// This is created for each new Http request and is destroyed when the request is completed.
type HTTPFilter interface {
	// RequestHeaders is called when the request headers are received.
	RequestHeaders(e EnvoyHTTPFilter, endOfStream bool) RequestHeadersStatus
	// RequestBody is called when the request body is received.
	RequestBody(e EnvoyHTTPFilter, endOfStream bool) RequestBodyStatus
	// ResponseHeaders is called when the response headers are received.
	ResponseHeaders(e EnvoyHTTPFilter, endOfStream bool) ResponseHeadersStatus
	// ResponseBody is called when the response body is received.
	ResponseBody(e EnvoyHTTPFilter, endOfStream bool) ResponseBodyStatus
}

// RequestHeadersStatus is the return value of the HTTPFilter.RequestHeaders.
//
// NOTE(review): the numeric values (0, 1, 3) presumably mirror the status enum of the Envoy dynamic module ABI — confirm against abi.h before changing them.
type RequestHeadersStatus int

const (
	// RequestHeadersStatusContinue is returned when the operation should continue.
	RequestHeadersStatusContinue                  RequestHeadersStatus = 0
	RequestHeadersStatusStopIteration             RequestHeadersStatus = 1
	RequestHeadersStatusStopAllIterationAndBuffer RequestHeadersStatus = 3
)

// RequestBodyStatus is the return value of the HTTPFilter.RequestBody event.
type RequestBodyStatus int

const (
	RequestBodyStatusContinue               RequestBodyStatus = 0
	RequestBodyStatusStopIterationAndBuffer RequestBodyStatus = 1
)

// ResponseHeadersStatus is the return value of the HTTPFilter.ResponseHeaders event.
type ResponseHeadersStatus int

const (
	ResponseHeadersStatusContinue                  ResponseHeadersStatus = 0
	ResponseHeadersStatusStopIteration             ResponseHeadersStatus = 1
	ResponseHeadersStatusStopAllIterationAndBuffer ResponseHeadersStatus = 3
)

// ResponseBodyStatus is the return value of the HTTPFilter.ResponseBody event.
type ResponseBodyStatus int

const (
	ResponseBodyStatusContinue               ResponseBodyStatus = 0
	ResponseBodyStatusStopIterationAndBuffer ResponseBodyStatus = 1
)
diff --git a/internal/dynamicmodule/sdk/mem.go b/internal/dynamicmodule/sdk/mem.go
new file mode 100644
index 000000000..1e727f8b4
--- /dev/null
+++ b/internal/dynamicmodule/sdk/mem.go
@@ -0,0 +1,116 @@
// Copyright Envoy AI Gateway Authors
// SPDX-License-Identifier: Apache-2.0
// The full text of the Apache license is available in the LICENSE file at
// the root of the repo.

package sdk

import (
	"sync"
	"unsafe"
)

// memManager is the package-wide memory manager instance.
var memManager memoryManager

const (
	// shardingSize is the number of HTTPFilter lists (and mutexes) in the memory manager.
	shardingSize = 1 << 8
	// shardingMask reduces a hash to an index in [0, shardingSize).
	shardingMask = shardingSize - 1
)

type (
	// memoryManager manages the heap allocated objects.
	// It is used to pin the objects to the heap to avoid them being garbage collected by the Go runtime.
	memoryManager struct {
		// httpFilterConfigs is the head of a linked list of pinned HTTPFilterConfig objects.
		httpFilterConfigs *pinedHTTPFilterConfig
		// httpFilterLists holds the heads of the linked lists of pinned HTTPFilter objects, one per shard.
		httpFilterLists [shardingSize]*pinedHTTPFilter
		// httpFilterListsMuxes holds one mutex per shard of httpFilterLists.
		httpFilterListsMuxes [shardingSize]sync.Mutex
	}

	// pinedHTTPFilterConfig holds a pinned HTTPFilterConfig managed by the memory manager.
	pinedHTTPFilterConfig = linkedList[HTTPFilterConfig]

	// pinedHTTPFilter holds a pinned HTTPFilter managed by the memory manager.
	pinedHTTPFilter = linkedList[HTTPFilter]

	// linkedList is a node of a doubly linked list that carries one object of type T.
	linkedList[T any] struct {
		obj        T
		next, prev *linkedList[T]
	}
)

// pinHTTPFilterConfig pins the HTTPFilterConfig to the memory manager.
+func (m *memoryManager) pinHTTPFilterConfig(filterConfig HTTPFilterConfig) *pinedHTTPFilterConfig { + item := &pinedHTTPFilterConfig{obj: filterConfig, next: m.httpFilterConfigs, prev: nil} + if m.httpFilterConfigs != nil { + m.httpFilterConfigs.prev = item + } + m.httpFilterConfigs = item + return item +} + +// unpinHTTPFilterConfig unpins the HTTPFilterConfig from the memory manager. +func (m *memoryManager) unpinHTTPFilterConfig(filterConfig *pinedHTTPFilterConfig) { + if filterConfig.prev != nil { + filterConfig.prev.next = filterConfig.next + } else { + m.httpFilterConfigs = filterConfig.next + } + if filterConfig.next != nil { + filterConfig.next.prev = filterConfig.prev + } +} + +// unwrapPinnedHTTPFilterConfig unwraps the pinned http filter config. +func unwrapPinnedHTTPFilterConfig(raw uintptr) *pinedHTTPFilterConfig { + return (*pinedHTTPFilterConfig)(unsafe.Pointer(raw)) +} + +// pinHTTPFilter pins the http filter to the memory manager. +func (m *memoryManager) pinHTTPFilter(filter HTTPFilter) *pinedHTTPFilter { + item := &pinedHTTPFilter{obj: filter, next: nil, prev: nil} + index := shardingKey(uintptr(unsafe.Pointer(item))) + mux := &m.httpFilterListsMuxes[index] + mux.Lock() + defer mux.Unlock() + item.next = m.httpFilterLists[index] + if m.httpFilterLists[index] != nil { + m.httpFilterLists[index].prev = item + } + m.httpFilterLists[index] = item + return item +} + +// unpinHTTPFilter unpins the http filter from the memory manager. +func (m *memoryManager) unpinHTTPFilter(filter *pinedHTTPFilter) { + index := shardingKey(uintptr(unsafe.Pointer(filter))) + mux := &m.httpFilterListsMuxes[index] + mux.Lock() + defer mux.Unlock() + + if filter.prev != nil { + filter.prev.next = filter.next + } else { + m.httpFilterLists[index] = filter.next + } + if filter.next != nil { + filter.next.prev = filter.prev + } +} + +// unwrapPinnedHTTPFilter unwraps the raw pointer to the pinned http filter. 
+func unwrapPinnedHTTPFilter(raw uintptr) *pinedHTTPFilter { + return (*pinedHTTPFilter)(unsafe.Pointer(raw)) +} + +func shardingKey(key uintptr) uintptr { + return splitmix64(key) & shardingMask +} + +func splitmix64(x uintptr) uintptr { + x += 0x9e3779b97f4a7c15 + x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9 + x = (x ^ (x >> 27)) * 0x94d049bb133111eb + x ^= x >> 31 + return x +} diff --git a/internal/dynamicmodule/upstream_filter.go b/internal/dynamicmodule/upstream_filter.go new file mode 100644 index 000000000..8abde8d73 --- /dev/null +++ b/internal/dynamicmodule/upstream_filter.go @@ -0,0 +1,264 @@ +// Copyright Envoy AI Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package dynamicmodule + +import ( + "context" + "fmt" + "strconv" + "unsafe" + + openaisdk "github.com/openai/openai-go/v2" + + "github.com/envoyproxy/ai-gateway/internal/apischema/anthropic" + cohereschema "github.com/envoyproxy/ai-gateway/internal/apischema/cohere" + "github.com/envoyproxy/ai-gateway/internal/apischema/openai" + "github.com/envoyproxy/ai-gateway/internal/dynamicmodule/sdk" + "github.com/envoyproxy/ai-gateway/internal/filterapi" + "github.com/envoyproxy/ai-gateway/internal/headermutator" + "github.com/envoyproxy/ai-gateway/internal/internalapi" + "github.com/envoyproxy/ai-gateway/internal/metrics" + "github.com/envoyproxy/ai-gateway/internal/translator" +) + +type ( + // upstreamFilterConfig implements [sdk.HTTPFilterConfig]. + upstreamFilterConfig struct{ env *Env } + // upstreamFilter implements [sdk.HTTPFilter]. 
+ upstreamFilter struct { + env *Env + rf *routerFilter + backend *filterapi.RuntimeBackend + reqHeaders map[string]string + onRetry bool + + // -- per endpoint processor -- + translator any + metrics metrics.Metrics + } +) + +func NewUpstreamFilterConfig(env *Env) sdk.HTTPFilterConfig { + return &upstreamFilterConfig{env: env} +} + +// NewFilter implements [sdk.HTTPFilterConfig]. +func (f *upstreamFilterConfig) NewFilter() sdk.HTTPFilter { + return &upstreamFilter{env: f.env} +} + +// RequestHeaders implements [sdk.HTTPFilter]. +func (f *upstreamFilter) RequestHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.RequestHeadersStatus { + rfPtrStr, ok := e.GetDynamicMetadataString(internalapi.AIGatewayFilterMetadataNamespace, + routerFilterPointerDynamicMetadataKey) + if !ok { + e.SendLocalReply(500, nil, []byte("router filter pointer not found in dynamic metadata")) + return sdk.RequestHeadersStatusStopIteration + } + rfPtr, err := strconv.ParseInt(rfPtrStr, 10, 64) + if err != nil { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("invalid router filter pointer: %v", err))) + return sdk.RequestHeadersStatusStopIteration + } + f.rf = (*routerFilter)(unsafe.Pointer(uintptr(rfPtr))) // nolint:govet + f.rf.attemptCount++ + f.onRetry = f.rf.attemptCount > 1 + + backend, ok := e.GetUpstreamHostMetadataString(internalapi.AIGatewayFilterMetadataNamespace, internalapi.InternalMetadataBackendNameKey) + if !ok { + e.SendLocalReply(500, nil, []byte("backend name not found in upstream host metadata")) + return sdk.RequestHeadersStatusStopIteration + } + b, ok := f.rf.runtimeFilterConfig.Backends[backend] + if !ok { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("backend %s not found in filter config", backend))) + return sdk.RequestHeadersStatusStopIteration + } + + f.backend = b + f.reqHeaders = multiValueHeadersToSingleValue(e.GetRequestHeaders()) + + if err := f.initializeTranslatorMetrics(b.Backend); err != nil { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to initialize 
translator: %v", err))) + return sdk.RequestHeadersStatusStopIteration + } + + // Now mutate the headers based on the backend configuration. + if hm := b.Backend.HeaderMutation; hm != nil { + sets, removes := headermutator.NewHeaderMutator(b.Backend.HeaderMutation, f.rf.originalHeaders).Mutate(f.reqHeaders, f.onRetry) + for _, h := range sets { + if !e.SetRequestHeader(h.Key(), []byte(h.Value())) { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to set header %s", h.Key()))) + return sdk.RequestHeadersStatusStopIteration + } + f.reqHeaders[h.Key()] = h.Value() + } + for _, key := range removes { + if !e.SetRequestHeader(key, nil) { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to remove header %s", key))) + return sdk.RequestHeadersStatusStopIteration + } + delete(f.reqHeaders, key) + } + } + return sdk.RequestHeadersStatusContinue +} + +// RequestBody implements [sdk.HTTPFilter]. +func (f *upstreamFilter) RequestBody(e sdk.EnvoyHTTPFilter, endOfStream bool) sdk.RequestBodyStatus { + if !endOfStream { + // TODO: ideally, we should not buffer the entire body for the passthrough case. 
+ return sdk.RequestBodyStatusStopIterationAndBuffer + } + + b := f.backend + + var newHeaders []internalapi.Header + var newBody []byte + var err error + switch t := f.translator.(type) { + case translator.OpenAIChatCompletionTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*openai.ChatCompletionRequest), f.onRetry) + case translator.OpenAICompletionTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*openai.CompletionRequest), f.onRetry) + case translator.OpenAIEmbeddingTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*openai.EmbeddingRequest), f.onRetry) + case translator.AnthropicMessagesTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*anthropic.MessagesRequest), f.onRetry) + case translator.OpenAIImageGenerationTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*openaisdk.ImageGenerateParams), f.onRetry) + case translator.CohereRerankTranslator: + newHeaders, newBody, err = t.RequestBody(f.rf.originalRequestBodyRaw, f.rf.originalRequestBody.(*cohereschema.RerankV2Request), f.onRetry) + default: + e.SendLocalReply(500, nil, []byte("BUG: unsupported translator type")) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + if err != nil { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to translate request body: %v", err))) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + for _, h := range newHeaders { + if !e.SetRequestHeader(h.Key(), []byte(h.Value())) { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to set mutated header %s", h.Key()))) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + } + + if bm := b.Backend.BodyMutation; bm != nil { + // TODO: body mutation if needed. 
+ _ = bm + } + + if newBody != nil { + cur, ok := e.GetRequestBody() + if !ok { + e.SendLocalReply(500, nil, []byte("failed to get request body for upstream mutation")) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + _ = e.DrainResponseBody(cur.Len()) + _ = e.AppendRequestBody(newBody) + } + + // Next is to do the upstream auth if needed. + if b.Handler != nil { + var originalOrNewBody []byte + if newBody != nil { + originalOrNewBody = newBody + } + + authHeaders, err := b.Handler.Do(context.Background(), f.reqHeaders, originalOrNewBody) + if err != nil { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to do backend auth: %v", err))) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + for _, h := range authHeaders { + if !e.SetRequestHeader(h.Key(), []byte(h.Value())) { + e.SendLocalReply(500, nil, []byte(fmt.Sprintf("failed to set auth header %s", h.Key()))) + return sdk.RequestBodyStatusStopIterationAndBuffer + } + } + } + return sdk.RequestBodyStatusContinue +} + +// ResponseHeaders implements [sdk.HTTPFilter]. +func (f *upstreamFilter) ResponseHeaders(e sdk.EnvoyHTTPFilter, _ bool) sdk.ResponseHeadersStatus { + _ = e + return sdk.ResponseHeadersStatusContinue +} + +// ResponseBody implements [sdk.HTTPFilter]. 
+func (f *upstreamFilter) ResponseBody(sdk.EnvoyHTTPFilter, bool) sdk.ResponseBodyStatus { + return sdk.ResponseBodyStatusContinue +} + +func (f *upstreamFilter) initializeTranslatorMetrics(b *filterapi.Backend) error { + out := b.Schema + modelNameOverride := b.ModelNameOverride + switch f.rf.endpoint { + case chatCompletionsEndpoint: + switch out.Name { + case filterapi.APISchemaOpenAI: + f.translator = translator.NewChatCompletionOpenAIToOpenAITranslator(out.Version, modelNameOverride) + case filterapi.APISchemaAWSBedrock: + f.translator = translator.NewChatCompletionOpenAIToAWSBedrockTranslator(modelNameOverride) + case filterapi.APISchemaAzureOpenAI: + f.translator = translator.NewChatCompletionOpenAIToAzureOpenAITranslator(out.Version, modelNameOverride) + case filterapi.APISchemaGCPVertexAI: + f.translator = translator.NewChatCompletionOpenAIToGCPVertexAITranslator(modelNameOverride) + case filterapi.APISchemaGCPAnthropic: + f.translator = translator.NewChatCompletionOpenAIToGCPAnthropicTranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.ChatCompletionMetricsFactory.NewMetrics() + case completionsEndpoint: + switch out.Name { + case filterapi.APISchemaOpenAI: + f.translator = translator.NewChatCompletionOpenAIToOpenAITranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.CompletionMetricsFactory.NewMetrics() + case embeddingsEndpoint: + switch out.Name { + case filterapi.APISchemaOpenAI: + f.translator = translator.NewEmbeddingOpenAIToOpenAITranslator(out.Version, modelNameOverride) + case filterapi.APISchemaAzureOpenAI: + f.translator = translator.NewEmbeddingOpenAIToAzureOpenAITranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.CompletionMetricsFactory.NewMetrics() + case 
imagesGenerationsEndpoint: + switch out.Name { + case filterapi.APISchemaOpenAI: + f.translator = translator.NewImageGenerationOpenAIToOpenAITranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.CompletionMetricsFactory.NewMetrics() + case rerankEndpoint: + switch out.Name { + case filterapi.APISchemaCohere: + f.translator = translator.NewRerankCohereToCohereTranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.RerankMetricsFactory.NewMetrics() + case messagesEndpoint: + switch out.Name { + case filterapi.APISchemaAnthropic: + f.translator = translator.NewAnthropicToAnthropicTranslator(out.Version, modelNameOverride) + default: + return fmt.Errorf("unsupported API schema: backend=%s", out) + } + f.metrics = f.env.MessagesMetricsFactory.NewMetrics() + default: + return fmt.Errorf("unsupported endpoint for per-route upstream filter: %v", f.rf.endpoint) + } + return nil +}