Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
testresponsereceived "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test/responsereceived"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
Expand Down Expand Up @@ -408,6 +409,8 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(scorer.LoraAffinityScorerType, scorer.LoraAffinityScorerFactory)
// register filter for test purpose only (used in conformance tests)
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
// register response received plugin for test purpose only (used in conformance tests)
plugins.Register(testresponsereceived.DestinationEndpointServedVerifierType, testresponsereceived.DestinationEndpointServedVerifierFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/epp/metadata/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
DestinationEndpointNamespace = "envoy.lb"
// DestinationEndpointKey is the header and response metadata key used by Envoy to route to the appropriate pod.
DestinationEndpointKey = "x-gateway-destination-endpoint"
// DestinationEndpointServedKey is the metadata key used by Envoy to specify the endpoint that served the request.
DestinationEndpointServedKey = "x-gateway-destination-endpoint-served"
// FlowFairnessIDKey is the header key used to pass the fairness ID to be used in Flow Control.
FlowFairnessIDKey = "x-gateway-inference-fairness-id"
// ObjectiveKey is the header key used to specify the objective of an incoming request.
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,10 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
// HandleResponseReceived is called when the response headers are received.
func (d *Director) HandleResponseReceived(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
response := &Response{
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Headers: reqCtx.Response.Headers,
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Headers: reqCtx.Response.Headers,
ReqMetadata: reqCtx.Request.Metadata,
}

// TODO: to extend fallback functionality, handle cases where target pod is unavailable
// https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1224
d.runResponseReceivedPlugins(ctx, reqCtx.SchedulingRequest, response, reqCtx.TargetPod)
Expand Down
22 changes: 22 additions & 0 deletions pkg/epp/requestcontrol/plugins/test/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package test

const (
// ConformanceTestResultHeader is the header key for the conformance test result.
ConformanceTestResultHeader = "x-conformance-test-served-endpoint"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package responsereceived

import (
"context"
"encoding/json"

"sigs.k8s.io/controller-runtime/pkg/log"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const (
// DestinationEndpointServedVerifierType is the ResponseReceived type that is used in plugins registry.
DestinationEndpointServedVerifierType = "destination-endpoint-served-verifier"
)

var _ requestcontrol.ResponseReceived = &DestinationEndpointServedVerifier{}

// DestinationEndpointServedVerifier is a test-only plugin for conformance tests.
// It verifies that the request was served by the expected endpoint.
// It works by reading Envoy's dynamic metadata, which is passed in the
// Response.ReqMetadata field. This metadata should contain the
// address of the backend endpoint that served the request. The verifier then
// writes this address to the "x-conformance-test-served-endpoint" response header.
// The conformance test client can then validate this header to ensure the request
// was routed correctly.
type DestinationEndpointServedVerifier struct {
typedName plugins.TypedName
}

// TypedName returns the type and name tuple of this plugin instance.
func (f *DestinationEndpointServedVerifier) TypedName() plugins.TypedName {
return f.typedName
}

// WithName sets the name of the filter.
func (f *DestinationEndpointServedVerifier) WithName(name string) *DestinationEndpointServedVerifier {
f.typedName.Name = name
return f
}

// DestinationEndpointServedVerifierFactory defines the factory function for DestinationEndpointServedVerifier.
func DestinationEndpointServedVerifierFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewDestinationEndpointServedVerifier().WithName(name), nil
}

func NewDestinationEndpointServedVerifier() *DestinationEndpointServedVerifier {
return &DestinationEndpointServedVerifier{}
}

// ResponseReceived is the handler for the ResponseReceived extension point.
func (p *DestinationEndpointServedVerifier) ResponseReceived(ctx context.Context, request *schedulingtypes.LLMRequest, response *requestcontrol.Response, _ *backend.Pod) {
logger := log.FromContext(ctx).WithName(p.TypedName().String())
logger.V(logging.DEBUG).Info("Verifying destination endpoint served")

reqMetadata := response.ReqMetadata
lbMetadata, ok := reqMetadata[metadata.DestinationEndpointNamespace].(map[string]any)
if !ok {
logger.V(logging.DEBUG).Info("Response does not contain envoy lb metadata, skipping verification")
response.Headers[test.ConformanceTestResultHeader] = "fail: missing envoy lb metadata"
return
}

actualEndpoint, ok := lbMetadata[metadata.DestinationEndpointServedKey].(string)
if !ok {
logger.V(logging.DEBUG).Info("Response does not contain destination endpoint served metadata, skipping verification")
response.Headers[test.ConformanceTestResultHeader] = "fail: missing destination endpoint served metadata"
return
}
response.Headers[test.ConformanceTestResultHeader] = actualEndpoint
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package responsereceived

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol/plugins/test"
)

func TestDestinationEndpointServedVerifier_ResponseReceived(t *testing.T) {
testCases := []struct {
name string
response *requestcontrol.Response
expectedHeaderValue string
}{
{
name: "success - endpoint is correctly reported",
response: &requestcontrol.Response{
Headers: make(map[string]string),
ReqMetadata: map[string]any{
metadata.DestinationEndpointNamespace: map[string]any{
metadata.DestinationEndpointServedKey: "10.0.0.1:8080",
},
},
},
expectedHeaderValue: "10.0.0.1:8080",
},
{
name: "failure - missing lb metadata",
response: &requestcontrol.Response{
Headers: make(map[string]string),
ReqMetadata: map[string]any{},
},
expectedHeaderValue: "fail: missing envoy lb metadata",
},
{
name: "failure - missing served endpoint key",
response: &requestcontrol.Response{
Headers: make(map[string]string),
ReqMetadata: map[string]any{
metadata.DestinationEndpointNamespace: map[string]any{
"some-other-key": "some-value",
},
},
},
expectedHeaderValue: "fail: missing destination endpoint served metadata",
},
{
name: "failure - nil metadata",
response: &requestcontrol.Response{
Headers: make(map[string]string),
ReqMetadata: nil,
},
expectedHeaderValue: "fail: missing envoy lb metadata",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
plugin := NewDestinationEndpointServedVerifier().WithName("test-verifier")

plugin.ResponseReceived(context.Background(), nil, tc.response, nil)

actualHeader, ok := tc.response.Headers[test.ConformanceTestResultHeader]
require.True(t, ok, "Expected header %s to be set", test.ConformanceTestResultHeader)
require.Equal(t, tc.expectedHeaderValue, actualHeader)
})
}
}
4 changes: 4 additions & 0 deletions pkg/epp/requestcontrol/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ type Response struct {
IsStreaming bool
// EndOfStream when true indicates that this invocation contains the last chunk of the response
EndOfStream bool
// ReqMetadata is a map of metadata that can be passed from Envoy.
// It is populated with Envoy's dynamic metadata when ext_proc is processing ProcessingRequest_ResponseHeaders.
// Currently, this is only used by conformance test.
ReqMetadata map[string]any
}