diff --git a/hack/test-e2e.sh b/hack/test-e2e.sh
index 497e5de1e..0c041a833 100755
--- a/hack/test-e2e.sh
+++ b/hack/test-e2e.sh
@@ -50,5 +50,6 @@ else
     fi
 fi
 
+kubectl delete crd inferencepools.inference.networking.k8s.io --ignore-not-found
 echo "Found an active cluster. Running Go e2e tests in ./epp..."
 go test ./test/e2e/epp/ -v -ginkgo.v
diff --git a/test/e2e/epp/e2e_suite_test.go b/test/e2e/epp/e2e_suite_test.go
index 504e59e2e..5cadd394d 100644
--- a/test/e2e/epp/e2e_suite_test.go
+++ b/test/e2e/epp/e2e_suite_test.go
@@ -317,7 +317,9 @@ func createEnvoy(testConfig *testutils.TestConfig, filePath string) {
 
 // createInferExt creates the inference extension resources used for testing from the given filePath.
 func createInferExt(testConfig *testutils.TestConfig, filePath string) {
-	inManifests := testutils.ReadYaml(filePath)
+
+	// This image needs to be updated to open multiple ports and respond.
+	inManifests := testutils.ReadYaml(filePath) // Modify inference-pool.yaml
 	ginkgo.By("Replacing placeholders with environment variables")
 	outManifests := []string{}
 	replacer := strings.NewReplacer(
diff --git a/test/e2e/epp/e2e_test.go b/test/e2e/epp/e2e_test.go
index 4d34dfd9b..d0598eebd 100644
--- a/test/e2e/epp/e2e_test.go
+++ b/test/e2e/epp/e2e_test.go
@@ -20,6 +20,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"maps"
+	"math"
 	"strconv"
 	"strings"
 	"time"
@@ -31,20 +33,68 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
 	testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
 )
 
+const (
+	firstPort = 8000
+	numPorts  = 8
+)
+
 var _ = ginkgo.Describe("InferencePool", func() {
 	var infObjective *v1alpha2.InferenceObjective
 	ginkgo.BeforeEach(func() {
 		ginkgo.By("Waiting for the namespace to exist.")
 		namespaceExists(testConfig)
 
+		ginkgo.By("Modifying deployment using local image for testing (temporary).")
+		deploy := &appsv1.Deployment{}
+		key := types.NamespacedName{Name: "vllm-llama3-8b-instruct", Namespace: testConfig.NsName}
+
+		gomega.Eventually(func() error {
+			err := testConfig.K8sClient.Get(testConfig.Context, key, deploy)
+			if err != nil {
+				return err
+			}
+
+			deploy.Spec.Template.Spec.Containers[0].Image = "vllm-dynamic-backend:local" // TODO(ryanrosario): Change back to official image after testing
+			deploy.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullNever
+			deploy.Spec.Template.Spec.Containers[0].Args = []string{strconv.Itoa(firstPort), strconv.Itoa(numPorts)}
+			deploy.Spec.Template.Spec.Containers[0].Ports = buildContainerPorts(firstPort, numPorts)
+			return testConfig.K8sClient.Update(testConfig.Context, deploy)
+		}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())
+
+		waitForDeploymentRollout(testConfig, deploy)
+
+		pool := &v1.InferencePool{}
+		gomega.Eventually(func() error {
+			err := testConfig.K8sClient.Get(testConfig.Context, key, pool)
+			if err != nil {
+				return err
+			}
+
+			pool.Spec.TargetPorts = buildTargetPorts(firstPort, numPorts)
+
+			return testConfig.K8sClient.Update(testConfig.Context, pool)
+		}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())
+
+		ginkgo.By("Restarting EPP to force configuration reload")
+		// We delete the EPP *POD*, not the deployment. The Deployment will recreate it immediately.
+		// This forces the new EPP process to read the Multi-Port InferencePool from scratch.
+		eppLabels := client.MatchingLabels{"app": inferExtName}
+		gomega.Expect(testConfig.K8sClient.DeleteAllOf(testConfig.Context, &corev1.Pod{}, client.InNamespace(testConfig.NsName), eppLabels)).To(gomega.Succeed())
+
+		// Wait for the new EPP pod to be ready.
+		eppDeploy := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: inferExtName, Namespace: testConfig.NsName}}
+		waitForDeploymentReady(testConfig, eppDeploy)
+
 		ginkgo.By("Creating an InferenceObjective resource")
 		infObjective = newInferenceObjective(testConfig.NsName)
 		gomega.Expect(testConfig.K8sClient.Create(testConfig.Context, infObjective)).To(gomega.Succeed())
@@ -76,7 +126,7 @@ var _ = ginkgo.Describe("InferencePool", func() {
 	})
 
 	ginkgo.It("Should expose EPP metrics after generating traffic", func() {
-		verifyMetrics()
+		verifyMetrics() // TODO(RyanRosario): Needs to be updated.
 	})
 })
@@ -204,33 +254,79 @@ func verifyTrafficRouting() {
 	} {
 		ginkgo.By(fmt.Sprintf("Verifying connectivity through the inference extension with %s api and prompt/messages: %v", t.api, t.promptOrMessages))
 
-		// Ensure the expected responses include the InferenceObjective target model names.
-		var expected []string
-		expected = append(expected, targetModelName)
-		curlCmd := getCurlCommand(envoyName, testConfig.NsName, envoyPort, modelName, curlTimeout, t.api, t.promptOrMessages, false)
-
-		actual := make(map[string]int)
+		// Expected ports and InferenceObjective target models.
+		expectedPort := generateSequence(firstPort, numPorts)
+		expectedModel := []string{targetModelName}
+
+		// Observed ports and InferenceObjective target models.
+		actualModel := make(map[string]int)
+		actualPort := make(map[int]int)
+		// Estimate how many requests to send so that there is high confidence of hitting every port,
+		// using the Coupon Collector's Problem formula n * H_n, where H_n is the nth harmonic number.
+		// This is the expected number of trials needed to collect all coupons (ports).
+		batches := int(math.Ceil(numPorts * harmonicNumber(numPorts)))
+		// Send curl requests to verify routing to all target ports in the InferencePool.
 		gomega.Eventually(func() error {
-			resp, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
-			if err != nil {
-				return err
+			// Run the full batch of requests on each retry so that every target port has a high chance of being hit.
+			for i := range batches {
+				uniqueID := time.Now().UnixNano()
+				dynamicHashValue := fmt.Sprintf("Nonce-%d", uniqueID)
+				currentPromptOrMessages := t.promptOrMessages // Start with the original
+
+				// Check if the payload is a slice of maps (e.g., for /chat/completions)
+				if originalMessages, ok := currentPromptOrMessages.([]map[string]any); ok {
+					messagesCopy := make([]map[string]any, len(originalMessages))
+					for idx, msg := range originalMessages {
+						msgCopy := make(map[string]any, len(msg))
+						maps.Copy(msgCopy, msg)
+						// Inject a unique nonce into the content of *EACH* message
+						if content, ok := msgCopy["content"].(string); ok {
+							msgCopy["content"] = fmt.Sprintf("(TestNonce: %s-%d-msg%d) %s", dynamicHashValue, i, idx, content)
+						}
+						messagesCopy[idx] = msgCopy
+					}
+					currentPromptOrMessages = messagesCopy // Use the modified messages for getCurlCommand
+				}
+
+				curlCmd := getCurlCommand(envoyName, testConfig.NsName, envoyPort, modelName, curlTimeout, t.api, currentPromptOrMessages, false)
+
+				resp, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
+				if err != nil {
+					return err
+				}
+
+				if !strings.Contains(resp, "200 OK") {
+					return fmt.Errorf("did not get 200 OK: %s", resp)
+				}
+
+				for _, m := range expectedModel {
+					if strings.Contains(resp, m) {
+						actualModel[m] = 0
+					}
+				}
+				for _, p := range expectedPort {
+					if strings.Contains(resp, fmt.Sprintf("x-backend-port: %d", p)) {
+						actualPort[p] = 0
+					}
+				}
 			}
-			if !strings.Contains(resp, "200 OK") {
-				return fmt.Errorf("did not get 200 OK: %s", resp)
+
+			var gotModel []string
+			for m := range actualModel {
+				gotModel = append(gotModel, m)
 			}
-			for _, m := range expected {
-				if strings.Contains(resp, m) {
-					actual[m] = 0
-				}
+			var gotPort []int
+			for p := range actualPort {
+				gotPort = append(gotPort, p)
 			}
-			var got []string
-			for m := range actual {
-				got = append(got, m)
+
+			if !cmp.Equal(gotModel, expectedModel, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
+				return fmt.Errorf("collecting models... have %v, want %v", gotModel, expectedModel)
 			}
-			// Compare ignoring order
-			if !cmp.Equal(got, expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
-				return fmt.Errorf("actual (%v) != expected (%v); resp=%q", got, expected, resp)
+			if !cmp.Equal(gotPort, expectedPort, cmpopts.SortSlices(func(a, b int) bool { return a < b })) {
+				return fmt.Errorf("collecting ports... have %v, want %v", gotPort, expectedPort)
 			}
+			return nil
 		}, testConfig.ReadyTimeout, curlInterval).Should(gomega.Succeed())
 	}
@@ -257,28 +353,28 @@ func verifyMetrics() {
 		"inference_extension_info",
 	}
 
-	// Generate traffic by sending requests through the inference extension
+	// Generate traffic by sending requests through the inference extension.
 	ginkgo.By("Generating traffic through the inference extension")
 	curlCmd := getCurlCommand(envoyName, testConfig.NsName, envoyPort, modelName, curlTimeout, "/completions", "Write as if you were a critic: San Francisco", true)
 
-	// Run the curl command multiple times to generate some metrics data
-	for i := 0; i < 5; i++ {
+	// Run the curl command multiple times to generate some metrics data.
+	for range 5 {
 		_, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
 		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}
 
-	// modify the curl command to generate some error metrics
+	// Modify the curl command to generate some error metrics.
 	curlCmd[len(curlCmd)-1] = "invalid input"
-	for i := 0; i < 5; i++ {
+	for range 5 {
 		_, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
 		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 	}
 
-	// Now scrape metrics from the EPP endpoint via the curl pod
+	// Now scrape metrics from the EPP endpoint via the curl pod.
 	ginkgo.By("Scraping metrics from the EPP endpoint")
 	podIP := findReadyPod().Status.PodIP
 
-	// Get the authorization token for reading metrics
+	// Get the authorization token for reading metrics.
 	token := ""
 	gomega.Eventually(func(g gomega.Gomega) {
 		t, err := getMetricsReaderToken(testConfig.K8sClient)
@@ -287,21 +383,22 @@
 		token = t
 	}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())
 
-	// Construct the metric scraping curl command using Pod IP
+	// Construct the metric scraping curl command using the Pod IP.
 	metricScrapeCmd := getMetricsScrapeCommand(podIP, token)
 	ginkgo.By("Verifying that all expected metrics are present.")
 	gomega.Eventually(func() error {
-		// Execute the metrics scrape command inside the curl pod
+		// Execute the metrics scrape command inside the curl pod.
+		fmt.Printf("pod IP: %s\n", podIP)
 		resp, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", metricScrapeCmd)
 		if err != nil {
 			return err
 		}
-		// Verify that we got a 200 OK responsecurl
+		// Verify that we got a 200 OK response.
 		if !strings.Contains(resp, "200 OK") {
 			return fmt.Errorf("did not get 200 OK: %s", resp)
 		}
-		// Check if all expected metrics are present in the metrics output
+		// Check if all expected metrics are present in the metrics output.
 		for _, metric := range expectedMetrics {
 			if !strings.Contains(resp, metric) {
 				return fmt.Errorf("expected metric %s not found in metrics output", metric)
@@ -359,6 +456,7 @@ func getMetricsScrapeCommand(podIP, token string) []string {
 
 // getCurlCommand returns the command, as a slice of strings, for curl'ing
 // the test model server at the given name, namespace, port, and model name.
+// This command gets executed by a dummy pod that communicates with Envoy.
 func getCurlCommand(name, ns, port, model string, timeout time.Duration, api string, promptOrMessages any, streaming bool) []string {
 	body := map[string]any{
 		"model": model,
@@ -389,10 +487,119 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
 		"-H",
 		"Content-Type: application/json",
 		"-H",
+		"Cache-Control: no-cache",
+		"-H",
 		fmt.Sprintf("%v: inferenceobjective-sample", metadata.ObjectiveKey),
 		"-H",
 		fmt.Sprintf("%v: %s", metadata.ModelNameRewriteKey, targetModelName),
+		"-H",
+		"Connection: close",
 		"-d",
 		string(b),
 	}
 }
+
+// buildContainerPorts constructs a slice of corev1.ContainerPort starting from 'start' with 'count' ports.
+func buildContainerPorts(start int, count int) []corev1.ContainerPort {
+	ports := make([]corev1.ContainerPort, count)
+	for i := range count {
+		portNum := int32(start + i)
+		ports[i] = corev1.ContainerPort{
+			Name:          fmt.Sprintf("http-%d", portNum),
+			ContainerPort: portNum,
+			Protocol:      corev1.ProtocolTCP,
+		}
+	}
+	return ports
+}
+
+// buildTargetPorts constructs a slice of v1.Port starting from 'start' with 'count' ports.
+func buildTargetPorts(start int, count int) []v1.Port {
+	ports := make([]v1.Port, count)
+	for i := range count {
+		ports[i] = v1.Port{
+			Number: v1.PortNumber(start + i),
+		}
+	}
+	return ports
+}
+
+// waitForDeploymentRollout waits until the Deployment has completed its update.
+// It ensures that the new version is fully rolled out and available.
+func waitForDeploymentRollout(tc *testutils.TestConfig, deploy *appsv1.Deployment) {
+	ginkgo.By(fmt.Sprintf("Waiting for Deployment %s/%s to complete rollout", deploy.Namespace, deploy.Name))
+
+	key := types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}
+
+	gomega.Eventually(func() error {
+		currentDeploy := &appsv1.Deployment{}
+		if err := tc.K8sClient.Get(tc.Context, key, currentDeploy); err != nil {
+			return err
+		}
+
+		if currentDeploy.Generation > currentDeploy.Status.ObservedGeneration {
+			return fmt.Errorf("deployment generation not observed yet")
+		}
+
+		desiredReplicas := *currentDeploy.Spec.Replicas
+
+		if currentDeploy.Status.UpdatedReplicas < desiredReplicas {
+			return fmt.Errorf("waiting for updated replicas: %d/%d", currentDeploy.Status.UpdatedReplicas, desiredReplicas)
+		}
+
+		if currentDeploy.Status.AvailableReplicas < desiredReplicas {
+			return fmt.Errorf("waiting for available replicas: %d/%d", currentDeploy.Status.AvailableReplicas, desiredReplicas)
+		}
+
+		if currentDeploy.Status.Replicas > desiredReplicas {
+			return fmt.Errorf("waiting for old replicas to terminate: %d > %d", currentDeploy.Status.Replicas, desiredReplicas)
+		}
+
+		return nil
+	}, testConfig.ReadyTimeout, testConfig.Interval).Should(gomega.Succeed(), "Deployment failed to roll out within timeout")
+
+	ginkgo.By("Deployment rollout complete")
+}
+
+// waitForDeploymentReady waits for the Deployment to have all replicas ready.
+func waitForDeploymentReady(tc *testutils.TestConfig, deploy *appsv1.Deployment) {
+	ginkgo.By(fmt.Sprintf("Waiting for Deployment %s/%s to be ready", deploy.Namespace, deploy.Name))
+
+	key := types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}
+
+	gomega.Eventually(func() error {
+		current := &appsv1.Deployment{}
+		if err := tc.K8sClient.Get(tc.Context, key, current); err != nil {
+			return err
+		}
+
+		if current.Status.Replicas != current.Status.ReadyReplicas {
+			return fmt.Errorf("replicas mismatch: expected %d, got %d ready",
+				current.Status.Replicas, current.Status.ReadyReplicas)
+		}
+
+		if current.Status.ReadyReplicas == 0 {
+			return fmt.Errorf("no replicas are ready yet")
+		}
+
+		return nil
+	}, testConfig.ReadyTimeout, testConfig.Interval).Should(gomega.Succeed())
+}
+
+// generateSequence generates a sequence of integers starting from 'start' with 'count' numbers.
+func generateSequence(start int, count int) []int {
+	nums := make([]int, count)
+	for i := range count {
+		nums[i] = start + i
+	}
+	return nums
+}
+
+// harmonicNumber returns the nth harmonic number H_n = 1 + 1/2 + ... + 1/n.
+func harmonicNumber(n int) float64 {
+	h := 0.0
+	for i := 1; i <= n; i++ {
+		h += 1 / float64(i)
+	}
+	return h
+}
diff --git a/test/utils/utils.go b/test/utils/utils.go
index c2d97696c..c24134715 100644
--- a/test/utils/utils.go
+++ b/test/utils/utils.go
@@ -338,6 +338,8 @@ func checkCrdStatus(
 func ExecCommandInPod(testConfig *TestConfig, podName, containerName string, cmd []string) (string, error) {
 	parameterCodec := runtime.NewParameterCodec(testConfig.Scheme)
 
+	// Construct the exec REST request to the API server.
+	// In these e2e tests podName is "curl", the helper pod the command runs in.
 	req := testConfig.KubeCli.CoreV1().RESTClient().
 		Post().
 		Resource("pods").
@@ -353,11 +355,13 @@ func ExecCommandInPod(testConfig *TestConfig, podName, containerName string, cmd
 			TTY:       false,
 		}, parameterCodec)
 
+	// req.URL() carries the exec request built above.
 	exec, err := remotecommand.NewSPDYExecutor(testConfig.RestConfig, "POST", req.URL())
 	if err != nil {
 		return "", fmt.Errorf("could not initialize executor: %w", err)
 	}
 
+	// Stream the command's output through the executor and capture stdout/stderr.
 	var stdout, stderr bytes.Buffer
 	execErr := exec.StreamWithContext(testConfig.Context, remotecommand.StreamOptions{
 		Stdout: &stdout,
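Note on the batch count used in verifyTrafficRouting: with numPorts equally likely backends, the coupon collector bound gives n * H_n as the expected number of requests needed to observe every backend at least once, where H_n is the nth harmonic number. The following is a minimal, self-contained sketch of that calculation; it mirrors the harmonicNumber helper and the numPorts constant added in this patch and is only an illustration of the math, not part of the change.

package main

import (
	"fmt"
	"math"
)

// harmonicNumber returns H_n = 1 + 1/2 + ... + 1/n.
func harmonicNumber(n int) float64 {
	h := 0.0
	for i := 1; i <= n; i++ {
		h += 1.0 / float64(i)
	}
	return h
}

func main() {
	const numPorts = 8 // matches the constant in e2e_test.go
	// Expected number of requests to hit every port at least once: n * H_n.
	expected := float64(numPorts) * harmonicNumber(numPorts)
	batches := int(math.Ceil(expected))
	fmt.Printf("H_%d = %.3f, expected requests = %.1f, batches = %d\n",
		numPorts, harmonicNumber(numPorts), expected, batches)
	// For numPorts = 8: H_8 is roughly 2.718, expected is roughly 21.7, so batches = 22.
}

Since 22 requests only cover all eight ports in expectation, a single batch can still miss a port; the surrounding gomega.Eventually retries the whole batch until every expected port has been observed or the timeout expires.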