From 0e4a99d86c0d18ef13dd3b255ed4380d825e02a9 Mon Sep 17 00:00:00 2001 From: bobzetian Date: Thu, 13 Nov 2025 07:39:11 +0000 Subject: [PATCH] feat(conformance): add plugin to support verifying destination endpoint served metadata. --- cmd/epp/runner/runner.go | 3 + pkg/epp/metadata/consts.go | 2 + pkg/epp/requestcontrol/director.go | 6 +- pkg/epp/requestcontrol/plugins/test/consts.go | 22 +++++ .../destination_endpoint_served_verifier.go | 93 +++++++++++++++++++ ...stination_endpoint_served_verifier_test.go | 88 ++++++++++++++++++ pkg/epp/requestcontrol/types.go | 4 + 7 files changed, 215 insertions(+), 3 deletions(-) create mode 100644 pkg/epp/requestcontrol/plugins/test/consts.go create mode 100644 pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier.go create mode 100644 pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier_test.go diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index da81fdf460..4ef2970987 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -58,6 +58,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + testresponsereceived "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test/responsereceived" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix" @@ -408,6 +409,8 @@ func (r *Runner) registerInTreePlugins() { plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory) // register filter for test purpose only (used in conformance tests) plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory) + // register 
response received plugin for test purpose only (used in conformance tests) + plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory) } func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error { diff --git a/pkg/epp/metadata/consts.go b/pkg/epp/metadata/consts.go index be6c482d02..b69319bec7 100644 --- a/pkg/epp/metadata/consts.go +++ b/pkg/epp/metadata/consts.go @@ -26,6 +26,8 @@ const ( DestinationEndpointNamespace = "envoy.lb" // DestinationEndpointKey is the header and response metadata key used by Envoy to route to the appropriate pod. DestinationEndpointKey = "x-gateway-destination-endpoint" + // DestinationEndpointServedKey is the metadata key used by Envoy to specify the endpoint that served the request. + DestinationEndpointServedKey = "x-gateway-destination-endpoint-served" // FlowFairnessIDKey is the header key used to pass the fairness ID to be used in Flow Control. FlowFairnessIDKey = "x-gateway-inference-fairness-id" // ObjectiveKey is the header key used to specify the objective of an incoming request. diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f6f7deebed..ed841a0824 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -253,10 +253,10 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch // HandleResponseReceived is called when the response headers are received. 
func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { response := &Response{ - RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], - Headers: reqCtx.Response.Headers, + RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey], + Headers: reqCtx.Response.Headers, + ReqMetadata: reqCtx.Request.Metadata, } - // TODO: to extend fallback functionality, handle cases where target pod is unavailable // https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224 d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod) diff --git a/pkg/epp/requestcontrol/plugins/test/consts.go b/pkg/epp/requestcontrol/plugins/test/consts.go new file mode 100644 index 0000000000..194ef070e0 --- /dev/null +++ b/pkg/epp/requestcontrol/plugins/test/consts.go @@ -0,0 +1,22 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +const ( + // ConformanceTestResultHeader is the header key for the conformance test result. 
+	ConformanceTestResultHeader = "x-conformance-test-served-endpoint"
+)
diff --git a/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier.go b/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier.go
new file mode 100644
index 0000000000..216746ab1e
--- /dev/null
+++ b/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier.go
@@ -0,0 +1,116 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package responsereceived
+
+import (
+	"context"
+	"encoding/json"
+
+	"sigs.k8s.io/controller-runtime/pkg/log"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
+	schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
+)
+
+const (
+	// DestinationEndpointServedVerifierType is the ResponseReceived type that is used in plugins registry.
+	DestinationEndpointServedVerifierType = "destination-endpoint-served-verifier"
+)
+
+var _ requestcontrol.ResponseReceived = &DestinationEndpointServedVerifier{}
+
+// DestinationEndpointServedVerifier is a test-only plugin for conformance tests.
+// It verifies that the request was served by the expected endpoint.
+// It works by reading Envoy's dynamic metadata, which is passed in the
+// Response.ReqMetadata field. This metadata should contain the
+// address of the backend endpoint that served the request. The verifier then
+// writes this address to the "x-conformance-test-served-endpoint" response header.
+// The conformance test client can then validate this header to ensure the request
+// was routed correctly.
+type DestinationEndpointServedVerifier struct {
+	typedName plugins.TypedName
+}
+
+// TypedName returns the type and name tuple of this plugin instance.
+func (p *DestinationEndpointServedVerifier) TypedName() plugins.TypedName {
+	return p.typedName
+}
+
+// WithName sets the name of the plugin instance and returns the receiver so calls can be chained.
+func (p *DestinationEndpointServedVerifier) WithName(name string) *DestinationEndpointServedVerifier {
+	p.typedName.Name = name
+	return p
+}
+
+// DestinationEndpointServedVerifierFactory defines the factory function for DestinationEndpointServedVerifier.
+// The raw parameters and the plugin handle are ignored; only the instance name is applied.
+func DestinationEndpointServedVerifierFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
+	return NewDestinationEndpointServedVerifier().WithName(name), nil
+}
+
+// NewDestinationEndpointServedVerifier returns a verifier whose TypedName carries
+// DestinationEndpointServedVerifierType as its type. The name defaults to the type
+// until WithName overrides it.
+func NewDestinationEndpointServedVerifier() *DestinationEndpointServedVerifier {
+	return &DestinationEndpointServedVerifier{
+		typedName: plugins.TypedName{
+			Type: DestinationEndpointServedVerifierType,
+			Name: DestinationEndpointServedVerifierType,
+		},
+	}
+}
+
+// ResponseReceived is the handler for the ResponseReceived extension point.
+// It looks up metadata.DestinationEndpointServedKey inside the
+// metadata.DestinationEndpointNamespace entry of response.ReqMetadata and writes
+// the value to the conformance test result header. When the namespace or the key
+// is missing (or has an unexpected type), a "fail: ..." value is written to that
+// header instead. The request and pod arguments are unused.
+func (p *DestinationEndpointServedVerifier) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, _ *backend.Pod) {
+	logger := log.FromContext(ctx).WithName(p.TypedName().String())
+	logger.V(logging.DEBUG).Info("Verifying destination endpoint served")
+
+	if response == nil {
+		// Nowhere to report the result; bail out instead of dereferencing nil.
+		logger.V(logging.DEBUG).Info("Response is nil, skipping verification")
+		return
+	}
+	if response.Headers == nil {
+		// Writing to a nil map panics, so make sure the result has a place to go.
+		response.Headers = make(map[string]string)
+	}
+
+	lbMetadata, ok := response.ReqMetadata[metadata.DestinationEndpointNamespace].(map[string]any)
+	if !ok {
+		logger.V(logging.DEBUG).Info("Response does not contain envoy lb metadata, skipping verification")
+		response.Headers[test.ConformanceTestResultHeader] = "fail: missing envoy lb metadata"
+		return
+	}
+
+	actualEndpoint, ok := lbMetadata[metadata.DestinationEndpointServedKey].(string)
+	if !ok {
+		logger.V(logging.DEBUG).Info("Response does not contain destination endpoint served metadata, skipping verification")
+		response.Headers[test.ConformanceTestResultHeader] = "fail: missing destination endpoint served metadata"
+		return
+	}
+	response.Headers[test.ConformanceTestResultHeader] = actualEndpoint
+}
diff --git a/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier_test.go b/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier_test.go
new file mode 100644
index 0000000000..b5e6c9428c
--- /dev/null
+++ b/pkg/epp/requestcontrol/plugins/test/responsereceived/destination_endpoint_served_verifier_test.go
@@ -0,0 +1,88 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package responsereceived
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
+)
+
+// TestDestinationEndpointServedVerifier_ResponseReceived checks the value the
+// verifier writes to the conformance result header for each metadata shape.
+func TestDestinationEndpointServedVerifier_ResponseReceived(t *testing.T) {
+	cases := []struct {
+		desc       string
+		resp       *requestcontrol.Response
+		wantHeader string
+	}{
+		{
+			desc:       "success - endpoint is correctly reported",
+			wantHeader: "10.0.0.1:8080",
+			resp: &requestcontrol.Response{
+				Headers: make(map[string]string),
+				ReqMetadata: map[string]any{
+					metadata.DestinationEndpointNamespace: map[string]any{
+						metadata.DestinationEndpointServedKey: "10.0.0.1:8080",
+					},
+				},
+			},
+		},
+		{
+			desc:       "failure - missing lb metadata",
+			wantHeader: "fail: missing envoy lb metadata",
+			resp: &requestcontrol.Response{
+				Headers:     make(map[string]string),
+				ReqMetadata: map[string]any{},
+			},
+		},
+		{
+			desc:       "failure - missing served endpoint key",
+			wantHeader: "fail: missing destination endpoint served metadata",
+			resp: &requestcontrol.Response{
+				Headers: make(map[string]string),
+				ReqMetadata: map[string]any{
+					metadata.DestinationEndpointNamespace: map[string]any{
+						"some-other-key": "some-value",
+					},
+				},
+			},
+		},
+		{
+			desc:       "failure - nil metadata",
+			wantHeader: "fail: missing envoy lb metadata",
+			resp: &requestcontrol.Response{
+				Headers:     make(map[string]string),
+				ReqMetadata: nil,
+			},
+		},
+	}
+
+	for _, tt := range cases {
+		t.Run(tt.desc, func(t *testing.T) {
+			NewDestinationEndpointServedVerifier().WithName("test-verifier").ResponseReceived(context.Background(), nil, tt.resp, nil)
+
+			got, found := tt.resp.Headers[test.ConformanceTestResultHeader]
+			require.True(t, found, "Expected header %s to be set", test.ConformanceTestResultHeader)
+			require.Equal(t, tt.wantHeader, got)
+		})
+	}
+}
diff --git a/pkg/epp/requestcontrol/types.go b/pkg/epp/requestcontrol/types.go
index c881ed7138..7a6725678e 100644
--- a/pkg/epp/requestcontrol/types.go
+++ b/pkg/epp/requestcontrol/types.go
@@ -28,4 +28,8 @@ type Response struct {
 	IsStreaming bool
 	// EndOfStream when true indicates that this invocation contains the last chunk of the response
 	EndOfStream bool
+	// ReqMetadata is a map of metadata that can be passed from Envoy.
+	// It is populated with Envoy's dynamic metadata when ext_proc is processing ProcessingRequest_ResponseHeaders.
+	// Currently, this is only used by conformance test.
+	ReqMetadata map[string]any
 }