Skip to content

Commit 6b82b89

Browse files
authored
Add subsetting logic for epp (#981)
1 parent 9111ef2 commit 6b82b89

File tree

12 files changed

+494
-23
lines changed

12 files changed

+494
-23
lines changed

cmd/epp/runner/runner.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4747
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4848
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
4950
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
5051
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
5152
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
@@ -304,6 +305,7 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
304305
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
305306

306307
schedulerProfile := framework.NewSchedulerProfile().
308+
WithFilters(filter.NewSubsetFilter()).
307309
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
308310
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
309311
WithPicker(picker.NewMaxScorePicker())

pkg/epp/handlers/server.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ type RequestContext struct {
111111
}
112112

113113
type Request struct {
114-
Headers map[string]string
115-
Body map[string]interface{}
114+
Headers map[string]string
115+
Body map[string]interface{}
116+
Metadata map[string]any
116117
}
117118
type Response struct {
118119
Headers map[string]string
@@ -141,8 +142,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
141142
reqCtx := &RequestContext{
142143
RequestState: RequestReceived,
143144
Request: &Request{
144-
Headers: make(map[string]string),
145-
Body: make(map[string]interface{}),
145+
Headers: make(map[string]string),
146+
Body: make(map[string]interface{}),
147+
Metadata: make(map[string]any),
146148
},
147149
Response: &Response{
148150
Headers: make(map[string]string),
@@ -185,6 +187,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
185187
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
186188
}
187189

190+
reqCtx.Request.Metadata = requtil.ExtractMetadataValues(req)
191+
188192
switch v := req.Request.(type) {
189193
case *extProcPb.ProcessingRequest_RequestHeaders:
190194
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {

pkg/epp/requestcontrol/director.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,15 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
118118
}
119119

120120
// Prepare LLMRequest (needed for both saturation detection and Scheduler)
121-
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
122-
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
123-
TargetModel: reqCtx.ResolvedTargetModel,
124-
Prompt: prompt,
125-
Headers: reqCtx.Request.Headers,
126-
}
121+
reqCtx.SchedulingRequest = schedulingtypes.NewLLMRequest(
122+
reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
123+
reqCtx.ResolvedTargetModel,
124+
prompt,
125+
reqCtx.Request.Headers,
126+
reqCtx.Request.Metadata)
127127

128128
logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)
129+
129130
ctx = log.IntoContext(ctx, logger)
130131
logger.V(logutil.DEBUG).Info("LLM request assembled")
131132

pkg/epp/scheduling/framework/plugins/filter/filter_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,142 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
251251
actualAvailablePercent, availableLowerBound, availableUpperBound)
252252
}
253253
}
254+
255+
func TestSubsettingFilter(t *testing.T) {
256+
var makeFilterMetadata = func(data []interface{}) map[string]any {
257+
return map[string]any{
258+
"envoy.lb.subset_hint": map[string]any{
259+
"x-gateway-destination-endpoint-subset": data,
260+
},
261+
}
262+
}
263+
264+
tests := []struct {
265+
name string
266+
metadata map[string]any
267+
filter framework.Filter
268+
input []types.Pod
269+
output []types.Pod
270+
}{
271+
{
272+
name: "SubsetFilter, filter not present — return all pods",
273+
filter: &SubsetFilter{},
274+
metadata: map[string]any{},
275+
input: []types.Pod{
276+
&types.PodMetrics{
277+
Pod: &backend.Pod{Address: "10.0.0.1"},
278+
},
279+
&types.PodMetrics{
280+
Pod: &backend.Pod{Address: "10.0.0.2"},
281+
},
282+
},
283+
output: []types.Pod{
284+
&types.PodMetrics{
285+
Pod: &backend.Pod{Address: "10.0.0.1"},
286+
},
287+
&types.PodMetrics{
288+
Pod: &backend.Pod{Address: "10.0.0.2"},
289+
},
290+
},
291+
},
292+
{
293+
name: "SubsetFilter, namespace present filter not present — return all pods",
294+
filter: &SubsetFilter{},
295+
metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
296+
input: []types.Pod{
297+
&types.PodMetrics{
298+
Pod: &backend.Pod{Address: "10.0.0.1"},
299+
},
300+
&types.PodMetrics{
301+
Pod: &backend.Pod{Address: "10.0.0.2"},
302+
},
303+
},
304+
output: []types.Pod{
305+
&types.PodMetrics{
306+
Pod: &backend.Pod{Address: "10.0.0.1"},
307+
},
308+
&types.PodMetrics{
309+
Pod: &backend.Pod{Address: "10.0.0.2"},
310+
},
311+
},
312+
},
313+
{
314+
name: "SubsetFilter, filter present with empty list — return no pods",
315+
filter: &SubsetFilter{},
316+
metadata: makeFilterMetadata([]interface{}{}),
317+
input: []types.Pod{
318+
&types.PodMetrics{
319+
Pod: &backend.Pod{Address: "10.0.0.1"},
320+
},
321+
&types.PodMetrics{
322+
Pod: &backend.Pod{Address: "10.0.0.2"},
323+
},
324+
},
325+
output: []types.Pod{},
326+
},
327+
{
328+
name: "SubsetFilter, subset with one matching pod",
329+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1"}),
330+
filter: &SubsetFilter{},
331+
input: []types.Pod{
332+
&types.PodMetrics{
333+
Pod: &backend.Pod{Address: "10.0.0.1"},
334+
},
335+
&types.PodMetrics{
336+
Pod: &backend.Pod{Address: "10.0.0.2"},
337+
},
338+
},
339+
output: []types.Pod{
340+
&types.PodMetrics{
341+
Pod: &backend.Pod{Address: "10.0.0.1"},
342+
},
343+
},
344+
},
345+
{
346+
name: "SubsetFilter, subset with multiple matching pods",
347+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
348+
filter: &SubsetFilter{},
349+
input: []types.Pod{
350+
&types.PodMetrics{
351+
Pod: &backend.Pod{Address: "10.0.0.1"},
352+
},
353+
&types.PodMetrics{
354+
Pod: &backend.Pod{Address: "10.0.0.2"},
355+
},
356+
},
357+
output: []types.Pod{
358+
&types.PodMetrics{
359+
Pod: &backend.Pod{Address: "10.0.0.1"},
360+
},
361+
&types.PodMetrics{
362+
Pod: &backend.Pod{Address: "10.0.0.2"},
363+
},
364+
},
365+
},
366+
{
367+
name: "SubsetFilter, subset with no matching pods",
368+
metadata: makeFilterMetadata([]interface{}{"10.0.0.3"}),
369+
filter: &SubsetFilter{},
370+
input: []types.Pod{
371+
&types.PodMetrics{
372+
Pod: &backend.Pod{Address: "10.0.0.1"},
373+
},
374+
&types.PodMetrics{
375+
Pod: &backend.Pod{Address: "10.0.0.2"},
376+
},
377+
},
378+
output: []types.Pod{},
379+
},
380+
}
381+
382+
for _, test := range tests {
383+
t.Run(test.name, func(t *testing.T) {
384+
req := types.NewLLMRequest(uuid.NewString(), "", "", nil, test.metadata)
385+
got := test.filter.Filter(context.Background(), types.NewCycleState(), req, test.input)
386+
387+
if diff := cmp.Diff(test.output, got); diff != "" {
388+
t.Errorf("Unexpected output (-want +got): %v", diff)
389+
}
390+
})
391+
}
392+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
25+
)
26+
27+
const (
28+
SubsetFilterType = "subset"
29+
30+
subsetHintKey = "x-gateway-destination-endpoint-subset"
31+
subsetHintNamespace = "envoy.lb.subset_hint"
32+
)
33+
34+
// compile-time type assertion
35+
var _ framework.Filter = &SubsetFilter{}
36+
37+
// NewSubsetFilter initializes a new SubsetFilter.
38+
func NewSubsetFilter() *SubsetFilter {
39+
return &SubsetFilter{}
40+
}
41+
42+
// SubsetFilter filters Pods based on the subset hint provided by the proxy via filterMetadata.
43+
type SubsetFilter struct{}
44+
45+
// Name returns the name of the filter.
46+
func (f *SubsetFilter) Name() string {
47+
return "subset-hint"
48+
}
49+
50+
// Type returns the type of the filter.
51+
func (f *SubsetFilter) Type() string {
52+
return SubsetFilterType
53+
}
54+
55+
// Filter filters out pods that are not in the subset provided in filterMetadata.
56+
func (f *SubsetFilter) Filter(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) []types.Pod {
57+
// Check if subset namespace key is present in the metadata map
58+
subsetMap, found := request.GetMetadata()[subsetHintNamespace].(map[string]any)
59+
if !found {
60+
return pods
61+
}
62+
63+
// Check if endpoint key is present in the subset map and ensure there is at least one value
64+
endpointSubsetList, found := subsetMap[subsetHintKey].([]interface{})
65+
if !found {
66+
return pods
67+
} else if len(endpointSubsetList) == 0 {
68+
return []types.Pod{}
69+
}
70+
71+
// Create a map of endpoint addrs for easy lookup
72+
endpoints := make(map[string]bool)
73+
for _, endpoint := range endpointSubsetList {
74+
// Extract address from endpoint
75+
// The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080")
76+
epStr := strings.Split(endpoint.(string), ":")[0]
77+
endpoints[epStr] = true
78+
}
79+
80+
// Filter based on address
81+
filteredPods := []types.Pod{}
82+
for _, pod := range pods {
83+
if _, found := endpoints[pod.GetPod().Address]; found {
84+
filteredPods = append(filteredPods, pod)
85+
}
86+
}
87+
88+
return filteredPods
89+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ func NewScheduler() *Scheduler {
4444
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
4545
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
4646
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
47+
endpointSubsetFilter := filter.NewSubsetFilter()
4748
leastQueueFilter := filter.NewLeastQueueFilter()
4849
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
4950

@@ -70,7 +71,7 @@ func NewScheduler() *Scheduler {
7071
}
7172

7273
defaultProfile := framework.NewSchedulerProfile().
73-
WithFilters(lowLatencyFilter).
74+
WithFilters(endpointSubsetFilter, lowLatencyFilter).
7475
WithPicker(&picker.RandomPicker{})
7576

7677
profileHandler := profile.NewSingleProfileHandler()

pkg/epp/scheduling/types/types.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,29 @@ type LLMRequest struct {
3333
Prompt string
3434
// Headers is a map of the request headers.
3535
Headers map[string]string
36+
37+
// metadata is a map of metadata in the request
38+
metadata map[string]any
39+
}
40+
41+
func NewLLMRequest(reqID, targetModel, prompt string, headers map[string]string, metadata map[string]any) *LLMRequest {
42+
return &LLMRequest{
43+
RequestId: reqID,
44+
TargetModel: targetModel,
45+
Prompt: prompt,
46+
Headers: headers,
47+
metadata: metadata,
48+
}
3649
}
3750

3851
func (r *LLMRequest) String() string {
3952
return fmt.Sprintf("RequestID: %s, TargetModel: %s, PromptLength: %d, Headers: %v", r.RequestId, r.TargetModel, len(r.Prompt), r.Headers)
4053
}
4154

55+
func (r *LLMRequest) GetMetadata() map[string]any {
56+
return r.metadata
57+
}
58+
4259
type Pod interface {
4360
GetPod() *backend.Pod
4461
GetMetrics() *backendmetrics.MetricsState

pkg/epp/util/request/metadata.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package request
18+
19+
import (
20+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
21+
)
22+
23+
func ExtractMetadataValues(req *extProcPb.ProcessingRequest) map[string]any {
24+
metadata := make(map[string]any)
25+
if req != nil && req.MetadataContext != nil && req.MetadataContext.FilterMetadata != nil {
26+
for key, val := range req.MetadataContext.FilterMetadata {
27+
metadata[key] = val.AsMap()
28+
}
29+
}
30+
return metadata
31+
}

0 commit comments

Comments
 (0)