Associate ready state with leader election via a new ha-enable-leader-election flag #1337

Open · wants to merge 5 commits into base: main
86 changes: 67 additions & 19 deletions cmd/epp/runner/health.go
@@ -18,6 +18,8 @@ package runner

import (
"context"
"fmt"
"sync/atomic"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/go-logr/logr"
@@ -30,37 +32,83 @@
)

type healthServer struct {
logger logr.Logger
datastore datastore.Datastore
isLeader *atomic.Bool
leaderElectionEnabled bool
}

const (
LivenessCheckService = "liveness"
ReadinessCheckService = "readiness"
)

func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
- // TODO: we're accepting ANY service name for now as a temporary hack in alignment with
- // upstream issues. See https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/788
- // if in.Service != extProcPb.ExternalProcessor_ServiceDesc.ServiceName {
- //   s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
- //   return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
- // }

- if !s.datastore.PoolHasSynced() {
-   s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving", "service", in.Service)
+ isLive := s.datastore.PoolHasSynced()

// If leader election is disabled, use current logic: all checks are based on whether the pool has synced.
if !s.leaderElectionEnabled {
if !isLive {
s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service)
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
}
s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service)
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
}

// When leader election is enabled, differentiate between liveness and readiness.
// The service name in the request determines which check to perform.
var checkName string
var isPassing bool

switch in.Service {
case ReadinessCheckService:
checkName = "readiness"
isPassing = isLive && s.isLeader.Load()
case LivenessCheckService, "": // Default to liveness check if service is empty
checkName = "liveness"
// Any pod that is running and can respond to this gRPC check is considered "live".
// The datastore sync status should not affect liveness, only readiness.
// This prevents non-leader pods from being restarted continuously.
isPassing = true
case extProcPb.ExternalProcessor_ServiceDesc.ServiceName:
// The main service is considered ready only on the leader.
checkName = "ext_proc"
isPassing = isLive && s.isLeader.Load()
default:
s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{LivenessCheckService, ReadinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service)
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil
}

if !isPassing {
s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load())
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
}
s.logger.V(logutil.TRACE).Info("gRPC health check serving", "service", in.Service)

s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service)
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
}
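As a concrete illustration of the contract above, here is a minimal client-side sketch that exercises the named checks through the standard `grpc_health_v1` API; the address, the port 9003 (the chart's health port), and the plaintext credentials are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	healthPb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	// Illustrative address; the Helm chart exposes the health server on 9003.
	conn, err := grpc.NewClient("localhost:9003",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := healthPb.NewHealthClient(conn)
	for _, svc := range []string{"liveness", "readiness"} {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		resp, err := client.Check(ctx, &healthPb.HealthCheckRequest{Service: svc})
		cancel()
		if err != nil {
			fmt.Printf("%s: %v\n", svc, err)
			continue
		}
		// With leader election on, a standby replica should report
		// liveness=SERVING but readiness=NOT_SERVING.
		fmt.Printf("%s: %s\n", svc, resp.GetStatus())
	}
}
```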

func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) {
- // currently only the ext_proc service is provided
- serviceHealthResponse, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: extProcPb.ExternalProcessor_ServiceDesc.ServiceName})
- if err != nil {
-   return nil, err
- }
+ statuses := make(map[string]*healthPb.HealthCheckResponse)

services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}
if s.leaderElectionEnabled {
services = append(services, LivenessCheckService, ReadinessCheckService)
}

for _, service := range services {
resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service})
if err != nil {
// Check only returns an error for unknown services; since we iterate
// known services here, any error is unexpected and is returned as-is.
return nil, err
}
statuses[service] = resp
}

return &healthPb.HealthListResponse{
- Statuses: map[string]*healthPb.HealthCheckResponse{
-   extProcPb.ExternalProcessor_ServiceDesc.ServiceName: serviceHealthResponse,
- },
+ Statuses: statuses,
}, nil
}

38 changes: 31 additions & 7 deletions cmd/epp/runner/runner.go
@@ -25,6 +25,7 @@
"net/http"
"net/http/pprof"
"os"
"sync/atomic"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
@@ -151,6 +152,10 @@
modelServerMetricsPath = flag.String("model-server-metrics-path", "/metrics", "Path to scrape metrics from pods")
modelServerMetricsScheme = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
haEnableLeaderElection = flag.Bool(
"ha-enable-leader-election",
false,
"Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")

setupLog = ctrl.Log.WithName("setup")
)
@@ -190,8 +195,9 @@ func bindEnvToFlags() {
"POOL_NAME": "pool-name",
"POOL_NAMESPACE": "pool-namespace",
// durations & bools work too; flag.Set expects the *string* form
"REFRESH_METRICS_INTERVAL": "refresh-metrics-interval",
"SECURE_SERVING": "secure-serving",
"REFRESH_METRICS_INTERVAL": "refresh-metrics-interval",
"SECURE_SERVING": "secure-serving",
"HA_ENABLE_LEADER_ELECTION": "ha-enable-leader-election",
} {
if v := os.Getenv(env); v != "" {
// ignore error; Parse() will catch invalid values later
@@ -299,12 +305,28 @@ func (r *Runner) Run(ctx context.Context) error {
NamespacedName: poolNamespacedName,
GroupKind: poolGroupKind,
}
- mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions)
+ isLeader := &atomic.Bool{}
+ isLeader.Store(false)
+
+ mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection)
if err != nil {
setupLog.Error(err, "Failed to create controller manager")
return err
}

if *haEnableLeaderElection {
setupLog.Info("Leader election enabled")
go func() {
<-mgr.Elected()
isLeader.Store(true)
setupLog.Info("This instance is now the leader!")
}()
} else {
// If leader election is disabled, all instances are "leaders" for readiness purposes.
isLeader.Store(true)
}

if *enablePprof {
setupLog.Info("Enabling pprof handlers")
err = setupPprofHandlers(mgr)
@@ -356,7 +378,7 @@ func (r *Runner) Run(ctx context.Context) error {

// --- Add Runnables to Manager ---
// Register health server.
- if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil {
+ if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil {
return err
}

@@ -452,11 +474,13 @@ func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerRunner
}

// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
- func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error {
+ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error {
srv := grpc.NewServer()
healthPb.RegisterHealthServer(srv, &healthServer{
logger: logger,
datastore: ds,
isLeader: isLeader,
leaderElectionEnabled: leaderElectionEnabled,
})
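// Registered via runnable.NoLeaderElection below so the health server runs
// on every replica, not just the leader; otherwise standby pods could never
// answer their liveness probes.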
if err := mgr.Add(
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
15 changes: 15 additions & 0 deletions config/charts/inferencepool/templates/epp-deployment.yaml
@@ -44,6 +44,9 @@ spec:
- "--model-server-metrics-path={{ .Values.inferenceExtension.modelServerMetricsPath }}"
- "--model-server-metrics-scheme={{ .Values.inferenceExtension.modelServerMetricsScheme }}"
- "--model-server-metrics-https-insecure-skip-verify={{ .Values.inferenceExtension.modelServerMetricsHttpsInsecureSkipVerify }}"
{{- if .Values.inferenceExtension.enableLeaderElection }}
- "--ha-enable-leader-election"
{{- end }}
{{- if eq (.Values.inferencePool.modelServerType | default "vllm") "triton-tensorrt-llm" }}
- --total-queued-requests-metric
- "nv_trt_llm_request_metrics{request_type=waiting}"
@@ -63,15 +66,27 @@
{{- toYaml . | nindent 8 }}
{{- end }}
livenessProbe:
{{- if .Values.inferenceExtension.enableLeaderElection }}
grpc:
port: 9003
service: liveness
{{- else }}
grpc:
port: 9003
service: inference-extension
{{- end }}
initialDelaySeconds: 5
periodSeconds: 10
readinessProbe:
{{- if .Values.inferenceExtension.enableLeaderElection }}
grpc:
port: 9003
service: readiness
{{- else }}
grpc:
port: 9003
service: inference-extension
{{- end }}
initialDelaySeconds: 5
periodSeconds: 10
{{- with .Values.inferenceExtension.env }}
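With leader election enabled, the two probes intentionally diverge: every replica reports live so standbys are not restarted, while only the elected leader reports ready, so the Service routes traffic to a single pod and the standbys stay warm for failover.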
30 changes: 30 additions & 0 deletions config/charts/inferencepool/templates/leader-election-rbac.yaml
@@ -0,0 +1,30 @@
{{- if .Values.inferenceExtension.enableLeaderElection }}
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ include "gateway-api-inference-extension.name" . }}-leader-election
namespace: {{ .Release.Namespace }}
labels:
{{- include "gateway-api-inference-extension.labels" . | nindent 4 }}
rules:
- apiGroups: [ "coordination.k8s.io" ]
resources: [ "leases" ]
verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
- apiGroups: [ "" ]
resources: [ "events" ]
verbs: [ "create", "patch" ]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: {{ include "gateway-api-inference-extension.name" . }}-leader-election-binding
namespace: {{ .Release.Namespace }}
subjects:
- kind: ServiceAccount
name: {{ include "gateway-api-inference-extension.name" . }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: {{ include "gateway-api-inference-extension.name" . }}-leader-election
{{- end }}
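These grants match what controller-runtime's lease-based election performs at runtime: the manager creates and periodically renews the Lease, and records leader-transition events.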
1 change: 1 addition & 0 deletions config/charts/inferencepool/values.yaml
@@ -34,6 +34,7 @@ inferenceExtension:
extraContainerPorts: []
# Define additional service ports
extraServicePorts: []
enableLeaderElection: false

inferencePool:
targetPortNumber: 8000
13 changes: 12 additions & 1 deletion pkg/epp/server/controller_manager.go
@@ -85,12 +85,23 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options)
}

// NewDefaultManager creates a new controller manager with default configuration.
- func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) {
+ func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) {
opt, err := defaultManagerOptions(gknn, metricsServerOptions)
if err != nil {
return nil, fmt.Errorf("failed to create controller manager options: %v", err)
}

if leaderElectionEnabled {
opt.LeaderElection = true
opt.LeaderElectionResourceLock = "leases"
// The lease name needs to be unique per EPP deployment.
opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name)
opt.LeaderElectionNamespace = gknn.Namespace
opt.LeaderElectionReleaseOnCancel = true
}

manager, err := ctrl.NewManager(restConfig, opt)

if err != nil {
return nil, fmt.Errorf("failed to create controller manager: %v", err)
}
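For a concrete sense of the resulting Lease name, a small sketch reproducing the format above for a hypothetical InferencePool named `my-pool` in namespace `default` (both names are illustrative):

```go
package main

import "fmt"

func main() {
	// Every replica of the same EPP deployment computes the same ID and
	// contends for this single coordination.k8s.io Lease.
	id := fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io",
		"default", "my-pool")
	fmt.Println(id) // epp-default-my-pool.gateway-api-inference-extension.sigs.k8s.io
}
```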
13 changes: 13 additions & 0 deletions test/e2e/epp/README.md
@@ -45,6 +45,19 @@ Follow these steps to run the end-to-end tests:
export E2E_MANIFEST_PATH=[config/manifests/vllm/gpu-deployment.yaml|config/manifests/vllm/cpu-deployment.yaml]
```

- **Enable leader election tests**: By default, the e2e test runs the EPP server as a single replica.
To test the high-availability (HA) mode with leader election (3 replicas), set the following environment variable:

```sh
export E2E_LEADER_ELECTION_ENABLED=true
```

- **Pause before cleanup**: To pause the test run before cleaning up resources, set the `E2E_PAUSE_ON_EXIT` environment variable.
This is useful for debugging the state of the cluster after the test has run.

- To pause indefinitely, set it to `true`: `export E2E_PAUSE_ON_EXIT=true`
- To pause for a specific duration, provide a duration string: `export E2E_PAUSE_ON_EXIT=10m`

1. **Run the Tests**: Run the `test-e2e` target:

```sh