Skip to content

Commit 97ab83c

Browse files
committed
adding fairness-id header to be used in flow control
1 parent b1b43dc commit 97ab83c

File tree

10 files changed

+12626
-35
lines changed

10 files changed

+12626
-35
lines changed

cmd/epp/runner/runner.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,6 @@ var (
7878
"enable-pprof",
7979
runserver.DefaultEnablePprof,
8080
"Enables pprof handlers. Defaults to true. Set to false to disable pprof handlers.")
81-
destinationEndpointHintKey = flag.String(
82-
"destination-endpoint-hint-key",
83-
runserver.DefaultDestinationEndpointHintKey,
84-
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
85-
destinationEndpointHintMetadataNamespace = flag.String(
86-
"destination-endpoint-hint-metadata-namespace",
87-
runserver.DefaultDestinationEndpointHintMetadataNamespace,
88-
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
89-
"target endpoint. If not set, then an outer namespace struct should not be created.")
9081
poolName = flag.String(
9182
"pool-name",
9283
runserver.DefaultPoolName,
@@ -113,6 +104,20 @@ var (
113104
"The path to the certificate for secure serving. The certificate and private key files "+
114105
"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
115106
"then a self-signed certificate is used.")
107+
// header/metadata flags
108+
destinationEndpointHintKey = flag.String(
109+
"destination-endpoint-hint-key",
110+
runserver.DefaultDestinationEndpointHintKey,
111+
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
112+
destinationEndpointHintMetadataNamespace = flag.String(
113+
"destination-endpoint-hint-metadata-namespace",
114+
runserver.DefaultDestinationEndpointHintMetadataNamespace,
115+
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
116+
"target endpoint. If not set, then an outer namespace struct should not be created.")
117+
fairnessIDHeaderKey = flag.String(
118+
"fairness-id-header-key",
119+
runserver.DefaultFairnessIDHeaderKey,
120+
"The header key used to pass the fairness ID to be used in Flow Control.")
116121
// metric flags
117122
totalQueuedRequestsMetric = flag.String(
118123
"total-queued-requests-metric",
@@ -337,6 +342,7 @@ func (r *Runner) Run(ctx context.Context) error {
337342
GrpcPort: *grpcPort,
338343
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
339344
DestinationEndpointHintKey: *destinationEndpointHintKey,
345+
FairnessIDHeaderKey: *fairnessIDHeaderKey,
340346
PoolNamespacedName: poolNamespacedName,
341347
Datastore: datastore,
342348
SecureServing: *secureServing,

config/charts/inferencepool/templates/epp-deployment.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ spec:
2828
- --pool-namespace
2929
- {{ .Release.Namespace }}
3030
- --v
31-
- "{{ .Values.inferenceExtension.logVerbosity | default "3" }}"
31+
- "{{ .Values.inferenceExtension.logVerbosity | default "5" }}"
3232
- --grpc-port
3333
- "9002"
3434
- --grpc-health-port
3535
- "9003"
36+
- --zap-encoder
37+
- "json"
3638
- --metrics-port
3739
- "9090"
3840
- --config-file

config/charts/inferencepool/templates/rbac.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ rules:
4242
- apiGroups: ["inference.networking.x-k8s.io"]
4343
resources: ["inferenceobjectives", "inferencepools"]
4444
verbs: ["get", "watch", "list"]
45+
- apiGroups: ["inference.networking.k8s.io"]
46+
resources: ["inferencepools"]
47+
verbs: ["get", "watch", "list"]
4548
- apiGroups: [""]
4649
resources: ["pods"]
4750
verbs: ["get", "watch", "list"]

output.txt

Lines changed: 12489 additions & 0 deletions
Large diffs are not rendered by default.

pkg/epp/handlers/request.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
5757
} else {
5858
reqCtx.Request.Headers[header.Key] = header.Value
5959
}
60+
if header.Key == s.fairnessIDHeaderKey {
61+
reqCtx.FairnessID = reqCtx.Request.Headers[header.Key]
62+
// Remove the fairness ID header from the request headers;
63+
// this is not data that should be manipulated or sent to the backend.
64+
// It is only used for flow control.
65+
delete(reqCtx.Request.Headers, header.Key)
66+
}
6067
}
6168
return nil
6269
}

pkg/epp/handlers/request_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package handlers
18+
19+
import (
20+
"context"
21+
"testing"
22+
23+
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
24+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
25+
)
26+
27+
func TestHandleRequestHeaders(t *testing.T) {
28+
t.Parallel()
29+
30+
// Setup a mock server and request context
31+
server := &StreamingServer{
32+
fairnessIDHeaderKey: "test-fairness-id",
33+
}
34+
35+
reqCtx := &RequestContext{
36+
Request: &Request{
37+
Headers: make(map[string]string),
38+
},
39+
}
40+
41+
req := &extProcPb.ProcessingRequest_RequestHeaders{
42+
RequestHeaders: &extProcPb.HttpHeaders{
43+
Headers: &configPb.HeaderMap{
44+
Headers: []*configPb.HeaderValue{
45+
{
46+
Key: "x-test-header",
47+
Value: "test-value",
48+
},
49+
{
50+
Key: "test-fairness-id",
51+
Value: "test-fairness-id-value",
52+
},
53+
},
54+
},
55+
EndOfStream: false,
56+
},
57+
}
58+
59+
err := server.HandleRequestHeaders(context.Background(), reqCtx, req)
60+
if err != nil {
61+
t.Fatalf("expected no error, got %v", err)
62+
}
63+
64+
if reqCtx.FairnessID != "test-fairness-id-value" {
65+
t.Errorf("expected fairness ID to be 'test-fairness-id-value', got %s", reqCtx.FairnessID)
66+
}
67+
if reqCtx.Request.Headers["test-fairness-id"] == "test-fairness-id-value" {
68+
t.Errorf("expected fairness ID header to be removed from request headers, but it was not")
69+
}
70+
}

pkg/epp/handlers/server.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ const (
4444
bodyByteLimit = 62000
4545
)
4646

47-
func NewStreamingServer(destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore Datastore, director Director) *StreamingServer {
47+
func NewStreamingServer(destinationEndpointHintMetadataNamespace, destinationEndpointHintKey, fairnessIDHeaderKey string, datastore Datastore, director Director) *StreamingServer {
4848
return &StreamingServer{
4949
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
5050
destinationEndpointHintKey: destinationEndpointHintKey,
51+
fairnessIDHeaderKey: fairnessIDHeaderKey,
5152
director: director,
5253
datastore: datastore,
5354
}
@@ -72,6 +73,7 @@ type StreamingServer struct {
7273
// The key acting as the outer namespace struct in the metadata extproc response to communicate
7374
// back the picked endpoints.
7475
destinationEndpointHintMetadataNamespace string
76+
fairnessIDHeaderKey string
7577
datastore Datastore
7678
director Director
7779
}
@@ -85,6 +87,7 @@ type RequestContext struct {
8587
TargetEndpoint string
8688
Model string
8789
ResolvedTargetModel string
90+
FairnessID string
8891
RequestReceivedTimestamp time.Time
8992
ResponseCompleteTimestamp time.Time
9093
RequestSize int
@@ -192,8 +195,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
192195

193196
switch v := req.Request.(type) {
194197
case *extProcPb.ProcessingRequest_RequestHeaders:
195-
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {
196-
logger = logger.WithValues(requtil.RequestIdHeaderKey, requestId)
198+
if requestID := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestID) > 0 {
199+
logger = logger.WithValues(requtil.RequestIdHeaderKey, requestID)
197200
loggerTrace = logger.V(logutil.TRACE)
198201
ctx = log.IntoContext(ctx, logger)
199202
}

pkg/epp/requestcontrol/director.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
139139
logger.V(logutil.DEBUG).Info("LLM request assembled")
140140

141141
// --- 2. Admission Control check --
142-
if err := d.admitRequest(ctx, requestCriticality); err != nil {
142+
if err := d.admitRequest(ctx, requestCriticality, reqCtx.FairnessID); err != nil {
143143
return reqCtx, err
144144
}
145145

@@ -166,9 +166,11 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
166166

167167
// admitRequest handles admission control to decide whether or not to accept the request
168168
// based on the request criticality and system saturation state.
169-
func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality) error {
169+
func (d *Director) admitRequest(ctx context.Context, requestCriticality v1alpha2.Criticality, fairnessID string) error {
170170
logger := log.FromContext(ctx)
171171

172+
logger.V(logutil.TRACE).Info("Entering Flow Control", "criticality", requestCriticality, "fairnessID", fairnessID)
173+
172174
if requestCriticality == v1alpha2.Critical {
173175
logger.V(logutil.DEBUG).Info("Critical request bypassing saturation check.")
174176
return nil

pkg/epp/server/runserver.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type ExtProcServerRunner struct {
4646
GrpcPort int
4747
DestinationEndpointHintMetadataNamespace string
4848
DestinationEndpointHintKey string
49+
FairnessIDHeaderKey string
4950
PoolNamespacedName types.NamespacedName
5051
Datastore datastore.Datastore
5152
SecureServing bool
@@ -63,24 +64,25 @@ type ExtProcServerRunner struct {
6364

6465
// Default values for CLI flags in main
6566
const (
66-
DefaultGrpcPort = 9002 // default for --grpc-port
67-
DefaultGrpcHealthPort = 9003 // default for --grpc-health-port
68-
DefaultMetricsPort = 9090 // default for --metrics-port
69-
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
70-
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destination-endpoint-hint-key
71-
DefaultPoolName = "" // required but no default
72-
DefaultPoolNamespace = "default" // default for --pool-namespace
73-
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval
74-
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval
75-
DefaultSecureServing = true // default for --secure-serving
76-
DefaultHealthChecking = false // default for --health-checking
77-
DefaultEnablePprof = true // default for --enable-pprof
78-
DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric
79-
DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric
80-
DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric
81-
DefaultCertPath = "" // default for --cert-path
82-
DefaultConfigFile = "" // default for --config-file
83-
DefaultConfigText = "" // default for --config-text
67+
DefaultGrpcPort = 9002 // default for --grpc-port
68+
DefaultGrpcHealthPort = 9003 // default for --grpc-health-port
69+
DefaultMetricsPort = 9090 // default for --metrics-port
70+
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destination-endpoint-hint-metadata-namespace
71+
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destination-endpoint-hint-key
72+
DefaultFairnessIDHeaderKey = "x-gateway-inference-fairness-id" // default for --fairness-id-header-key
73+
DefaultPoolName = "" // required but no default
74+
DefaultPoolNamespace = "default" // default for --pool-namespace
75+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refresh-metrics-interval
76+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refresh-prometheus-metrics-interval
77+
DefaultSecureServing = true // default for --secure-serving
78+
DefaultHealthChecking = false // default for --health-checking
79+
DefaultEnablePprof = true // default for --enable-pprof
80+
DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric
81+
DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric
82+
DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric
83+
DefaultCertPath = "" // default for --cert-path
84+
DefaultConfigFile = "" // default for --config-file
85+
DefaultConfigText = "" // default for --config-text
8486
DefaultMetricsStalenessThreshold = 2 * time.Second
8587
)
8688

@@ -91,6 +93,7 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
9193
GrpcPort: DefaultGrpcPort,
9294
DestinationEndpointHintKey: DefaultDestinationEndpointHintKey,
9395
DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace,
96+
FairnessIDHeaderKey: DefaultFairnessIDHeaderKey,
9497
PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace},
9598
SecureServing: DefaultSecureServing,
9699
HealthChecking: DefaultHealthChecking,
@@ -159,6 +162,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
159162
extProcServer := handlers.NewStreamingServer(
160163
r.DestinationEndpointHintMetadataNamespace,
161164
r.DestinationEndpointHintKey,
165+
r.FairnessIDHeaderKey,
162166
r.Datastore,
163167
r.Director,
164168
)

pkg/epp/server/server_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
podAddress = "1.2.3.4"
3939
poolPort = int32(5678)
4040
destinationEndpointHintKey = "test-target"
41+
fairnessIDHeaderKey = "x-fairness-id"
4142
namespace = "ns1"
4243
)
4344

@@ -60,14 +61,18 @@ func TestServer(t *testing.T) {
6061
ctx, cancel, ds, _ := utils.PrepareForTestStreamingServer([]*v1alpha2.InferenceObjective{model},
6162
[]*v1.Pod{{ObjectMeta: metav1.ObjectMeta{Name: podName}}}, "test-pool1", namespace, poolPort)
6263

63-
streamingServer := handlers.NewStreamingServer(namespace, destinationEndpointHintKey, ds, director)
64+
streamingServer := handlers.NewStreamingServer(namespace, destinationEndpointHintKey, fairnessIDHeaderKey, ds, director)
6465

6566
testListener, errChan := utils.SetupTestStreamingServer(t, ctx, ds, streamingServer)
6667
process, conn := utils.GetStreamingServerClient(ctx, t)
6768
defer conn.Close()
6869

6970
// Send request headers - no response expected
70-
headers := utils.BuildEnvoyGRPCHeaders(map[string]string{requestHeader: theHeaderValue, ":method": "POST"}, true)
71+
headers := utils.BuildEnvoyGRPCHeaders(map[string]string{
72+
requestHeader: theHeaderValue,
73+
":method": "POST",
74+
fairnessIDHeaderKey: "a-very-interesting-fairness-id",
75+
}, true)
7176
request := &pb.ProcessingRequest{
7277
Request: &pb.ProcessingRequest_RequestHeaders{
7378
RequestHeaders: headers,

0 commit comments

Comments
 (0)