Skip to content

Commit 2ff46b5

Browse files
authored
feat(Conformance): Add a header based filter to make a controllable epp behavior determined by request header. (#922)
* Add inital head based filter for testing purpose. * Move conformance scheduler to conformance folder. * Rephrase comment. * Rename folder. * Delete the header based filter under epp. * Fix header. * Add path to Dockerfile.
1 parent 8b4f852 commit 2ff46b5

File tree

6 files changed

+320
-2
lines changed

6 files changed

+320
-2
lines changed

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ RUN go mod download
1919
# Sources
2020
COPY cmd/epp ./cmd
2121
COPY pkg/epp ./pkg/epp
22+
COPY conformance/testing-epp ./conformance/testing-epp
2223
COPY internal ./internal
2324
COPY api ./api
2425
WORKDIR /src/cmd

cmd/epp/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
3636
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
3737

38+
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
3839
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
3940
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4041
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -111,8 +112,9 @@ var (
111112
setupLog = ctrl.Log.WithName("setup")
112113

113114
// Environment variables
114-
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
115-
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
115+
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
116+
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
117+
reqHeaderBasedSchedulerForTesting = envutil.GetEnvBool("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING", false, setupLog)
116118
)
117119

118120
func loadPrefixCacheConfig() prefix.Config {
@@ -224,6 +226,10 @@ func run() error {
224226
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
225227
}
226228

229+
if reqHeaderBasedSchedulerForTesting {
230+
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
231+
}
232+
227233
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
228234

229235
director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/google/go-cmp/cmp"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
27+
)
28+
29+
func TestFilter(t *testing.T) {
30+
tests := []struct {
31+
name string
32+
req *types.LLMRequest
33+
filter framework.Filter
34+
input []types.Pod
35+
output []types.Pod
36+
}{
37+
{
38+
name: "TestHeaderBasedFilter, header endpoint unset in request",
39+
req: &types.LLMRequest{}, // Delieverately unset the header.
40+
filter: &HeaderBasedTestingFilter{},
41+
input: []types.Pod{
42+
&types.PodMetrics{
43+
Pod: &backend.Pod{
44+
Address: "test-endpoint",
45+
},
46+
},
47+
},
48+
output: []types.Pod{},
49+
},
50+
{
51+
name: "TestHeaderBasedFilter, header endpoint set in request but no match",
52+
req: &types.LLMRequest{Headers: map[string]string{headerTestEppEndPointSelectionKey: "test-endpoint"}},
53+
filter: &HeaderBasedTestingFilter{},
54+
input: []types.Pod{
55+
&types.PodMetrics{
56+
Pod: &backend.Pod{
57+
Address: "test-endpoint-unmatch",
58+
},
59+
},
60+
},
61+
output: []types.Pod{},
62+
},
63+
{
64+
name: "TestHeaderBasedFilter, header endpoint set",
65+
req: &types.LLMRequest{Headers: map[string]string{headerTestEppEndPointSelectionKey: "test-endpoint"}},
66+
filter: &HeaderBasedTestingFilter{},
67+
input: []types.Pod{
68+
&types.PodMetrics{
69+
Pod: &backend.Pod{
70+
Address: "test-endpoint",
71+
},
72+
},
73+
},
74+
output: []types.Pod{
75+
&types.PodMetrics{
76+
Pod: &backend.Pod{
77+
Address: "test-endpoint",
78+
},
79+
},
80+
},
81+
},
82+
}
83+
84+
for _, test := range tests {
85+
t.Run(test.name, func(t *testing.T) {
86+
got := test.filter.Filter(context.Background(), test.req, types.NewCycleState(), test.input)
87+
88+
if diff := cmp.Diff(test.output, got); diff != "" {
89+
t.Errorf("Unexpected output (-want +got): %v", diff)
90+
}
91+
})
92+
}
93+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
"context"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
24+
)
25+
26+
const (
27+
headerTestEppEndPointSelectionKey = "test-epp-endpoint-selection"
28+
)
29+
30+
// compile-time type assertion
31+
var _ framework.Filter = &HeaderBasedTestingFilter{}
32+
33+
// NewHeaderBasedTestingFilter initializes a new HeaderBasedTestingFilter and returns its pointer.
34+
// This should be only used in testing purpose.
35+
func NewHeaderBasedTestingFilter() *HeaderBasedTestingFilter {
36+
return &HeaderBasedTestingFilter{}
37+
}
38+
39+
// HeaderBasedTestingFilter filters Pods based on an address specified in the "test-epp-endpoint-selection" request header.
40+
type HeaderBasedTestingFilter struct{}
41+
42+
// Name returns the name of the filter.
43+
func (f *HeaderBasedTestingFilter) Name() string {
44+
return "test-header-based"
45+
}
46+
47+
// Filter filters out pods that doesn't meet the filter criteria.
48+
func (f *HeaderBasedTestingFilter) Filter(_ context.Context, request *types.LLMRequest, _ *types.CycleState, pods []types.Pod) []types.Pod {
49+
filteredPods := []types.Pod{}
50+
51+
endPointInReqeust, found := request.Headers[headerTestEppEndPointSelectionKey]
52+
if !found {
53+
return filteredPods
54+
}
55+
56+
for _, pod := range pods {
57+
if pod.GetPod().Address == endPointInReqeust {
58+
filteredPods = append(filteredPods, pod)
59+
}
60+
}
61+
return filteredPods
62+
}

conformance/testing-epp/scheduler.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduling
18+
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp/plugins/filter"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
24+
profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
25+
)
26+
27+
// NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
28+
// an endpoint based on the "test-epp-endpoint-selection" request header. If the
29+
// header is missing or the specified endpoint doesn't exist, no endpoint is returned.
30+
func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
31+
predicatableSchedulerProfile := framework.NewSchedulerProfile().WithFilters(filter.NewHeaderBasedTestingFilter()).WithPicker(picker.NewMaxScorePicker())
32+
return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig(
33+
profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
34+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduling
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
"github.com/google/go-cmp/cmp"
24+
"github.com/google/uuid"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
26+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
28+
)
29+
30+
// Tests the scheduler for conformance tests.
31+
func TestSchedule(t *testing.T) {
32+
tests := []struct {
33+
name string
34+
input []*backendmetrics.FakePodMetrics
35+
req *types.LLMRequest
36+
wantRes map[string]*types.Result
37+
err bool
38+
}{
39+
{
40+
name: "no pods in datastore and req header is set",
41+
req: &types.LLMRequest{
42+
Headers: map[string]string{"test-epp-endpoint-selection": "random-endpoint"},
43+
RequestId: uuid.NewString(),
44+
},
45+
wantRes: nil,
46+
err: true,
47+
},
48+
{
49+
name: "req header not set",
50+
input: []*backendmetrics.FakePodMetrics{
51+
{Pod: &backend.Pod{Address: "random-endpoint"}},
52+
},
53+
req: &types.LLMRequest{
54+
Headers: map[string]string{}, // Deliberately set an empty header.
55+
RequestId: uuid.NewString(),
56+
},
57+
wantRes: nil,
58+
err: true,
59+
},
60+
{
61+
name: "no pods address in datastore matches req header address",
62+
input: []*backendmetrics.FakePodMetrics{
63+
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
64+
},
65+
req: &types.LLMRequest{
66+
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
67+
RequestId: uuid.NewString(),
68+
},
69+
wantRes: nil,
70+
err: true,
71+
},
72+
{
73+
name: "one pod address in datastore matches req header address",
74+
input: []*backendmetrics.FakePodMetrics{
75+
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
76+
{Pod: &backend.Pod{Address: "matched-endpoint"}},
77+
},
78+
req: &types.LLMRequest{
79+
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
80+
RequestId: uuid.NewString(),
81+
},
82+
wantRes: map[string]*types.Result{
83+
"req-header-based-profile": {
84+
TargetPod: &types.ScoredPod{
85+
Pod: &types.PodMetrics{
86+
Pod: &backend.Pod{
87+
Address: "matched-endpoint",
88+
Labels: map[string]string{},
89+
},
90+
},
91+
},
92+
},
93+
},
94+
},
95+
}
96+
97+
for _, test := range tests {
98+
t.Run(test.name, func(t *testing.T) {
99+
scheduler := NewReqHeaderBasedScheduler(&fakeDataStore{pods: test.input})
100+
got, err := scheduler.Schedule(context.Background(), test.req)
101+
if test.err != (err != nil) {
102+
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
103+
}
104+
105+
if diff := cmp.Diff(test.wantRes, got); diff != "" {
106+
t.Errorf("Unexpected output (-want +got): %v", diff)
107+
}
108+
})
109+
}
110+
}
111+
112+
type fakeDataStore struct {
113+
pods []*backendmetrics.FakePodMetrics
114+
}
115+
116+
func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
117+
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
118+
for _, pod := range fds.pods {
119+
pm = append(pm, pod)
120+
}
121+
return pm
122+
}

0 commit comments

Comments
 (0)