Commit 67f3c0d

feat: Affiliate ready state with leader election

1 parent d27a716
File tree

8 files changed: +699 -178 lines

cmd/epp/runner/health.go

Lines changed: 67 additions & 19 deletions
```diff
@@ -18,6 +18,8 @@ package runner

 import (
 	"context"
+	"fmt"
+	"sync/atomic"

 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"github.com/go-logr/logr"
@@ -30,37 +32,83 @@ import (
 )

 type healthServer struct {
-	logger    logr.Logger
-	datastore datastore.Datastore
+	logger                logr.Logger
+	datastore             datastore.Datastore
+	isLeader              *atomic.Bool
+	leaderElectionEnabled bool
 }

+const (
+	LivenessCheckService  = "liveness"
+	ReadinessCheckService = "readiness"
+)
+
 func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
-	// TODO: we're accepting ANY service name for now as a temporary hack in alignment with
-	// upstream issues. See https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/788
-	// if in.Service != extProcPb.ExternalProcessor_ServiceDesc.ServiceName {
-	// 	s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
-	// 	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
-	// }
-
-	if !s.datastore.PoolHasSynced() {
-		s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving", "service", in.Service)
+	isLive := s.datastore.PoolHasSynced()
+
+	// If leader election is disabled, keep the previous behavior: all checks are based on whether the pool has synced.
+	if !s.leaderElectionEnabled {
+		if !isLive {
+			s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service)
+			return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
+		}
+		s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service)
+		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
+	}
+
+	// When leader election is enabled, differentiate between liveness and readiness.
+	// The service name in the request determines which check to perform.
+	var checkName string
+	var isPassing bool
+
+	switch in.Service {
+	case ReadinessCheckService:
+		checkName = "readiness"
+		isPassing = isLive && s.isLeader.Load()
+	case LivenessCheckService, "": // Default to the liveness check if the service name is empty.
+		checkName = "liveness"
+		// Any pod that is running and can respond to this gRPC check is considered "live".
+		// The datastore sync status should not affect liveness, only readiness.
+		// This prevents non-leader replicas from being restarted continuously.
+		isPassing = true
+	case extProcPb.ExternalProcessor_ServiceDesc.ServiceName:
+		// The main service is considered ready only on the leader.
+		checkName = "ext_proc"
+		isPassing = isLive && s.isLeader.Load()
+	default:
+		s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{LivenessCheckService, ReadinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
+		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
+	}
+
+	if !isPassing {
+		s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load())
 		return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
 	}
-	s.logger.V(logutil.TRACE).Info("gRPC health check serving", "service", in.Service)
+
+	s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service)
 	return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
 }

 func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) {
-	// currently only the ext_proc service is provided
-	serviceHealthResponse, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: extProcPb.ExternalProcessor_ServiceDesc.ServiceName})
-	if err != nil {
-		return nil, err
+	statuses := make(map[string]*healthPb.HealthCheckResponse)
+
+	services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}
+	if s.leaderElectionEnabled {
+		services = append(services, LivenessCheckService, ReadinessCheckService)
+	}
+
+	for _, service := range services {
+		resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service})
+		if err != nil {
+			// Check only returns an error for unknown services; since we iterate known
+			// services here, any error is unexpected and is propagated to the caller.
+			return nil, err
+		}
+		statuses[service] = resp
 	}

 	return &healthPb.HealthListResponse{
-		Statuses: map[string]*healthPb.HealthCheckResponse{
-			extProcPb.ExternalProcessor_ServiceDesc.ServiceName: serviceHealthResponse,
-		},
+		Statuses: statuses,
 	}, nil
 }
```
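The two new service names give each probe its own semantics: every running replica answers `liveness` with SERVING, while `readiness` (and the `ext_proc` service itself) is SERVING only on a leader whose pool has synced. A minimal sketch of a client probing both services over the standard gRPC health API — the address `localhost:9003` is an assumption; substitute the EPP pod's actual gRPC health port:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthPb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Assumed address; point this at the EPP pod's gRPC health port.
	conn, err := grpc.NewClient("localhost:9003", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := healthPb.NewHealthClient(conn)
	for _, svc := range []string{"liveness", "readiness"} {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		resp, err := client.Check(ctx, &healthPb.HealthCheckRequest{Service: svc})
		cancel()
		if err != nil {
			log.Fatalf("%s check failed: %v", svc, err)
		}
		// On a non-leader replica, expect liveness=SERVING and readiness=NOT_SERVING.
		fmt.Printf("%s: %s\n", svc, resp.GetStatus())
	}
}
```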

cmd/epp/runner/runner.go

Lines changed: 31 additions & 7 deletions
```diff
@@ -25,6 +25,7 @@ import (
 	"net/http"
 	"net/http/pprof"
 	"os"
+	"sync/atomic"

 	"github.com/go-logr/logr"
 	"github.com/prometheus/client_golang/prometheus"
@@ -151,6 +152,10 @@ var (
 	modelServerMetricsPath                    = flag.String("model-server-metrics-path", "/metrics", "Path to scrape metrics from pods")
 	modelServerMetricsScheme                  = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
 	modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
+	haEnableLeaderElection                    = flag.Bool(
+		"ha-enable-leader-election",
+		false,
+		"Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")

 	setupLog = ctrl.Log.WithName("setup")
 )
@@ -190,8 +195,9 @@ func bindEnvToFlags() {
 		"POOL_NAME":      "pool-name",
 		"POOL_NAMESPACE": "pool-namespace",
 		// durations & bools work too; flag.Set expects the *string* form
-		"REFRESH_METRICS_INTERVAL": "refresh-metrics-interval",
-		"SECURE_SERVING":           "secure-serving",
+		"REFRESH_METRICS_INTERVAL":  "refresh-metrics-interval",
+		"SECURE_SERVING":            "secure-serving",
+		"HA_ENABLE_LEADER_ELECTION": "ha-enable-leader-election",
 	} {
 		if v := os.Getenv(env); v != "" {
 			// ignore error; Parse() will catch invalid values later
@@ -299,12 +305,28 @@ func (r *Runner) Run(ctx context.Context) error {
 		NamespacedName: poolNamespacedName,
 		GroupKind:      poolGroupKind,
 	}
-	mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions)
+
+	isLeader := &atomic.Bool{}
+	isLeader.Store(false)
+
+	mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection)
 	if err != nil {
 		setupLog.Error(err, "Failed to create controller manager")
 		return err
 	}

+	if *haEnableLeaderElection {
+		setupLog.Info("Leader election enabled")
+		go func() {
+			<-mgr.Elected()
+			isLeader.Store(true)
+			setupLog.Info("This instance is now the leader!")
+		}()
+	} else {
+		// If leader election is disabled, all instances are "leaders" for readiness purposes.
+		isLeader.Store(true)
+	}
+
 	if *enablePprof {
 		setupLog.Info("Enabling pprof handlers")
 		err = setupPprofHandlers(mgr)
@@ -356,7 +378,7 @@ func (r *Runner) Run(ctx context.Context) error {

 	// --- Add Runnables to Manager ---
 	// Register health server.
-	if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
+	if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil {
 		return err
 	}

@@ -452,11 +474,13 @@ func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerR
 }

 // registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
-func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
+func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error {
 	srv := grpc.NewServer()
 	healthPb.RegisterHealthServer(srv, &healthServer{
-		logger:    logger,
-		datastore: ds,
+		logger:                logger,
+		datastore:             ds,
+		isLeader:              isLeader,
+		leaderElectionEnabled: leaderElectionEnabled,
 	})
 	if err := mgr.Add(
 		runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
```
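Since liveness and readiness are now distinct named services, a deployment can point Kubernetes' built-in gRPC probes (GA since v1.27) at them; note the health server is registered with `runnable.NoLeaderElection`, so it runs on every replica and non-leaders can still answer probes. A sketch using the core/v1 types — the port 9003 and the probe periods are assumptions, not values from this commit:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/utils/ptr"
)

// eppProbes builds probes that target the named gRPC health services.
// Non-leader replicas pass the liveness probe (so they are not restarted)
// but fail the readiness probe (so they receive no traffic).
func eppProbes() (liveness, readiness *corev1.Probe) {
	liveness = &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			GRPC: &corev1.GRPCAction{Port: 9003, Service: ptr.To("liveness")},
		},
		PeriodSeconds: 10,
	}
	readiness = &corev1.Probe{
		ProbeHandler: corev1.ProbeHandler{
			GRPC: &corev1.GRPCAction{Port: 9003, Service: ptr.To("readiness")},
		},
		PeriodSeconds: 2, // probe often so traffic shifts quickly after failover
	}
	return liveness, readiness
}

func main() {
	l, r := eppProbes()
	fmt.Println(*l.ProbeHandler.GRPC.Service, *r.ProbeHandler.GRPC.Service)
}
```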

pkg/epp/server/controller_manager.go

Lines changed: 12 additions & 1 deletion
```diff
@@ -85,12 +85,23 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.
 }

 // NewDefaultManager creates a new controller manager with default configuration.
-func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) {
+func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
 	opt, err := defaultManagerOptions(gknn, metricsServerOptions)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager options: %v", err)
 	}
+
+	if leaderElectionEnabled {
+		opt.LeaderElection = true
+		opt.LeaderElectionResourceLock = "leases"
+		// The lease name needs to be unique per EPP deployment.
+		opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name)
+		opt.LeaderElectionNamespace = gknn.Namespace
+		opt.LeaderElectionReleaseOnCancel = true
+	}
+
 	manager, err := ctrl.NewManager(restConfig, opt)
+
 	if err != nil {
 		return nil, fmt.Errorf("failed to create controller manager: %v", err)
 	}
```
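The lease ID embeds the pool's namespace and name, so EPP deployments backed by different InferencePools elect leaders independently; `LeaderElectionReleaseOnCancel` makes a cleanly terminating leader release the lease immediately instead of waiting for it to expire, which shortens failover. A quick sketch of how the ID is derived (the pool coordinates below are hypothetical):

```go
package main

import "fmt"

func main() {
	// Hypothetical pool coordinates.
	ns, name := "default", "vllm-llama3-8b-instruct"
	fmt.Printf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io\n", ns, name)
	// Prints: epp-default-vllm-llama3-8b-instruct.gateway-api-inference-extension.sigs.k8s.io
}
```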

test/e2e/epp/README.md

Lines changed: 13 additions & 0 deletions
````diff
@@ -45,6 +45,19 @@ Follow these steps to run the end-to-end tests:
   export E2E_MANIFEST_PATH=[config/manifests/vllm/gpu-deployment.yaml|config/manifests/vllm/cpu-deployment.yaml]
   ```

+- **Enable leader election tests**: By default, the e2e test runs the EPP server as a single replica.
+  To test the high-availability (HA) mode with leader election (3 replicas), set the following environment variable:
+
+  ```sh
+  export E2E_LEADER_ELECTION_ENABLED=true
+  ```
+
+- **Pause before cleanup**: To pause the test run before cleaning up resources, set the `E2E_PAUSE_ON_EXIT` environment variable.
+  This is useful for debugging the state of the cluster after the test has run.
+
+  - To pause indefinitely, set it to `true`: `export E2E_PAUSE_ON_EXIT=true`
+  - To pause for a specific duration, provide a duration string: `export E2E_PAUSE_ON_EXIT=10m`
+
 1. **Run the Tests**: Run the `test-e2e` target:

    ```sh
````

test/e2e/epp/e2e_suite_test.go

Lines changed: 43 additions & 10 deletions
```diff
@@ -85,8 +85,10 @@ const (
 	xInferObjectiveManifest = "../../../config/crd/bases/inference.networking.x-k8s.io_inferenceobjectives.yaml"
 	// inferPoolManifest is the manifest for the inference pool CRD with 'inference.networking.k8s.io' group.
 	inferPoolManifest = "../../../config/crd/bases/inference.networking.k8s.io_inferencepools.yaml"
-	// inferExtManifest is the manifest for the inference extension test resources.
-	inferExtManifest = "../../testdata/inferencepool-e2e.yaml"
+	// inferExtManifestDefault is the manifest for the default inference extension test resources (single replica).
+	inferExtManifestDefault = "../../testdata/inferencepool-e2e.yaml"
+	// inferExtManifestLeaderElection is the manifest for the inference extension test resources with leader election enabled (3 replicas).
+	inferExtManifestLeaderElection = "../../testdata/inferencepool-leader-election-e2e.yaml"
 	// envoyManifest is the manifest for the envoy proxy test resources.
 	envoyManifest = "../../testdata/envoy.yaml"
 	// metricsRbacManifest is the manifest for the rbac resources for testing metrics.
@@ -95,15 +97,18 @@ const (
 	modelServerManifestFilepathEnvVar = "MANIFEST_PATH"
 )

+const e2eLeaderElectionEnabledEnvVar = "E2E_LEADER_ELECTION_ENABLED"
+
 var (
 	ctx = context.Background()
 	cli client.Client
 	// Required for exec'ing in curl pod
-	kubeCli  *kubernetes.Clientset
-	scheme   = runtime.NewScheme()
-	cfg      = config.GetConfigOrDie()
-	nsName   string
-	e2eImage string
+	kubeCli               *kubernetes.Clientset
+	scheme                = runtime.NewScheme()
+	cfg                   = config.GetConfigOrDie()
+	nsName                string
+	e2eImage              string
+	leaderElectionEnabled bool
 )

 func TestAPIs(t *testing.T) {
@@ -121,6 +126,11 @@ var _ = ginkgo.BeforeSuite(func() {
 	e2eImage = os.Getenv("E2E_IMAGE")
 	gomega.Expect(e2eImage).NotTo(gomega.BeEmpty(), "E2E_IMAGE environment variable is not set")

+	if os.Getenv(e2eLeaderElectionEnabledEnvVar) == "true" {
+		leaderElectionEnabled = true
+		ginkgo.By("Leader election test mode enabled via " + e2eLeaderElectionEnabledEnvVar)
+	}
+
 	ginkgo.By("Setting up the test suite")
 	setupSuite()

@@ -146,7 +156,12 @@ func setupInfra() {
 	}

 	createCRDs(cli, crds)
-	createInferExt(cli, inferExtManifest)
+
+	inferExtManifestPath := inferExtManifestDefault
+	if leaderElectionEnabled {
+		inferExtManifestPath = inferExtManifestLeaderElection
+	}
+	createInferExt(cli, inferExtManifestPath)
 	createClient(cli, clientManifest)
 	createEnvoy(cli, envoyManifest)
 	createMetricsRbac(cli, metricsRbacManifest)
@@ -156,6 +171,20 @@ func setupInfra() {
 }

 var _ = ginkgo.AfterSuite(func() {
+	// If E2E_PAUSE_ON_EXIT is set, pause the test run before cleanup.
+	// This is useful for debugging the state of the cluster after the test has run.
+	if pauseStr := os.Getenv("E2E_PAUSE_ON_EXIT"); pauseStr != "" {
+		ginkgo.By("Pausing before cleanup as requested by E2E_PAUSE_ON_EXIT=" + pauseStr)
+		pauseDuration, err := time.ParseDuration(pauseStr)
+		if err != nil {
+			// If it's not a valid duration (e.g., "true"), just wait indefinitely.
+			ginkgo.By("Invalid duration, pausing indefinitely. Press Ctrl+C to stop the test runner when you are done.")
+			select {} // Block forever
+		}
+		ginkgo.By(fmt.Sprintf("Pausing for %v...", pauseDuration))
+		time.Sleep(pauseDuration)
+	}
+
 	ginkgo.By("Performing global cleanup")
 	cleanupResources()
 })
@@ -423,8 +452,12 @@ func createInferExt(k8sClient client.Client, filePath string) {
 		return k8sClient.Get(ctx, types.NamespacedName{Namespace: nsName, Name: inferExtName}, deploy)
 	}, existsTimeout, interval)

-	// Wait for the deployment to be available.
-	testutils.DeploymentAvailable(ctx, k8sClient, deploy, modelReadyTimeout, interval)
+	if leaderElectionEnabled {
+		// With leader election enabled, only 1 replica will be "Ready" at any given time (the leader).
+		testutils.DeploymentReadyReplicas(ctx, k8sClient, deploy, 1, modelReadyTimeout, interval)
+	} else {
+		testutils.DeploymentAvailable(ctx, k8sClient, deploy, modelReadyTimeout, interval)
+	}

 	// Wait for the service to exist.
 	testutils.EventuallyExists(ctx, func() error {
```
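With three replicas and leader election on, only the leader passes the readiness probe, so the Deployment never reports all replicas Available; hence the switch from `DeploymentAvailable` to waiting for exactly one ready replica. `testutils.DeploymentReadyReplicas` itself is outside this excerpt; a plausible sketch of such a helper, with the signature inferred from the call site above (the real helper may differ):

```go
package testutils

import (
	"context"
	"time"

	"github.com/onsi/gomega"
	appsv1 "k8s.io/api/apps/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// DeploymentReadyReplicas blocks until the Deployment reports exactly `want`
// ready replicas. Hypothetical sketch based on the call site in this commit.
func DeploymentReadyReplicas(ctx context.Context, c client.Client, d *appsv1.Deployment,
	want int32, timeout, interval time.Duration) {
	gomega.Eventually(func(g gomega.Gomega) {
		latest := &appsv1.Deployment{}
		g.Expect(c.Get(ctx, client.ObjectKeyFromObject(d), latest)).To(gomega.Succeed())
		g.Expect(latest.Status.ReadyReplicas).To(gomega.Equal(want))
	}, timeout, interval).Should(gomega.Succeed())
}
```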
