
Commit cf0bcad

yangligt2 and kfswain authored

Affiliate ready state with leader election with a flag ha-enable-leader-election (#1337)

* feat: Affiliate ready state with leader election
* Fix manifest file for e2e test with leader election after rebase
* Update cmd/epp/runner/health.go
* Add Helm configuration for leader election
* Update config/charts/inferencepool/templates/leader-election-rbac.yaml
* docs: Add leader election section in helm chart README

---------

Co-authored-by: Kellen Swain <[email protected]>

1 parent 8a7b037 · commit cf0bcad

File tree: 12 files changed, +703 -176 lines

cmd/epp/runner/health.go
Lines changed: 67 additions & 19 deletions

```diff
@@ -18,6 +18,8 @@ package runner

 import (
 	"context"
+	"fmt"
+	"sync/atomic"

 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"github.com/go-logr/logr"
@@ -30,37 +32,83 @@ import (
 )

 type healthServer struct {
-	logger    logr.Logger
-	datastore datastore.Datastore
+	logger                logr.Logger
+	datastore             datastore.Datastore
+	isLeader              *atomic.Bool
+	leaderElectionEnabled bool
 }

+const (
+	LivenessCheckService  = "liveness"
+	ReadinessCheckService = "readiness"
+)
+
 func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
-	// TODO: we're accepting ANY service name for now as a temporary hack in alignment with
-	// upstream issues. See https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/788
-	// if in.Service != extProcPb.ExternalProcessor_ServiceDesc.ServiceName {
-	// 	s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
-	// 	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
-	// }
-
-	if !s.datastore.PoolHasSynced() {
-		s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving", "service", in.Service)
+	isLive := s.datastore.PoolHasSynced()
+
+	// If leader election is disabled, use current logic: all checks are based on whether the pool has synced.
+	if !s.leaderElectionEnabled {
+		if !isLive {
+			s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service)
+			return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
+		}
+		s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service)
+		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
+	}
+
+	// When leader election is enabled, differentiate between liveness and readiness.
+	// The service name in the request determines which check to perform.
+	var checkName string
+	var isPassing bool
+
+	switch in.Service {
+	case ReadinessCheckService:
+		checkName = "readiness"
+		isPassing = isLive && s.isLeader.Load()
+	case LivenessCheckService, "": // Default to liveness check if service is empty
+		checkName = "liveness"
+		// Any pod that is running and can respond to this gRPC check is considered "live".
+		// The datastore sync status should not affect liveness, only readiness.
+		// This is to prevent the non-leader node from continuous restarts.
+		isPassing = true
+	case extProcPb.ExternalProcessor_ServiceDesc.ServiceName:
+		// The main service is considered ready only on the leader.
+		checkName = "ext_proc"
+		isPassing = isLive && s.isLeader.Load()
+	default:
+		s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{LivenessCheckService, ReadinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
+		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
+	}
+
+	if !isPassing {
+		s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load())
 		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
 	}
-	s.logger.V(logutil.TRACE).Info("gRPC health check serving", "service", in.Service)
+
+	s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service)
 	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
 }

 func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) {
-	// currently only the ext_proc service is provided
-	serviceHealthResponse, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: extProcPb.ExternalProcessor_ServiceDesc.ServiceName})
-	if err != nil {
-		return nil, err
+	statuses := make(map[string]*healthPb.HealthCheckResponse)
+
+	services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}
+	if s.leaderElectionEnabled {
+		services = append(services, LivenessCheckService, ReadinessCheckService)
+	}
+
+	for _, service := range services {
+		resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service})
+		if err != nil {
+			// Check can return an error for unknown services, but here we are iterating known services.
+			// If another error occurs, we should propagate it.
+			return nil, err
+		}
+		statuses[service] = resp
 	}

 	return &healthPb.HealthListResponse{
-		Statuses: map[string]*healthPb.HealthCheckResponse{
-			extProcPb.ExternalProcessor_ServiceDesc.ServiceName: serviceHealthResponse,
-		},
+		Statuses: statuses,
 	}, nil
 }
```
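For reviewers who want to poke at the new checks by hand, here is a minimal gRPC health client sketch (not part of this commit). It assumes the EPP's health port is reachable on `localhost:9003` (e.g. via `kubectl port-forward`) and uses a plaintext connection:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthPb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Assumes something like `kubectl port-forward <epp-pod> 9003:9003` is running.
	conn, err := grpc.NewClient("localhost:9003", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := healthPb.NewHealthClient(conn)
	for _, svc := range []string{"liveness", "readiness"} {
		resp, err := client.Check(context.Background(), &healthPb.HealthCheckRequest{Service: svc})
		if err != nil {
			log.Fatalf("check %q: %v", svc, err)
		}
		// On a non-leader replica, expect liveness SERVING but readiness NOT_SERVING.
		fmt.Printf("%s: %v\n", svc, resp.GetStatus())
	}
}
```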

cmd/epp/runner/runner.go
Lines changed: 31 additions & 7 deletions

```diff
@@ -25,6 +25,7 @@ import (
 	"net/http"
 	"net/http/pprof"
 	"os"
+	"sync/atomic"

 	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
@@ -151,6 +152,10 @@ var (
 	modelServerMetricsPath                    = flag.String("model-server-metrics-path", "/metrics", "Path to scrape metrics from pods")
 	modelServerMetricsScheme                  = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
 	modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
+	haEnableLeaderElection                    = flag.Bool(
+		"ha-enable-leader-election",
+		false,
+		"Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")

 	setupLog = ctrl.Log.WithName("setup")
 )
@@ -190,8 +195,9 @@ func bindEnvToFlags() {
 		"POOL_NAME":      "pool-name",
 		"POOL_NAMESPACE": "pool-namespace",
 		// durations & bools work too; flag.Set expects the *string* form
-		"REFRESH_METRICS_INTERVAL": "refresh-metrics-interval",
-		"SECURE_SERVING":           "secure-serving",
+		"REFRESH_METRICS_INTERVAL":  "refresh-metrics-interval",
+		"SECURE_SERVING":            "secure-serving",
+		"HA_ENABLE_LEADER_ELECTION": "ha-enable-leader-election",
 	} {
 		if v := os.Getenv(env); v != "" {
 			// ignore error; Parse() will catch invalid values later
@@ -299,12 +305,28 @@ func (r *Runner) Run(ctx context.Context) error {
 		NamespacedName: poolNamespacedName,
 		GroupKind:      poolGroupKind,
 	}
-	mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions)
+
+	isLeader := &atomic.Bool{}
+	isLeader.Store(false)
+
+	mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection)
 	if err != nil {
 		setupLog.Error(err, "Failed to create controller manager")
 		return err
 	}

+	if *haEnableLeaderElection {
+		setupLog.Info("Leader election enabled")
+		go func() {
+			<-mgr.Elected()
+			isLeader.Store(true)
+			setupLog.Info("This instance is now the leader!")
+		}()
+	} else {
+		// If leader election is disabled, all instances are "leaders" for readiness purposes.
+		isLeader.Store(true)
+	}
+
 	if *enablePprof {
 		setupLog.Info("Enabling pprof handlers")
 		err = setupPprofHandlers(mgr)
@@ -356,7 +378,7 @@ func (r *Runner) Run(ctx context.Context) error {

 	// --- Add Runnables to Manager ---
 	// Register health server.
-	if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
+	if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil {
 		return err
 	}

@@ -452,11 +474,13 @@ func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerR

 // registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
-func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
+func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error {
 	srv := grpc.NewServer()
 	healthPb.RegisterHealthServer(srv, &healthServer{
-		logger:    logger,
-		datastore: ds,
+		logger:                logger,
+		datastore:             ds,
+		isLeader:              isLeader,
+		leaderElectionEnabled: leaderElectionEnabled,
 	})
 	if err := mgr.Add(
 		runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
```
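Because the commit also wires `HA_ENABLE_LEADER_ELECTION` through `bindEnvToFlags`, both of the following should be equivalent ways to turn the feature on (the binary path here is illustrative):

```sh
# Flag form
./epp --ha-enable-leader-election

# Env form; bindEnvToFlags feeds the string value to flag.Set before Parse()
HA_ENABLE_LEADER_ELECTION=true ./epp
```

Note that the health server itself is registered with `runnable.NoLeaderElection`, so every replica answers health checks regardless of leadership; only the check results differ between leader and non-leader.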

config/charts/inferencepool/README.md
Lines changed: 20 additions & 0 deletions

````diff
@@ -79,6 +79,24 @@ $ helm install triton-llama3-8b-instruct \
   oci://us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/charts/inferencepool --version v0
 ```

+### Install with High Availability (HA)
+
+To deploy the EndpointPicker in a high-availability (HA) active-passive configuration, you can enable leader election. When enabled, the EPP deployment will have multiple replicas, but only one "leader" replica will be active and ready to process traffic at any given time. If the leader pod fails, another pod will be elected as the new leader, ensuring service continuity.
+
+To enable HA, set `inferenceExtension.enableLeaderElection` to `true` and increase the number of replicas in your `values.yaml` file:
+
+```yaml
+inferenceExtension:
+  replicas: 3
+  enableLeaderElection: true
+```
+
+Then apply it with:
+
+```sh
+helm install vllm-llama3-8b-instruct ./config/charts/inferencepool -f values.yaml
+```
+
 ## Uninstall

 Run the following command to uninstall the chart:
@@ -107,6 +125,8 @@ The following table list the configurable parameters of the chart.
 | `inferenceExtension.extraServicePorts` | List of additional service ports to expose. Defaults to `[]`. |
 | `inferenceExtension.logVerbosity` | Logging verbosity level for the endpoint picker. Defaults to `"3"`. |
 | `provider.name` | Name of the Inference Gateway implementation being used. Possible values: `gke`. Defaults to `none`. |
+| `inferenceExtension.enableLeaderElection` | Enable leader election for high availability. When enabled, only one EPP pod (the leader) will be ready to serve traffic. It is recommended to set `inferenceExtension.replicas` to a value greater than 1 when this is set to `true`. Defaults to `false`. |
+

 ## Notes
````
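A quick way to see which replica currently holds leadership is to read the coordination lease. This is a sketch: namespace `default` and pool name `vllm-llama3-8b-instruct` are assumptions, and the lease name follows the `epp-<namespace>-<pool>` pattern set in `controller_manager.go` below:

```sh
kubectl -n default get lease \
  epp-default-vllm-llama3-8b-instruct.gateway-api-inference-extension.sigs.k8s.io \
  -o jsonpath='{.spec.holderIdentity}'
```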

config/charts/inferencepool/templates/epp-deployment.yaml
Lines changed: 15 additions & 0 deletions

```diff
@@ -44,6 +44,9 @@ spec:
         - "--model-server-metrics-path={{ .Values.inferenceExtension.modelServerMetricsPath }}"
         - "--model-server-metrics-scheme={{ .Values.inferenceExtension.modelServerMetricsScheme }}"
         - "--model-server-metrics-https-insecure-skip-verify={{ .Values.inferenceExtension.modelServerMetricsHttpsInsecureSkipVerify }}"
+        {{- if .Values.inferenceExtension.enableLeaderElection }}
+        - "--ha-enable-leader-election"
+        {{- end }}
         {{- if eq (.Values.inferencePool.modelServerType | default "vllm") "triton-tensorrt-llm" }}
         - --total-queued-requests-metric
         - "nv_trt_llm_request_metrics{request_type=waiting}"
@@ -63,15 +66,27 @@ spec:
         {{- toYaml . | nindent 8 }}
         {{- end }}
         livenessProbe:
+          {{- if .Values.inferenceExtension.enableLeaderElection }}
+          grpc:
+            port: 9003
+            service: liveness
+          {{- else }}
           grpc:
             port: 9003
             service: inference-extension
+          {{- end }}
           initialDelaySeconds: 5
           periodSeconds: 10
         readinessProbe:
+          {{- if .Values.inferenceExtension.enableLeaderElection }}
+          grpc:
+            port: 9003
+            service: readiness
+          {{- else }}
           grpc:
             port: 9003
             service: inference-extension
+          {{- end }}
           initialDelaySeconds: 5
           periodSeconds: 10
         {{- with .Values.inferenceExtension.env }}
```
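With `enableLeaderElection: true`, the template above renders probes along these lines (a sketch of the rendered output, not chart source). Every replica answers the `liveness` service, but only the leader reports SERVING on `readiness`:

```yaml
livenessProbe:
  grpc:
    port: 9003
    service: liveness
  initialDelaySeconds: 5
  periodSeconds: 10
readinessProbe:
  grpc:
    port: 9003
    service: readiness
  initialDelaySeconds: 5
  periodSeconds: 10
```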
config/charts/inferencepool/templates/leader-election-rbac.yaml
Lines changed: 30 additions & 0 deletions (new file)

```diff
@@ -0,0 +1,30 @@
+{{- if .Values.inferenceExtension.enableLeaderElection }}
+---
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{ include "gateway-api-inference-extension.name" . }}-leader-election
+  namespace: {{ .Release.Namespace }}
+  labels:
+    {{- include "gateway-api-inference-extension.labels" . | nindent 4 }}
+rules:
+- apiGroups: [ "coordination.k8s.io" ]
+  resources: [ "leases" ]
+  verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
+- apiGroups: [ "" ]
+  resources: [ "events" ]
+  verbs: [ "create", "patch" ]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: {{ include "gateway-api-inference-extension.name" . }}-leader-election-binding
+  namespace: {{ .Release.Namespace }}
+subjects:
+- kind: ServiceAccount
+  name: {{ include "gateway-api-inference-extension.name" . }}
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: {{ include "gateway-api-inference-extension.name" . }}-leader-election
+{{- end }}
```
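A namespaced Role (rather than a ClusterRole) is sufficient here because the election lease lives in the release namespace. To sanity-check the binding after install, something like the following should work; the namespace and service account name are assumptions based on the chart's name helper:

```sh
kubectl -n default auth can-i update leases.coordination.k8s.io \
  --as=system:serviceaccount:default:vllm-llama3-8b-instruct
```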

config/charts/inferencepool/values.yaml
Lines changed: 3 additions & 0 deletions

```diff
@@ -34,6 +34,9 @@ inferenceExtension:
   extraContainerPorts: []
   # Define additional service ports
   extraServicePorts: []
+  # Enable leader election for high availability. When enabled, it is recommended to set replicas > 1.
+  # Only the leader pod will be ready to serve traffic.
+  enableLeaderElection: false

 inferencePool:
   targetPortNumber: 8000
```

pkg/epp/server/controller_manager.go
Lines changed: 12 additions & 1 deletion

```diff
@@ -85,12 +85,23 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.
 }

 // NewDefaultManager creates a new controller manager with default configuration.
-func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) {
+func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
 	opt, err := defaultManagerOptions(gknn, metricsServerOptions)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager options: %v", err)
 	}
+
+	if leaderElectionEnabled {
+		opt.LeaderElection = true
+		opt.LeaderElectionResourceLock = "leases"
+		// The lease name needs to be unique per EPP deployment.
+		opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name)
+		opt.LeaderElectionNamespace = gknn.Namespace
+		opt.LeaderElectionReleaseOnCancel = true
+	}
+
 	manager, err := ctrl.NewManager(restConfig, opt)
+
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager: %v", err)
 	}
```
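For readers less familiar with controller-runtime, the options above map onto a standalone setup like this sketch (names and values are illustrative, not the commit's code):

```go
package main

import (
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	ctrl "sigs.k8s.io/controller-runtime"
)

func newManager() (ctrl.Manager, error) {
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		LeaderElection:             true,
		LeaderElectionResourceLock: resourcelock.LeasesResourceLock, // the "leases" lock string
		// One lease per EPP deployment, so two EPPs in one namespace never share a lock.
		LeaderElectionID:        "epp-default-my-pool.gateway-api-inference-extension.sigs.k8s.io",
		LeaderElectionNamespace: "default",
		// Voluntarily release the lease on clean shutdown so failover is fast.
		LeaderElectionReleaseOnCancel: true,
	})
}
```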

test/e2e/epp/README.md
Lines changed: 13 additions & 0 deletions

````diff
@@ -45,6 +45,19 @@ Follow these steps to run the end-to-end tests:
    export E2E_MANIFEST_PATH=[config/manifests/vllm/gpu-deployment.yaml|config/manifests/vllm/cpu-deployment.yaml]
    ```

+- **Enable leader election tests**: By default, the e2e test runs the EPP server as a single replica.
+  To test the high-availability (HA) mode with leader election (3 replicas), set the following environment variable:
+
+  ```sh
+  export E2E_LEADER_ELECTION_ENABLED=true
+  ```
+
+- **Pause before cleanup**: To pause the test run before cleaning up resources, set the `E2E_PAUSE_ON_EXIT` environment variable.
+  This is useful for debugging the state of the cluster after the test has run.
+
+  - To pause indefinitely, set it to `true`: `export E2E_PAUSE_ON_EXIT=true`
+  - To pause for a specific duration, provide a duration string: `export E2E_PAUSE_ON_EXIT=10m`
+
 1. **Run the Tests**: Run the `test-e2e` target:

    ```sh
````
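Putting it together, an HA test run might look like this (a sketch; `make test-e2e` invokes the target referenced above):

```sh
export E2E_LEADER_ELECTION_ENABLED=true
export E2E_PAUSE_ON_EXIT=10m   # optional: keep the cluster around for inspection
make test-e2e
```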
