Skip to content

Commit 02f8613

Browse files
authored
remove protocol specifics from cmd-line flags (#1296)
* remove protocol specifics from cmd-line flags Signed-off-by: Nir Rozenbaum <[email protected]> * address code review comments Signed-off-by: Nir Rozenbaum <[email protected]> * godoc Signed-off-by: Nir Rozenbaum <[email protected]> * rebase + move ObjectiveKey to metadata pkg Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 9792e23 commit 02f8613

File tree

12 files changed

+141
-177
lines changed

12 files changed

+141
-177
lines changed

cmd/epp/runner/runner.go

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,6 @@ var (
109109
"The path to the certificate for secure serving. The certificate and private key files "+
110110
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
111111
"then a self-signed certificate is used.")
112-
// header/metadata flags
113-
destinationEndpointHintKey = flag.String(
114-
"destination-endpoint-hint-key",
115-
runserver.DefaultDestinationEndpointHintKey,
116-
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
117-
destinationEndpointHintMetadataNamespace = flag.String(
118-
"destination-endpoint-hint-metadata-namespace",
119-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
120-
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
121-
"target endpoint. If not set, then an outer namespace struct should not be created.")
122-
fairnessIDHeaderKey = flag.String(
123-
"fairness-id-header-key",
124-
runserver.DefaultFairnessIDHeaderKey,
125-
"The header key used to pass the fairness ID to be used in Flow Control.")
126112
// metric flags
127113
totalQueuedRequestsMetric = flag.String(
128114
"total-queued-requests-metric",
@@ -201,7 +187,6 @@ func bindEnvToFlags() {
201187
"MODEL_SERVER_METRICS_PATH": "model-server-metrics-path",
202188
"MODEL_SERVER_METRICS_SCHEME": "model-server-metrics-scheme",
203189
"MODEL_SERVER_METRICS_HTTPS_INSECURE_SKIP_VERIFY": "model-server-metrics-https-insecure-skip-verify",
204-
"DESTINATION_ENDPOINT_HINT_KEY": "destination-endpoint-hint-key",
205190
"POOL_NAME": "pool-name",
206191
"POOL_NAMESPACE": "pool-namespace",
207192
// durations & bools work too; flag.Set expects the *string* form
@@ -352,20 +337,17 @@ func (r *Runner) Run(ctx context.Context) error {
352337

353338
// --- Setup ExtProc Server Runner ---
354339
serverRunner := &runserver.ExtProcServerRunner{
355-
GrpcPort: *grpcPort,
356-
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
357-
DestinationEndpointHintKey: *destinationEndpointHintKey,
358-
FairnessIDHeaderKey: *fairnessIDHeaderKey,
359-
PoolNamespacedName: poolNamespacedName,
360-
PoolGKNN: poolGKNN,
361-
Datastore: datastore,
362-
SecureServing: *secureServing,
363-
HealthChecking: *healthChecking,
364-
CertPath: *certPath,
365-
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
366-
MetricsStalenessThreshold: *metricsStalenessThreshold,
367-
Director: director,
368-
SaturationDetector: saturationDetector,
340+
GrpcPort: *grpcPort,
341+
PoolNamespacedName: poolNamespacedName,
342+
PoolGKNN: poolGKNN,
343+
Datastore: datastore,
344+
SecureServing: *secureServing,
345+
HealthChecking: *healthChecking,
346+
CertPath: *certPath,
347+
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
348+
MetricsStalenessThreshold: *metricsStalenessThreshold,
349+
Director: director,
350+
SaturationDetector: saturationDetector,
369351
}
370352
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
371353
setupLog.Error(err, "Failed to setup EPP controllers")

pkg/epp/handlers/request.go

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,10 @@ import (
2424
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2525
"google.golang.org/protobuf/types/known/structpb"
2626

27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
2728
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
2829
)
2930

30-
const (
31-
ObjectiveKey = "x-gateway-inference-objective"
32-
)
33-
3431
func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
3532
reqCtx.RequestReceivedTimestamp = time.Now()
3633

@@ -61,13 +58,13 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
6158
reqCtx.Request.Headers[header.Key] = header.Value
6259
}
6360
switch header.Key {
64-
case s.fairnessIDHeaderKey:
61+
case metadata.FlowFairnessIDKey:
6562
reqCtx.FairnessID = reqCtx.Request.Headers[header.Key]
6663
// remove the fairness ID header from the request headers,
6764
// this is not data that should be manipulated or sent to the backend.
6865
// It is only used for flow control.
6966
delete(reqCtx.Request.Headers, header.Key)
70-
case ObjectiveKey:
67+
case metadata.ObjectiveKey:
7168
reqCtx.ObjectiveKey = reqCtx.Request.Headers[header.Key]
7269
// remove the objective header from the request headers,
7370
// this is not data that should be manipulated or sent to the backend.
@@ -117,7 +114,7 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
117114
headers := []*configPb.HeaderValueOption{
118115
{
119116
Header: &configPb.HeaderValue{
120-
Key: s.destinationEndpointHintKey,
117+
Key: metadata.DestinationEndpointKey,
121118
RawValue: []byte(reqCtx.TargetEndpoint),
122119
},
123120
},
@@ -146,27 +143,21 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
146143
}
147144

148145
func (s *StreamingServer) generateMetadata(endpoint string) *structpb.Struct {
149-
targetEndpointValue := &structpb.Struct{
146+
return &structpb.Struct{
150147
Fields: map[string]*structpb.Value{
151-
s.destinationEndpointHintKey: {
152-
Kind: &structpb.Value_StringValue{
153-
StringValue: endpoint,
154-
},
155-
},
156-
},
157-
}
158-
dynamicMetadata := targetEndpointValue
159-
if s.destinationEndpointHintMetadataNamespace != "" {
160-
// If a namespace is defined, wrap the selected endpoint with that.
161-
dynamicMetadata = &structpb.Struct{
162-
Fields: map[string]*structpb.Value{
163-
s.destinationEndpointHintMetadataNamespace: {
164-
Kind: &structpb.Value_StructValue{
165-
StructValue: targetEndpointValue,
148+
metadata.DestinationEndpointNamespace: {
149+
Kind: &structpb.Value_StructValue{
150+
StructValue: &structpb.Struct{
151+
Fields: map[string]*structpb.Value{
152+
metadata.DestinationEndpointKey: {
153+
Kind: &structpb.Value_StringValue{
154+
StringValue: endpoint,
155+
},
156+
},
157+
},
166158
},
167159
},
168160
},
169-
}
161+
},
170162
}
171-
return dynamicMetadata
172163
}

pkg/epp/handlers/request_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@ import (
2121

2222
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2323
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
2425
)
2526

2627
func TestHandleRequestHeaders(t *testing.T) {
2728
t.Parallel()
2829

2930
// Setup a mock server and request context
30-
server := &StreamingServer{
31-
fairnessIDHeaderKey: "test-fairness-id",
32-
}
31+
server := &StreamingServer{}
3332

3433
reqCtx := &RequestContext{
3534
Request: &Request{
@@ -46,7 +45,7 @@ func TestHandleRequestHeaders(t *testing.T) {
4645
Value: "test-value",
4746
},
4847
{
49-
Key: "test-fairness-id",
48+
Key: metadata.FlowFairnessIDKey,
5049
Value: "test-fairness-id-value",
5150
},
5251
},
@@ -63,7 +62,7 @@ func TestHandleRequestHeaders(t *testing.T) {
6362
if reqCtx.FairnessID != "test-fairness-id-value" {
6463
t.Errorf("expected fairness ID to be 'test-fairness-id-value', got %s", reqCtx.FairnessID)
6564
}
66-
if reqCtx.Request.Headers["test-fairness-id"] == "test-fairness-id-value" {
65+
if reqCtx.Request.Headers[metadata.FlowFairnessIDKey] == "test-fairness-id-value" {
6766
t.Errorf("expected fairness ID header to be removed from request headers, but it was not")
6867
}
6968
}

pkg/epp/handlers/server.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,10 @@ const (
4444
bodyByteLimit = 62000
4545
)
4646

47-
func NewStreamingServer(destinationEndpointHintMetadataNamespace, destinationEndpointHintKey, fairnessIDHeaderKey string, datastore Datastore, director Director) *StreamingServer {
47+
func NewStreamingServer(datastore Datastore, director Director) *StreamingServer {
4848
return &StreamingServer{
49-
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
50-
destinationEndpointHintKey: destinationEndpointHintKey,
51-
fairnessIDHeaderKey: fairnessIDHeaderKey,
52-
director: director,
53-
datastore: datastore,
49+
director: director,
50+
datastore: datastore,
5451
}
5552
}
5653

@@ -67,15 +64,8 @@ type Datastore interface {
6764
// Server implements the Envoy external processing server.
6865
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
6966
type StreamingServer struct {
70-
// The key of the header to specify the target pod address. This value needs to match Envoy
71-
// configuration.
72-
destinationEndpointHintKey string
73-
// The key acting as the outer namespace struct in the metadata extproc response to communicate
74-
// back the picked endpoints.
75-
destinationEndpointHintMetadataNamespace string
76-
fairnessIDHeaderKey string
77-
datastore Datastore
78-
director Director
67+
datastore Datastore
68+
director Director
7969
}
8070

8171
// RequestContext stores context information during the life time of an HTTP request.

pkg/epp/metadata/consts.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metadata
18+
19+
const (
20+
// SubsetFilterNamespace is the key for the outer namespace struct in the metadata field of the extproc request that is used to wrap the subset filter.
21+
SubsetFilterNamespace = "envoy.lb.subset_hint"
22+
// SubsetFilterKey is the metadata key used by Envoy to specify an array candidate pods for serving the request.
23+
// If not specified, all the pods that are associated with the pool are candidates.
24+
SubsetFilterKey = "x-gateway-destination-endpoint-subset"
25+
// DestinationEndpointNamespace is the key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the target endpoint.
26+
DestinationEndpointNamespace = "envoy.lb"
27+
// DestinationEndpointKey is the header and response metadata key used by Envoy to route to the appropriate pod.
28+
DestinationEndpointKey = "x-gateway-destination-endpoint"
29+
// FlowFairnessIDKey is the header key used to pass the fairness ID to be used in Flow Control.
30+
FlowFairnessIDKey = "x-gateway-inference-fairness-id"
31+
// ObjectiveKey is the header key used to specify the objective of an incoming request.
32+
ObjectiveKey = "x-gateway-inference-objective"
33+
)

pkg/epp/requestcontrol/director.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,14 @@ import (
3535
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3636
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3737
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
38+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3839
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3940
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4041
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
4142
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
4243
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
4344
)
4445

45-
const (
46-
subsetHintNamespace = "envoy.lb.subset_hint"
47-
subsetHintKey = "x-gateway-destination-endpoint-subset"
48-
)
49-
5046
// Scheduler defines the interface required by the Director for scheduling.
5147
type Scheduler interface {
5248
Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
@@ -196,13 +192,13 @@ func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2
196192
func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMetadata map[string]any) []schedulingtypes.Pod {
197193
loggerTrace := log.FromContext(ctx).V(logutil.TRACE)
198194

199-
subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
195+
subsetMap, found := requestMetadata[metadata.SubsetFilterNamespace].(map[string]any)
200196
if !found {
201197
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
202198
}
203199

204200
// Check if endpoint key is present in the subset map and ensure there is at least one value
205-
endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
201+
endpointSubsetList, found := subsetMap[metadata.SubsetFilterKey].([]any)
206202
if !found {
207203
return d.toSchedulerPodMetrics(d.datastore.PodList(backendmetrics.AllPodPredicate))
208204
} else if len(endpointSubsetList) == 0 {

pkg/epp/requestcontrol/director_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4040
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
42+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
4243
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
4344
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4445
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
@@ -435,8 +436,8 @@ func TestDirector_HandleRequest(t *testing.T) {
435436
func TestGetCandidatePodsForScheduling(t *testing.T) {
436437
var makeFilterMetadata = func(data []any) map[string]any {
437438
return map[string]any{
438-
"envoy.lb.subset_hint": map[string]any{
439-
"x-gateway-destination-endpoint-subset": data,
439+
metadata.SubsetFilterNamespace: map[string]any{
440+
metadata.SubsetFilterKey: data,
440441
},
441442
}
442443
}
@@ -493,7 +494,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) {
493494
},
494495
{
495496
name: "SubsetFilter, namespace present filter not present — return all pods",
496-
metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
497+
metadata: map[string]any{metadata.SubsetFilterNamespace: map[string]any{}},
497498
output: []schedulingtypes.Pod{
498499
&schedulingtypes.PodMetrics{
499500
Pod: outputPod1,

0 commit comments

Comments
 (0)