Skip to content

Commit c7487f2

Browse files
committed
feat(conformance): add plugin to support verifying destination endpoint served metadata.
1 parent 7fae938 commit c7487f2

File tree

7 files changed

+215
-3
lines changed

7 files changed

+215
-3
lines changed

cmd/epp/runner/runner.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
5959
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
6060
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
61+
testresponsereceived "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test/responsereceived"
6162
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
6263
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
6364
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
@@ -408,6 +409,8 @@ func (r *Runner) registerInTreePlugins() {
408409
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
409410
// register filter for test purpose only (used in conformance tests)
410411
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
412+
// register response received plugin for test purpose only (used in conformance tests)
413+
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
411414
}
412415

413416
func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {

pkg/epp/metadata/consts.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ const (
2626
DestinationEndpointNamespace = "envoy.lb"
2727
// DestinationEndpointKey is the header and response metadata key used by Envoy to route to the appropriate pod.
2828
DestinationEndpointKey = "x-gateway-destination-endpoint"
29+
// DestinationEndpointServedKey is the metadata key used by Envoy to specify the endpoint that served the request.
30+
DestinationEndpointServedKey = "x-gateway-destination-endpoint-served"
2931
// FlowFairnessIDKey is the header key used to pass the fairness ID to be used in Flow Control.
3032
FlowFairnessIDKey = "x-gateway-inference-fairness-id"
3133
// ObjectiveKey is the header key used to specify the objective of an incoming request.

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,10 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
253253
// HandleResponseReceived is called when the response headers are received.
254254
func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
255255
response := &Response{
256-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
257-
Headers: reqCtx.Response.Headers,
256+
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
257+
Headers: reqCtx.Response.Headers,
258+
ReqMetadata: reqCtx.Request.Metadata,
258259
}
259-
260260
// TODO: to extend fallback functionality, handle cases where target pod is unavailable
261261
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
262262
d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package test
18+
19+
const (
20+
// ConformanceTestResultHeader is the header key for the conformance test result.
21+
ConformanceTestResultHeader = "x-conformance-test-served-endpoint"
22+
)
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package responsereceived
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/log"
24+
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
28+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
30+
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
32+
)
33+
34+
const (
35+
// DestinationEndpointServedVerifierType is the ResponseReceived type that is used in plugins registry.
36+
DestinationEndpointServedVerifierType = "destination-endpoint-served-verifier"
37+
)
38+
39+
var _ requestcontrol.ResponseReceived = &DestinationEndpointServedVerifier{}
40+
41+
// DestinationEndpointServedVerifier is a test-only plugin for conformance tests.
42+
// It verifies that the request was served by the expected endpoint.
43+
// It works by reading Envoy's dynamic metadata, which is passed in the
44+
// Response.ReqMetadata field. This metadata should contain the
45+
// address of the backend endpoint that served the request. The verifier then
46+
// writes this address to the "x-conformance-test-served-endpoint" response header.
47+
// The conformance test client can then validate this header to ensure the request
48+
// was routed correctly.
49+
type DestinationEndpointServedVerifier struct {
50+
typedName plugins.TypedName
51+
}
52+
53+
// TypedName returns the type and name tuple of this plugin instance.
54+
func (f *DestinationEndpointServedVerifier) TypedName() plugins.TypedName {
55+
return f.typedName
56+
}
57+
58+
// WithName sets the name of the filter.
59+
func (f *DestinationEndpointServedVerifier) WithName(name string) *DestinationEndpointServedVerifier {
60+
f.typedName.Name = name
61+
return f
62+
}
63+
64+
// DestinationEndpointServedVerifierFactory defines the factory function for DestinationEndpointServedVerifier.
65+
func DestinationEndpointServedVerifierFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
66+
return NewDestinationEndpointServedVerifier().WithName(name), nil
67+
}
68+
69+
func NewDestinationEndpointServedVerifier() *DestinationEndpointServedVerifier {
70+
return &DestinationEndpointServedVerifier{}
71+
}
72+
73+
// ResponseReceived is the handler for the ResponseReceived extension point.
74+
func (p *DestinationEndpointServedVerifier) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, _ *backend.Pod) {
75+
logger := log.FromContext(ctx).WithName(p.TypedName().String())
76+
logger.V(logging.DEBUG).Info("Verifying destination endpoint served")
77+
78+
reqMetadata := response.ReqMetadata
79+
lbMetadata, ok := reqMetadata[metadata.DestinationEndpointNamespace].(map[string]any)
80+
if !ok {
81+
logger.V(logging.DEBUG).Info("Response does not contain envoy lb metadata, skipping verification")
82+
response.Headers[test.ConformanceTestResultHeader] = "fail: missing envoy lb metadata"
83+
return
84+
}
85+
86+
actualEndpoint, ok := lbMetadata[metadata.DestinationEndpointServedKey].(string)
87+
if !ok {
88+
logger.V(logging.DEBUG).Info("Response does not contain destination endpoint served metadata, skipping verification")
89+
response.Headers[test.ConformanceTestResultHeader] = "fail: missing destination endpoint served metadata"
90+
return
91+
}
92+
response.Headers[test.ConformanceTestResultHeader] = actualEndpoint
93+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package responsereceived
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
27+
)
28+
29+
func TestDestinationEndpointServedVerifier_ResponseReceived(t *testing.T) {
30+
testCases := []struct {
31+
name string
32+
response *requestcontrol.Response
33+
expectedHeaderValue string
34+
}{
35+
{
36+
name: "success - endpoint is correctly reported",
37+
response: &requestcontrol.Response{
38+
Headers: make(map[string]string),
39+
ReqMetadata: map[string]any{
40+
metadata.DestinationEndpointNamespace: map[string]any{
41+
metadata.DestinationEndpointServedKey: "10.0.0.1:8080",
42+
},
43+
},
44+
},
45+
expectedHeaderValue: "10.0.0.1:8080",
46+
},
47+
{
48+
name: "failure - missing lb metadata",
49+
response: &requestcontrol.Response{
50+
Headers: make(map[string]string),
51+
ReqMetadata: map[string]any{},
52+
},
53+
expectedHeaderValue: "fail: missing envoy lb metadata",
54+
},
55+
{
56+
name: "failure - missing served endpoint key",
57+
response: &requestcontrol.Response{
58+
Headers: make(map[string]string),
59+
ReqMetadata: map[string]any{
60+
metadata.DestinationEndpointNamespace: map[string]any{
61+
"some-other-key": "some-value",
62+
},
63+
},
64+
},
65+
expectedHeaderValue: "fail: missing destination endpoint served metadata",
66+
},
67+
{
68+
name: "failure - nil metadata",
69+
response: &requestcontrol.Response{
70+
Headers: make(map[string]string),
71+
ReqMetadata: nil,
72+
},
73+
expectedHeaderValue: "fail: missing envoy lb metadata",
74+
},
75+
}
76+
77+
for _, tc := range testCases {
78+
t.Run(tc.name, func(t *testing.T) {
79+
plugin := NewDestinationEndpointServedVerifier().WithName("test-verifier")
80+
81+
plugin.ResponseReceived(context.Background(), nil, tc.response, nil)
82+
83+
actualHeader, ok := tc.response.Headers[test.ConformanceTestResultHeader]
84+
require.True(t, ok, "Expected header %s to be set", test.ConformanceTestResultHeader)
85+
require.Equal(t, tc.expectedHeaderValue, actualHeader)
86+
})
87+
}
88+
}

pkg/epp/requestcontrol/types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,8 @@ type Response struct {
2828
IsStreaming bool
2929
// EndOfStream when true indicates that this invocation contains the last chunk of the response
3030
EndOfStream bool
31+
// ReqMetadata is a map of metadata that can be passed from Envoy.
32+
// It is populated with Envoy's dynamic metadata when ext_proc is processing ProcessingRequest_ResponseHeaders.
33+
// Currently, this is only used by conformance test.
34+
ReqMetadata map[string]any
3135
}

0 commit comments

Comments
 (0)