diff --git a/cmd/epp/runner/health.go b/cmd/epp/runner/health.go index cc5da8d2f..6e63e177b 100644 --- a/cmd/epp/runner/health.go +++ b/cmd/epp/runner/health.go @@ -18,6 +18,8 @@ package runner import ( "context" + "fmt" + "sync/atomic" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/go-logr/logr" @@ -30,37 +32,83 @@ import ( ) type healthServer struct { - logger logr.Logger - datastore datastore.Datastore + logger logr.Logger + datastore datastore.Datastore + isLeader *atomic.Bool + leaderElectionEnabled bool } +const ( + LivenessCheckService = "liveness" + ReadinessCheckService = "readiness" +) + func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { - // TODO: we're accepting ANY service name for now as a temporary hack in alignment with - // upstream issues. See https://github.com/kubernetes-sigs/gateway-api-inference-extension/pull/788 - // if in.Service != extProcPb.ExternalProcessor_ServiceDesc.ServiceName { - // s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service) - // return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil - // } - - if !s.datastore.PoolHasSynced() { - s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving", "service", in.Service) + isLive := s.datastore.PoolHasSynced() + + // If leader election is disabled, use current logic: all checks are based on whether the pool has synced. + if !s.leaderElectionEnabled { + if !isLive { + s.logger.V(logutil.DEFAULT).Info("gRPC health check not serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil + } + s.logger.V(logutil.TRACE).Info("gRPC health check serving (leader election disabled)", "service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil + } + + // When leader election is enabled, differentiate between liveness and readiness. + // The service name in the request determines which check to perform. + var checkName string + var isPassing bool + + switch in.Service { + case ReadinessCheckService: + checkName = "readiness" + isPassing = isLive && s.isLeader.Load() + case LivenessCheckService, "": // Default to liveness check if service is empty + checkName = "liveness" + // Any pod that is running and can respond to this gRPC check is considered "live". + // The datastore sync status should not affect liveness, only readiness. + // This is to prevent the non-leader node from continuous restarts + isPassing = true + case extProcPb.ExternalProcessor_ServiceDesc.ServiceName: + // The main service is considered ready only on the leader. 
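+ // A follower that has synced the pool still reports NOT_SERVING here, so a gRPC
+ // health client (e.g. Envoy) probing the ext_proc service only sees SERVING from
+ // the elected leader.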
+ checkName = "ext_proc" + isPassing = isLive && s.isLeader.Load() + default: + s.logger.V(logutil.DEFAULT).Info("gRPC health check requested unknown service", "available-services", []string{LivenessCheckService, ReadinessCheckService, extProcPb.ExternalProcessor_ServiceDesc.ServiceName}, "requested-service", in.Service) + return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVICE_UNKNOWN}, nil + } + + if !isPassing { + s.logger.V(logutil.DEFAULT).Info(fmt.Sprintf("gRPC %s check not serving", checkName), "service", in.Service, "isLive", isLive, "isLeader", s.isLeader.Load()) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil } - s.logger.V(logutil.TRACE).Info("gRPC health check serving", "service", in.Service) + + s.logger.V(logutil.TRACE).Info(fmt.Sprintf("gRPC %s check serving", checkName), "service", in.Service) return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil } func (s *healthServer) List(ctx context.Context, _ *healthPb.HealthListRequest) (*healthPb.HealthListResponse, error) { - // currently only the ext_proc service is provided - serviceHealthResponse, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: extProcPb.ExternalProcessor_ServiceDesc.ServiceName}) - if err != nil { - return nil, err + statuses := make(map[string]*healthPb.HealthCheckResponse) + + services := []string{extProcPb.ExternalProcessor_ServiceDesc.ServiceName} + if s.leaderElectionEnabled { + services = append(services, LivenessCheckService, ReadinessCheckService) + } + + for _, service := range services { + resp, err := s.Check(ctx, &healthPb.HealthCheckRequest{Service: service}) + if err != nil { + // Check can return an error for unknown services, but here we are iterating known services. + // If another error occurs, we should probably return it. + return nil, err + } + statuses[service] = resp } return &healthPb.HealthListResponse{ - Statuses: map[string]*healthPb.HealthCheckResponse{ - extProcPb.ExternalProcessor_ServiceDesc.ServiceName: serviceHealthResponse, - }, + Statuses: statuses, }, nil } diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index d8e21c416..10cf14504 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -25,6 +25,7 @@ import ( "net/http" "net/http/pprof" "os" + "sync/atomic" "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" @@ -151,6 +152,10 @@ var ( modelServerMetricsPath = flag.String("model-server-metrics-path", "/metrics", "Path to scrape metrics from pods") modelServerMetricsScheme = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods") modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)") + haEnableLeaderElection = flag.Bool( + "ha-enable-leader-election", + false, + "Enables leader election for high availability. 
When enabled, readiness probes will only pass on the leader.") setupLog = ctrl.Log.WithName("setup") ) @@ -190,8 +195,9 @@ func bindEnvToFlags() { "POOL_NAME": "pool-name", "POOL_NAMESPACE": "pool-namespace", // durations & bools work too; flag.Set expects the *string* form - "REFRESH_METRICS_INTERVAL": "refresh-metrics-interval", - "SECURE_SERVING": "secure-serving", + "REFRESH_METRICS_INTERVAL": "refresh-metrics-interval", + "SECURE_SERVING": "secure-serving", + "HA_ENABLE_LEADER_ELECTION": "ha-enable-leader-election", } { if v := os.Getenv(env); v != "" { // ignore error; Parse() will catch invalid values later @@ -299,12 +305,28 @@ func (r *Runner) Run(ctx context.Context) error { NamespacedName: poolNamespacedName, GroupKind: poolGroupKind, } - mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions) + + isLeader := &atomic.Bool{} + isLeader.Store(false) + + mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection) if err != nil { setupLog.Error(err, "Failed to create controller manager") return err } + if *haEnableLeaderElection { + setupLog.Info("Leader election enabled") + go func() { + <-mgr.Elected() + isLeader.Store(true) + setupLog.Info("This instance is now the leader!") + }() + } else { + // If leader election is disabled, all instances are "leaders" for readiness purposes. + isLeader.Store(true) + } + if *enablePprof { setupLog.Info("Enabling pprof handlers") err = setupPprofHandlers(mgr) @@ -356,7 +378,7 @@ func (r *Runner) Run(ctx context.Context) error { // --- Add Runnables to Manager --- // Register health server. - if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort); err != nil { + if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), datastore, *grpcHealthPort, isLeader, *haEnableLeaderElection); err != nil { return err } @@ -452,11 +474,13 @@ func registerExtProcServer(mgr manager.Manager, runner *runserver.ExtProcServerR } // registerHealthServer adds the Health gRPC server as a Runnable to the given manager. 
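+// The isLeader flag is shared with the leader-election callback in Run, and
+// leaderElectionEnabled selects between the single-replica and HA health-check semantics.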
-func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int) error { +func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.Datastore, port int, isLeader *atomic.Bool, leaderElectionEnabled bool) error { srv := grpc.NewServer() healthPb.RegisterHealthServer(srv, &healthServer{ - logger: logger, - datastore: ds, + logger: logger, + datastore: ds, + isLeader: isLeader, + leaderElectionEnabled: leaderElectionEnabled, }) if err := mgr.Add( runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil { diff --git a/config/charts/inferencepool/templates/epp-deployment.yaml b/config/charts/inferencepool/templates/epp-deployment.yaml index 23957a3da..fc15449dc 100644 --- a/config/charts/inferencepool/templates/epp-deployment.yaml +++ b/config/charts/inferencepool/templates/epp-deployment.yaml @@ -44,6 +44,9 @@ spec: - "--model-server-metrics-path={{ .Values.inferenceExtension.modelServerMetricsPath }}" - "--model-server-metrics-scheme={{ .Values.inferenceExtension.modelServerMetricsScheme }}" - "--model-server-metrics-https-insecure-skip-verify={{ .Values.inferenceExtension.modelServerMetricsHttpsInsecureSkipVerify }}" + {{- if .Values.inferenceExtension.enableLeaderElection }} + - "--ha-enable-leader-election" + {{- end }} {{- if eq (.Values.inferencePool.modelServerType | default "vllm") "triton-tensorrt-llm" }} - --total-queued-requests-metric - "nv_trt_llm_request_metrics{request_type=waiting}" @@ -63,15 +66,27 @@ spec: {{- toYaml . | nindent 8 }} {{- end }} livenessProbe: + {{- if .Values.inferenceExtension.enableLeaderElection }} + grpc: + port: 9003 + service: liveness + {{- else }} grpc: port: 9003 service: inference-extension + {{- end }} initialDelaySeconds: 5 periodSeconds: 10 readinessProbe: + {{- if .Values.inferenceExtension.enableLeaderElection }} + grpc: + port: 9003 + service: readiness + {{- else }} grpc: port: 9003 service: inference-extension + {{- end }} initialDelaySeconds: 5 periodSeconds: 10 {{- with .Values.inferenceExtension.env }} diff --git a/config/charts/inferencepool/templates/leader-election-rbac.yaml b/config/charts/inferencepool/templates/leader-election-rbac.yaml new file mode 100644 index 000000000..923bdd6f4 --- /dev/null +++ b/config/charts/inferencepool/templates/leader-election-rbac.yaml @@ -0,0 +1,30 @@ +{{- if .Values.inferenceExtension.enableLeaderElection }} +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ include "gateway-api-inference-extension.name" . }}-leader-election + namespace: {{ .Release.Namespace }} + labels: + {{- include "gateway-api-inference-extension.labels" . | nindent 4 }} +rules: +- apiGroups: [ "coordination.k8s.io" ] + resources: [ "leases" ] + verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ] +- apiGroups: [ "" ] + resources: [ "events" ] + verbs: [ "create", "patch" ] +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: {{ include "gateway-api-inference-extension.name" . }}-leader-election-binding + namespace: {{ .Release.Namespace }} +subjects: +- kind: ServiceAccount + name: {{ include "gateway-api-inference-extension.name" . }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: {{ include "gateway-api-inference-extension.name" . 
}}-leader-election +{{- end }} diff --git a/config/charts/inferencepool/values.yaml b/config/charts/inferencepool/values.yaml index 2c295c1f7..18fbf3aa9 100644 --- a/config/charts/inferencepool/values.yaml +++ b/config/charts/inferencepool/values.yaml @@ -34,6 +34,7 @@ inferenceExtension: extraContainerPorts: [] # Define additional service ports extraServicePorts: [] + enableLeaderElection: false inferencePool: targetPortNumber: 8000 diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 05038f7c0..47e4f12d4 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -85,12 +85,23 @@ func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver. } // NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { +func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { opt, err := defaultManagerOptions(gknn, metricsServerOptions) if err != nil { return nil, fmt.Errorf("failed to create controller manager options: %v", err) } + + if leaderElectionEnabled { + opt.LeaderElection = true + opt.LeaderElectionResourceLock = "leases" + // The lease name needs to be unique per EPP deployment. + opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name) + opt.LeaderElectionNamespace = gknn.Namespace + opt.LeaderElectionReleaseOnCancel = true + } + manager, err := ctrl.NewManager(restConfig, opt) + if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/test/e2e/epp/README.md b/test/e2e/epp/README.md index c3e4aa17b..5a83f068c 100644 --- a/test/e2e/epp/README.md +++ b/test/e2e/epp/README.md @@ -45,6 +45,19 @@ Follow these steps to run the end-to-end tests: export E2E_MANIFEST_PATH=[config/manifests/vllm/gpu-deployment.yaml|config/manifests/vllm/cpu-deployment.yaml] ``` + - **Enable leader election tests**: By default, the e2e test runs the EPP server as a single replica. + To test the high-availability (HA) mode with leader election (3 replicas), set the following environment variable: + + ```sh + export E2E_LEADER_ELECTION_ENABLED=true + ``` + + - **Pause before cleanup**: To pause the test run before cleaning up resources, set the `E2E_PAUSE_ON_EXIT` environment variable. + This is useful for debugging the state of the cluster after the test has run. + + - To pause indefinitely, set it to `true`: `export E2E_PAUSE_ON_EXIT=true` + - To pause for a specific duration, provide a duration string: `export E2E_PAUSE_ON_EXIT=10m` + 1. **Run the Tests**: Run the `test-e2e` target: ```sh diff --git a/test/e2e/epp/e2e_suite_test.go b/test/e2e/epp/e2e_suite_test.go index a40d586ef..e6e5b83ba 100644 --- a/test/e2e/epp/e2e_suite_test.go +++ b/test/e2e/epp/e2e_suite_test.go @@ -85,8 +85,10 @@ const ( xInferObjectiveManifest = "../../../config/crd/bases/inference.networking.x-k8s.io_inferenceobjectives.yaml" // inferPoolManifest is the manifest for the inference pool CRD with 'inference.networking.k8s.io' group. inferPoolManifest = "../../../config/crd/bases/inference.networking.k8s.io_inferencepools.yaml" - // inferExtManifest is the manifest for the inference extension test resources. 
- inferExtManifest = "../../testdata/inferencepool-e2e.yaml" + // inferExtManifestDefault is the manifest for the default inference extension test resources (single replica). + inferExtManifestDefault = "../../testdata/inferencepool-e2e.yaml" + // inferExtManifestLeaderElection is the manifest for the inference extension test resources with leader election enabled (3 replicas). + inferExtManifestLeaderElection = "../../testdata/inferencepool-leader-election-e2e.yaml" // envoyManifest is the manifest for the envoy proxy test resources. envoyManifest = "../../testdata/envoy.yaml" // metricsRbacManifest is the manifest for the rbac resources for testing metrics. @@ -95,15 +97,18 @@ const ( modelServerManifestFilepathEnvVar = "MANIFEST_PATH" ) +const e2eLeaderElectionEnabledEnvVar = "E2E_LEADER_ELECTION_ENABLED" + var ( ctx = context.Background() cli client.Client // Required for exec'ing in curl pod - kubeCli *kubernetes.Clientset - scheme = runtime.NewScheme() - cfg = config.GetConfigOrDie() - nsName string - e2eImage string + kubeCli *kubernetes.Clientset + scheme = runtime.NewScheme() + cfg = config.GetConfigOrDie() + nsName string + e2eImage string + leaderElectionEnabled bool ) func TestAPIs(t *testing.T) { @@ -121,6 +126,11 @@ var _ = ginkgo.BeforeSuite(func() { e2eImage = os.Getenv("E2E_IMAGE") gomega.Expect(e2eImage).NotTo(gomega.BeEmpty(), "E2E_IMAGE environment variable is not set") + if os.Getenv(e2eLeaderElectionEnabledEnvVar) == "true" { + leaderElectionEnabled = true + ginkgo.By("Leader election test mode enabled via " + e2eLeaderElectionEnabledEnvVar) + } + ginkgo.By("Setting up the test suite") setupSuite() @@ -146,7 +156,12 @@ func setupInfra() { } createCRDs(cli, crds) - createInferExt(cli, inferExtManifest) + + inferExtManifestPath := inferExtManifestDefault + if leaderElectionEnabled { + inferExtManifestPath = inferExtManifestLeaderElection + } + createInferExt(cli, inferExtManifestPath) createClient(cli, clientManifest) createEnvoy(cli, envoyManifest) createMetricsRbac(cli, metricsRbacManifest) @@ -156,6 +171,20 @@ func setupInfra() { } var _ = ginkgo.AfterSuite(func() { + // If E2E_PAUSE_ON_EXIT is set, pause the test run before cleanup. + // This is useful for debugging the state of the cluster after the test has run. + if pauseStr := os.Getenv("E2E_PAUSE_ON_EXIT"); pauseStr != "" { + ginkgo.By("Pausing before cleanup as requested by E2E_PAUSE_ON_EXIT=" + pauseStr) + pauseDuration, err := time.ParseDuration(pauseStr) + if err != nil { + // If it's not a valid duration (e.g., "true"), just wait indefinitely. + ginkgo.By("Invalid duration, pausing indefinitely. Press Ctrl+C to stop the test runner when you are done.") + select {} // Block forever + } + ginkgo.By(fmt.Sprintf("Pausing for %v...", pauseDuration)) + time.Sleep(pauseDuration) + } + ginkgo.By("Performing global cleanup") cleanupResources() }) @@ -423,8 +452,12 @@ func createInferExt(k8sClient client.Client, filePath string) { return k8sClient.Get(ctx, types.NamespacedName{Namespace: nsName, Name: inferExtName}, deploy) }, existsTimeout, interval) - // Wait for the deployment to be available. - testutils.DeploymentAvailable(ctx, k8sClient, deploy, modelReadyTimeout, interval) + if leaderElectionEnabled { + // With leader election enabled, only 1 replica will be "Ready" at any given time (the leader). 
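+ // The Deployment's Available condition is not expected to be met with 3 replicas and a
+ // single ready leader, so assert on the ready-replica count instead of DeploymentAvailable.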
+ testutils.DeploymentReadyReplicas(ctx, k8sClient, deploy, 1, modelReadyTimeout, interval) + } else { + testutils.DeploymentAvailable(ctx, k8sClient, deploy, modelReadyTimeout, interval) + } // Wait for the service to exist. testutils.EventuallyExists(ctx, func() error { diff --git a/test/e2e/epp/e2e_test.go b/test/e2e/epp/e2e_test.go index 5e9b40860..33f0a9032 100644 --- a/test/e2e/epp/e2e_test.go +++ b/test/e2e/epp/e2e_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -72,162 +73,92 @@ var _ = ginkgo.Describe("InferencePool", func() { ginkgo.When("The Inference Extension is running", func() { ginkgo.It("Should route traffic to target model servers", func() { - for _, t := range []struct { - api string - promptOrMessages any - }{ - { - api: "/completions", - promptOrMessages: "Write as if you were a critic: San Francisco", - }, - { - api: "/chat/completions", - promptOrMessages: []map[string]any{ - { - "role": "user", - "content": "Write as if you were a critic: San Francisco", - }, - }, - }, - { - api: "/chat/completions", - promptOrMessages: []map[string]any{ - { - "role": "user", - "content": "Write as if you were a critic: San Francisco", - }, - {"role": "assistant", "content": "Okay, let's see..."}, - {"role": "user", "content": "Now summarize your thoughts."}, - }, - }, - } { - ginkgo.By(fmt.Sprintf("Verifying connectivity through the inference extension with %s api and prompt/messages: %v", t.api, t.promptOrMessages)) - - // Ensure the expected responses include the InferenceObjective target model names. - var expected []string - for _, m := range infObjective.Spec.TargetModels { - expected = append(expected, m.Name) - } - curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, t.api, t.promptOrMessages, false) - - actual := make(map[string]int) - gomega.Eventually(func() error { - resp, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) - if err != nil { - return err - } - if !strings.Contains(resp, "200 OK") { - return fmt.Errorf("did not get 200 OK: %s", resp) - } - for _, m := range expected { - if strings.Contains(resp, m) { - actual[m] = 0 - } - } - var got []string - for m := range actual { - got = append(got, m) - } - // Compare ignoring order - if !cmp.Equal(got, expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { - return fmt.Errorf("actual (%v) != expected (%v); resp=%q", got, expected, resp) - } - return nil - }, readyTimeout, curlInterval).Should(gomega.Succeed()) - } + verifyTrafficRouting(infObjective) }) ginkgo.It("Should expose EPP metrics after generating traffic", func() { - // Define the metrics we expect to see - expectedMetrics := []string{ - "inference_model_request_total", - "inference_model_request_error_total", - "inference_model_request_duration_seconds", - // TODO: normalized_time_per_output_token_seconds is not actually recorded yet - // "normalized_time_per_output_token_seconds", - "inference_model_request_sizes", - "inference_model_response_sizes", - "inference_model_input_tokens", - "inference_model_output_tokens", - "inference_pool_average_kv_cache_utilization", - "inference_pool_average_queue_size", - "inference_pool_per_pod_queue_size", - "inference_model_running_requests", - "inference_pool_ready_pods", - "inference_extension_info", + 
verifyMetrics() + }) + }) + + ginkgo.When("Leader election is enabled", func() { + ginkgo.It("Should elect one leader and have other pods as not ready", func() { + if !leaderElectionEnabled { + ginkgo.Skip("Leader election is not enabled for this test run, skipping.") } - // Generate traffic by sending requests through the inference extension - ginkgo.By("Generating traffic through the inference extension") - curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, "/completions", "Write as if you were a critic: San Francisco", true) + ginkgo.By("Verifying that exactly one EPP pod is ready") + gomega.Eventually(func(g gomega.Gomega) { + podList := &corev1.PodList{} + err := cli.List(ctx, podList, client.InNamespace(nsName), client.MatchingLabels{"app": inferExtName}) + g.Expect(err).NotTo(gomega.HaveOccurred()) - // Run the curl command multiple times to generate some metrics data - for i := 0; i < 5; i++ { - _, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - } + // The deployment should have 3 replicas for leader election. + g.Expect(podList.Items).To(gomega.HaveLen(3)) - // modify the curl command to generate some error metrics - curlCmd[len(curlCmd)-1] = "invalid input" - for i := 0; i < 5; i++ { - _, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) + readyPods := 0 + for _, pod := range podList.Items { + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + readyPods++ + } + } + } + g.Expect(readyPods).To(gomega.Equal(1), "Expected exactly one pod to be ready") + }, readyTimeout, interval).Should(gomega.Succeed()) + }) + + ginkgo.It("Should successfully failover and serve traffic after the leader pod is deleted", func() { + if !leaderElectionEnabled { + ginkgo.Skip("Leader election is not enabled for this test run, skipping.") } - // Now scrape metrics from the EPP endpoint via the curl pod - ginkgo.By("Scraping metrics from the EPP endpoint") + ginkgo.By("STEP 1: Verifying initial leader is working correctly before failover") + verifyTrafficRouting(infObjective) + verifyMetrics() + + ginkgo.By("STEP 2: Finding and deleting the current leader pod") + oldLeaderPod := findReadyPod() + ginkgo.By("Found initial leader pod: " + oldLeaderPod.Name) - // Get Pod IP instead of Service - podList := &corev1.PodList{} - err := cli.List(ctx, podList, client.InNamespace(nsName), client.MatchingLabels{"app": inferExtName}) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - gomega.Expect(podList.Items).NotTo(gomega.BeEmpty()) - podIP := podList.Items[0].Status.PodIP - gomega.Expect(podIP).NotTo(gomega.BeEmpty()) + ginkgo.By(fmt.Sprintf("Deleting leader pod %s to trigger failover", oldLeaderPod.Name)) + gomega.Expect(cli.Delete(ctx, oldLeaderPod)).To(gomega.Succeed()) - // Get the authorization token for reading metrics - token := "" + ginkgo.By("STEP 3: Waiting for a new leader to be elected") + // The deployment controller will create a new pod. We need to wait for the total number of pods + // to be back to 3, and for one of the other pods to become the new leader. 
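+ // Failover relies on the coordination.k8s.io Lease configured in NewDefaultManager;
+ // a follower takes over once the deleted leader's lease is released or expires.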
+ deploy := &appsv1.Deployment{} gomega.Eventually(func() error { - token, err = getMetricsReaderToken(cli) - if err != nil { - return err - } - if token == "" { - return errors.New("token not found") - } - return nil + return cli.Get(ctx, types.NamespacedName{Namespace: nsName, Name: inferExtName}, deploy) }, existsTimeout, interval).Should(gomega.Succeed()) - // Construct the metric scraping curl command using Pod IP - metricScrapeCmd := []string{ - "curl", - "-i", - "--max-time", - strconv.Itoa((int)(curlTimeout.Seconds())), - "-H", - "Authorization: Bearer " + token, - fmt.Sprintf("http://%s:%d/metrics", podIP, 9090), - } + // Wait for one replica to become ready again. + testutils.DeploymentReadyReplicas(ctx, cli, deploy, 1, readyTimeout, interval) - ginkgo.By("Verifying that all expected metrics are present.") - gomega.Eventually(func() error { - // Execute the metrics scrape command inside the curl pod - resp, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", metricScrapeCmd) - if err != nil { - return err - } - // Verify that we got a 200 OK responsecurl - if !strings.Contains(resp, "200 OK") { - return fmt.Errorf("did not get 200 OK: %s", resp) - } - // Check if all expected metrics are present in the metrics output - for _, metric := range expectedMetrics { - if !strings.Contains(resp, metric) { - return fmt.Errorf("expected metric %s not found in metrics output", metric) - } - } - return nil - }, readyTimeout, curlInterval).Should(gomega.Succeed()) + // Also wait for the total number of replicas to be back to 3. + gomega.Eventually(func(g gomega.Gomega) { + d := &appsv1.Deployment{} + err := cli.Get(ctx, types.NamespacedName{Namespace: nsName, Name: inferExtName}, d) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(d.Status.Replicas).To(gomega.Equal(int32(3)), "Deployment should have 3 replicas") + }, readyTimeout, interval).Should(gomega.Succeed()) + + ginkgo.By("STEP 4: Verifying a new, different leader is elected") + var newLeaderPod *corev1.Pod + gomega.Eventually(func(g gomega.Gomega) { + // Find the current ready pod. + newLeaderPod = findReadyPod() + + // Ensure the new leader is not the same as the one we just deleted. + // This guards against a race condition where we might find the old leader + // before its status is updated to NotReady. + g.Expect(newLeaderPod.Name).NotTo(gomega.Equal(oldLeaderPod.Name), "The new leader should not be the same as the old deleted leader") + }, readyTimeout, interval).Should(gomega.Succeed()) + ginkgo.By("Found new leader pod: " + newLeaderPod.Name) + + ginkgo.By("STEP 5: Verifying the new leader is working correctly after failover") + verifyTrafficRouting(infObjective) + verifyMetrics() }) }) }) @@ -247,6 +178,150 @@ func newInferenceObjective(ns string) *v1alpha2.InferenceObjective { Obj() } +// verifyTrafficRouting contains the logic for the "Should route traffic to target model servers" test. 
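+// It sends /completions and /chat/completions requests through Envoy and asserts that the
+// responses cover every InferenceObjective target model.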
+func verifyTrafficRouting(infObjective *v1alpha2.InferenceObjective) { + ginkgo.By("Verifying traffic routing") + for _, t := range []struct { + api string + promptOrMessages any + }{ + { + api: "/completions", + promptOrMessages: "Write as if you were a critic: San Francisco", + }, + { + api: "/chat/completions", + promptOrMessages: []map[string]any{ + { + "role": "user", + "content": "Write as if you were a critic: San Francisco", + }, + }, + }, + { + api: "/chat/completions", + promptOrMessages: []map[string]any{ + { + "role": "user", + "content": "Write as if you were a critic: San Francisco", + }, + {"role": "assistant", "content": "Okay, let's see..."}, + {"role": "user", "content": "Now summarize your thoughts."}, + }, + }, + } { + ginkgo.By(fmt.Sprintf("Verifying connectivity through the inference extension with %s api and prompt/messages: %v", t.api, t.promptOrMessages)) + + // Ensure the expected responses include the InferenceObjective target model names. + var expected []string + for _, m := range infObjective.Spec.TargetModels { + expected = append(expected, m.Name) + } + curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, t.api, t.promptOrMessages, false) + + actual := make(map[string]int) + gomega.Eventually(func() error { + resp, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) + if err != nil { + return err + } + if !strings.Contains(resp, "200 OK") { + return fmt.Errorf("did not get 200 OK: %s", resp) + } + for _, m := range expected { + if strings.Contains(resp, m) { + actual[m] = 0 + } + } + var got []string + for m := range actual { + got = append(got, m) + } + // Compare ignoring order + if !cmp.Equal(got, expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { + return fmt.Errorf("actual (%v) != expected (%v); resp=%q", got, expected, resp) + } + return nil + }, readyTimeout, curlInterval).Should(gomega.Succeed()) + } +} + +// verifyMetrics contains the logic for the "Should expose EPP metrics after generating traffic" test. 
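+// It first generates traffic (including some invalid requests for error counters) and then
+// scrapes /metrics from a ready EPP pod via the in-cluster curl pod.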
+func verifyMetrics() { + ginkgo.By("Verifying metrics exposure") + // Define the metrics we expect to see + expectedMetrics := []string{ + "inference_model_request_total", + "inference_model_request_error_total", + "inference_model_request_duration_seconds", + // TODO: normalized_time_per_output_token_seconds is not actually recorded yet + // "normalized_time_per_output_token_seconds", + "inference_model_request_sizes", + "inference_model_response_sizes", + "inference_model_input_tokens", + "inference_model_output_tokens", + "inference_pool_average_kv_cache_utilization", + "inference_pool_average_queue_size", + "inference_pool_per_pod_queue_size", + "inference_model_running_requests", + "inference_pool_ready_pods", + "inference_extension_info", + } + + // Generate traffic by sending requests through the inference extension + ginkgo.By("Generating traffic through the inference extension") + curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, "/completions", "Write as if you were a critic: San Francisco", true) + + // Run the curl command multiple times to generate some metrics data + for i := 0; i < 5; i++ { + _, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + // modify the curl command to generate some error metrics + curlCmd[len(curlCmd)-1] = "invalid input" + for i := 0; i < 5; i++ { + _, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + } + + // Now scrape metrics from the EPP endpoint via the curl pod + ginkgo.By("Scraping metrics from the EPP endpoint") + podIP := findReadyPod().Status.PodIP + + // Get the authorization token for reading metrics + token := "" + gomega.Eventually(func(g gomega.Gomega) { + t, err := getMetricsReaderToken(cli) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(t).NotTo(gomega.BeEmpty()) + token = t + }, existsTimeout, interval).Should(gomega.Succeed()) + + // Construct the metric scraping curl command using Pod IP + metricScrapeCmd := getMetricsScrapeCommand(podIP, token) + + ginkgo.By("Verifying that all expected metrics are present.") + gomega.Eventually(func() error { + // Execute the metrics scrape command inside the curl pod + resp, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", metricScrapeCmd) + if err != nil { + return err + } + // Verify that we got a 200 OK responsecurl + if !strings.Contains(resp, "200 OK") { + return fmt.Errorf("did not get 200 OK: %s", resp) + } + // Check if all expected metrics are present in the metrics output + for _, metric := range expectedMetrics { + if !strings.Contains(resp, metric) { + return fmt.Errorf("expected metric %s not found in metrics output", metric) + } + } + return nil + }, readyTimeout, curlInterval).Should(gomega.Succeed()) +} + func getMetricsReaderToken(k8sClient client.Client) (string, error) { secret := &corev1.Secret{} err := k8sClient.Get(ctx, types.NamespacedName{Namespace: nsName, Name: metricsReaderSecretName}, secret) @@ -256,6 +331,43 @@ func getMetricsReaderToken(k8sClient client.Client) (string, error) { return string(secret.Data["token"]), nil } +// findReadyPod finds the first EPP pod that has a "Ready" status condition. +// It's used to target the leader pod in an HA setup. 
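+// With leader election enabled only the leader passes the readiness probe, so the pod
+// returned here is the current leader; without it, any serving replica may be returned.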
+func findReadyPod() *corev1.Pod { + var readyPod *corev1.Pod + gomega.Eventually(func(g gomega.Gomega) { + podList := &corev1.PodList{} + err := cli.List(ctx, podList, client.InNamespace(nsName), client.MatchingLabels{"app": inferExtName}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + + foundReadyPod := false + for i := range podList.Items { + pod := &podList.Items[i] + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + g.Expect(pod.Status.PodIP).NotTo(gomega.BeEmpty(), "Ready pod must have an IP") + readyPod = pod + foundReadyPod = true + break // break inner loop + } + } + if foundReadyPod { + break // break outer loop + } + } + g.Expect(foundReadyPod).To(gomega.BeTrue(), "No ready EPP pod found") + }, readyTimeout, interval).Should(gomega.Succeed()) + return readyPod +} + +// getMetricsScrapeCommand returns the command to scrape the /metrics endpoint. +func getMetricsScrapeCommand(podIP, token string) []string { + return []string{ + "curl", "-i", "--max-time", strconv.Itoa((int)(curlTimeout.Seconds())), + "-H", "Authorization: Bearer " + token, fmt.Sprintf("http://%s:%d/metrics", podIP, 9090), + } +} + // getCurlCommand returns the command, as a slice of strings, for curl'ing // the test model server at the given name, namespace, port, and model name. func getCurlCommand(name, ns, port, model string, timeout time.Duration, api string, promptOrMessages any, streaming bool) []string { diff --git a/test/testdata/inferencepool-leader-election-e2e.yaml b/test/testdata/inferencepool-leader-election-e2e.yaml new file mode 100644 index 000000000..413fa55d9 --- /dev/null +++ b/test/testdata/inferencepool-leader-election-e2e.yaml @@ -0,0 +1,206 @@ +apiVersion: inference.networking.k8s.io/v1 +kind: InferencePool +metadata: + labels: + name: vllm-llama3-8b-instruct +spec: + targetPortNumber: 8000 + selector: + app: vllm-llama3-8b-instruct + extensionRef: + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +--- +apiVersion: v1 +kind: Service +metadata: + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +spec: + selector: + app: vllm-llama3-8b-instruct-epp + ports: + - protocol: TCP + port: 9002 + targetPort: 9002 + appProtocol: http2 + type: ClusterIP +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS + labels: + app: vllm-llama3-8b-instruct-epp +spec: + replicas: 3 + selector: + matchLabels: + app: vllm-llama3-8b-instruct-epp + template: + metadata: + labels: + app: vllm-llama3-8b-instruct-epp + spec: + serviceAccountName: vllm-llama3-8b-instruct-epp + # Conservatively, this timeout should mirror the longest grace period of the pods within the pool + terminationGracePeriodSeconds: 130 + containers: + - name: epp + image: $E2E_IMAGE + imagePullPolicy: IfNotPresent + args: + - --pool-name + - "vllm-llama3-8b-instruct" + - --pool-namespace + - "$E2E_NS" + - --v + - "4" + - --zap-encoder + - "json" + - --grpc-port + - "9002" + - --grpc-health-port + - "9003" + - --ha-enable-leader-election + - "--config-file" + - "/config/default-plugins.yaml" + ports: + - containerPort: 9002 + - containerPort: 9003 + - name: metrics + containerPort: 9090 + livenessProbe: + grpc: + port: 9003 + service: liveness + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: readiness + initialDelaySeconds: 5 + periodSeconds: 10 + 
volumeMounts: + - name: plugins-config-volume + mountPath: "/config" + volumes: + - name: plugins-config-volume + configMap: + name: plugins-config +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: plugins-config + namespace: $E2E_NS +data: + default-plugins.yaml: | + apiVersion: inference.networking.x-k8s.io/v1alpha1 + kind: EndpointPickerConfig + plugins: + - type: queue-scorer + - type: kv-cache-utilization-scorer + - type: prefix-cache-scorer + schedulingProfiles: + - name: default + plugins: + - pluginRef: queue-scorer + - pluginRef: kv-cache-utilization-scorer + - pluginRef: prefix-cache-scorer +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: pod-read + namespace: $E2E_NS +rules: +- apiGroups: [ "inference.networking.x-k8s.io" ] + resources: [ "inferenceobjectives", "inferencepools" ] + verbs: [ "get", "watch", "list" ] +- apiGroups: [ "inference.networking.k8s.io" ] + resources: [ "inferencepools" ] + verbs: [ "get", "watch", "list" ] +- apiGroups: [ "" ] + resources: [ "pods" ] + verbs: [ "get", "watch", "list" ] +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: epp-leader-election + namespace: $E2E_NS +rules: +- apiGroups: [ "coordination.k8s.io" ] + resources: [ "leases" ] + verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ] +- apiGroups: [ "" ] + resources: [ "events" ] + verbs: [ "create", "patch" ] +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: epp-leader-election-binding + namespace: $E2E_NS +subjects: +- kind: ServiceAccount + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: epp-leader-election +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: pod-read-binding + namespace: $E2E_NS +subjects: +- kind: ServiceAccount + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: pod-read +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: auth-reviewer +rules: +- apiGroups: + - authentication.k8s.io + resources: + - tokenreviews + verbs: + - create +- apiGroups: + - authorization.k8s.io + resources: + - subjectaccessreviews + verbs: + - create +--- +kind: ClusterRoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: auth-reviewer-binding +subjects: +- kind: ServiceAccount + name: vllm-llama3-8b-instruct-epp + namespace: $E2E_NS +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: auth-reviewer diff --git a/test/utils/utils.go b/test/utils/utils.go index c3e275952..8cc19b7ac 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -204,6 +204,18 @@ func DeploymentAvailable(ctx context.Context, cli client.Client, deploy *appsv1. gomega.Eventually(checkDeploymentStatus, timeout, interval).WithArguments(ctx, cli, deploy, conditions).Should(gomega.BeTrue()) } +// DeploymentReadyReplicas checks if the given Deployment has at least `count` ready replicas before the given timeout. 
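+// Unlike DeploymentAvailable, it does not require the Deployment's Available condition,
+// which is not expected to hold while only the leader replica reports Ready.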
+func DeploymentReadyReplicas(ctx context.Context, cli client.Client, deploy *appsv1.Deployment, count int, timeout, interval time.Duration) { + ginkgo.By(fmt.Sprintf("Checking if deployment %s/%s has at least %d ready replica(s)", deploy.Namespace, deploy.Name, count)) + gomega.Eventually(func(g gomega.Gomega) { + var fetchedDeploy appsv1.Deployment + err := cli.Get(ctx, types.NamespacedName{Namespace: deploy.Namespace, Name: deploy.Name}, &fetchedDeploy) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(fetchedDeploy.Status.ReadyReplicas).To(gomega.BeNumerically(">=", count), + fmt.Sprintf("Deployment only has %d ready replicas, want at least %d", fetchedDeploy.Status.ReadyReplicas, count)) + }, timeout, interval).Should(gomega.Succeed()) +} + // checkDeploymentStatus checks if the given Deployment status matches the expected conditions. func checkDeploymentStatus(ctx context.Context, cli client.Client, deploy *appsv1.Deployment, conditions []appsv1.DeploymentCondition) (bool, error) { var fetchedDeploy appsv1.Deployment