Skip to content

Commit 875360c

Browse files
committed
feat(conformance): add plugin to support verifying destination endpoint served metadata.
1 parent 26384f9 commit 875360c

File tree

7 files changed

+198
-3
lines changed

7 files changed

+198
-3
lines changed

cmd/epp/runner/runner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ import (
6565
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
6666
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
6767
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
68+
testrequestcontrol "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
6869
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
6970
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
7071
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -408,6 +409,7 @@ func (r *Runner) registerInTreePlugins() {
408409
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
409410
// register filter for test purpose only (used in conformance tests)
410411
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
412+
plugins.Register(testrequestcontrol.DestinationEndpointServedVerifierType, testrequestcontrol.DestinationEndpointServedVerifierFactory)
411413
}
412414

413415
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {

pkg/epp/metadata/consts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
DestinationEndpointNamespace = "envoy.lb"
2727
// DestinationEndpointKey is the header and response metadata key used by Envoy to route to the appropriate pod.
2828
DestinationEndpointKey = "x-gateway-destination-endpoint"
29+
// DestinationEndpointServedKey is the metadata key used by Envoy to specify the endpoint that served the request.
30+
DestinationEndpointServedKey = "x-gateway-destination-endpoint-served"
2931
// FlowFairnessIDKey is the header key used to pass the fairness ID to be used in Flow Control.
3032
FlowFairnessIDKey = "x-gateway-inference-fairness-id"
3133
// ObjectiveKey is the header key used to specify the objective of an incoming request.

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,10 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
253253
// HandleResponseReceived is called when the response headers are received.
254254
func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
255255
response := &Response{
256-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
257-
Headers: reqCtx.Response.Headers,
256+
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
257+
Headers: reqCtx.Response.Headers,
258+
ReqMetadata: reqCtx.Request.Metadata,
258259
}
259-
260260
// TODO: to extend fallback functionality, handle cases where target pod is unavailable
261261
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
262262
d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)

pkg/epp/requestcontrol/plugins.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ type PreRequest interface {
3838
PreRequest(ctx context.Context, request *types.LLMRequest, schedulingResult *types.SchedulingResult)
3939
}
4040

41+
type ResponseResult struct {
42+
HeaderToAdd map[string]string
43+
}
44+
4145
// ResponseReceived is called by the director after the response headers are successfully received
4246
// which indicates the beginning of the response handling by the model server.
4347
// The given pod argument is the pod that served the request.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package test
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/log"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
29+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
30+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
31+
)
32+
33+
const (
34+
// ConformanceTestResultHeader is the header key for the conformance test result.
35+
ConformanceTestResultHeader = "x-conformance-test-served-endpoint"
36+
// ExpectedServedEndpointHeader is the header key for the expected served endpoint.
37+
ExpectedServedEndpointHeader = "x-test-expected-served-endpoint"
38+
// DestinationEndpointServedVerifierType is the ResponseReceived type that is used in plugins registry.
39+
DestinationEndpointServedVerifierType = "destination-endpoint-server-verifier"
40+
)
41+
42+
var _ requestcontrol.ResponseReceived = &DestinationEndpointServedVerifier{}
43+
44+
// DestinationEndpointServedVerifier is a test-only plugin for conformance tests.
45+
// It verifies that the request was served by the expected endpoint.
46+
// It works by reading Envoy's dynamic metadata, which is passed in the
47+
// Response.ReqMetadata field. This metadata should contain the
48+
// address of the backend endpoint that served the request. The verifier then
49+
// writes this address to the "x-conformance-test-served-endpoint" response header.
50+
// The conformance test client can then validate this header to ensure the request
51+
// was routed correctly.
52+
type DestinationEndpointServedVerifier struct {
53+
typedName plugins.TypedName
54+
}
55+
56+
// TypedName returns the type and name tuple of this plugin instance.
57+
func (f *DestinationEndpointServedVerifier) TypedName() plugins.TypedName {
58+
return f.typedName
59+
}
60+
61+
// WithName sets the name of the filter.
62+
func (f *DestinationEndpointServedVerifier) WithName(name string) *DestinationEndpointServedVerifier {
63+
f.typedName.Name = name
64+
return f
65+
}
66+
67+
// DestinationEndpointServedVerifierFactory defines the factory function for DestinationEndpointServedVerifier.
68+
func DestinationEndpointServedVerifierFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
69+
return NewDestinationEndpointServedVerifier().WithName(name), nil
70+
}
71+
72+
func NewDestinationEndpointServedVerifier() *DestinationEndpointServedVerifier {
73+
return &DestinationEndpointServedVerifier{}
74+
}
75+
76+
// ResponseReceived is the handler for the ResponseReceived extension point.
77+
func (p *DestinationEndpointServedVerifier) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, _ *backend.Pod) {
78+
logger := log.FromContext(ctx).WithName(p.TypedName().String())
79+
logger.V(logging.DEBUG).Info("Verifying destination endpoint served")
80+
81+
reqMetadata := response.ReqMetadata
82+
lbMetadata, ok := reqMetadata[metadata.DestinationEndpointNamespace].(map[string]any)
83+
if !ok {
84+
logger.V(logging.DEBUG).Info("Response does not contain envoy lb metadata, skipping verification")
85+
response.Headers[ConformanceTestResultHeader] = "fail: missing envoy lb metadata"
86+
return
87+
}
88+
89+
actualEndpoint, ok := lbMetadata[metadata.DestinationEndpointServedKey].(string)
90+
if !ok {
91+
logger.V(logging.DEBUG).Info("Response does not contain destination endpoint served metadata, skipping verification")
92+
response.Headers[ConformanceTestResultHeader] = "fail: missing destination endpoint served metadata"
93+
return
94+
}
95+
response.Headers[ConformanceTestResultHeader] = actualEndpoint
96+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package test
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
26+
)
27+
28+
func TestDestinationEndpointServedVerifier_ResponseReceived(t *testing.T) {
29+
testCases := []struct {
30+
name string
31+
response *requestcontrol.Response
32+
expectedHeaderValue string
33+
}{
34+
{
35+
name: "success - endpoint is correctly reported",
36+
response: &requestcontrol.Response{
37+
Headers: make(map[string]string),
38+
ReqMetadata: map[string]any{
39+
metadata.DestinationEndpointNamespace: map[string]any{
40+
metadata.DestinationEndpointServedKey: "10.0.0.1:8080",
41+
},
42+
},
43+
},
44+
expectedHeaderValue: "10.0.0.1:8080",
45+
},
46+
{
47+
name: "failure - missing lb metadata",
48+
response: &requestcontrol.Response{
49+
Headers: make(map[string]string),
50+
ReqMetadata: map[string]any{},
51+
},
52+
expectedHeaderValue: "fail: missing envoy lb metadata",
53+
},
54+
{
55+
name: "failure - missing served endpoint key",
56+
response: &requestcontrol.Response{
57+
Headers: make(map[string]string),
58+
ReqMetadata: map[string]any{
59+
metadata.DestinationEndpointNamespace: map[string]any{
60+
"some-other-key": "some-value",
61+
},
62+
},
63+
},
64+
expectedHeaderValue: "fail: missing destination endpoint served metadata",
65+
},
66+
{
67+
name: "failure - nil metadata",
68+
response: &requestcontrol.Response{
69+
Headers: make(map[string]string),
70+
ReqMetadata: nil,
71+
},
72+
expectedHeaderValue: "fail: missing envoy lb metadata",
73+
},
74+
}
75+
76+
for _, tc := range testCases {
77+
t.Run(tc.name, func(t *testing.T) {
78+
plugin := NewDestinationEndpointServedVerifier().WithName("test-verifier")
79+
80+
plugin.ResponseReceived(context.Background(), nil, tc.response, nil)
81+
82+
actualHeader, ok := tc.response.Headers[ConformanceTestResultHeader]
83+
require.True(t, ok, "Expected header %s to be set", ConformanceTestResultHeader)
84+
require.Equal(t, tc.expectedHeaderValue, actualHeader)
85+
})
86+
}
87+
}

pkg/epp/requestcontrol/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ type Response struct {
2828
IsStreaming bool
2929
// EndOfStream when true indicates that this invocation contains the last chunk of the response
3030
EndOfStream bool
31+
// ReqMetadata is a map of metadata that can be passed from Envoy.
32+
// It is populated with Envoy's dynamic metadata when ext_proc is processing ProcessingRequest_ResponseHeaders.
33+
// Currently, this is only used by conformance test.
34+
ReqMetadata map[string]any
3135
}

0 commit comments

Comments
 (0)