1 change: 1 addition & 0 deletions hack/test-e2e.sh
@@ -50,5 +50,6 @@ else
fi
fi

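# Drop any pre-existing InferencePool CRD so the run starts from a clean state (--ignore-not-found makes this a no-op when absent).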
kubectl delete crd inferencepools.inference.networking.k8s.io --ignore-not-found
echo "Found an active cluster. Running Go e2e tests in ./epp..."
go test ./test/e2e/epp/ -v -ginkgo.v
4 changes: 3 additions & 1 deletion test/e2e/epp/e2e_suite_test.go
@@ -317,7 +317,9 @@ func createEnvoy(testConfig *testutils.TestConfig, filePath string) {

// createInferExt creates the inference extension resources used for testing from the given filePath.
func createInferExt(testConfig *testutils.TestConfig, filePath string) {

// The model server image needs to be updated to open multiple ports and respond on each.
inManifests := testutils.ReadYaml(filePath) // Modify inference-pool.yaml
ginkgo.By("Replacing placeholders with environment variables")
outManifests := []string{}
replacer := strings.NewReplacer(
273 changes: 240 additions & 33 deletions test/e2e/epp/e2e_test.go
@@ -20,6 +20,8 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"math"
"strconv"
"strings"
"time"
@@ -31,20 +33,68 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
)

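// Port range served by the multi-port test backend; mirrored into the container
// ports and the InferencePool targetPorts in BeforeEach below.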
const (
firstPort = 8000
numPorts = 8
)

var _ = ginkgo.Describe("InferencePool", func() {
var infObjective *v1alpha2.InferenceObjective
ginkgo.BeforeEach(func() {
ginkgo.By("Waiting for the namespace to exist.")
namespaceExists(testConfig)

ginkgo.By("Modifying deployment using local image for testing (temporary).")
deploy := &appsv1.Deployment{}
key := types.NamespacedName{Name: "vllm-llama3-8b-instruct", Namespace: testConfig.NsName}

gomega.Eventually(func() error {
err := testConfig.K8sClient.Get(testConfig.Context, key, deploy)
if err != nil {
return err
}

deploy.Spec.Template.Spec.Containers[0].Image = "vllm-dynamic-backend:local" // TODO(ryanrosario): Change back to official image after testing
deploy.Spec.Template.Spec.Containers[0].ImagePullPolicy = corev1.PullNever
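// Assumption: the local test image takes the first port and the number of ports as positional args.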
deploy.Spec.Template.Spec.Containers[0].Args = []string{strconv.Itoa(firstPort), strconv.Itoa(numPorts)}
deploy.Spec.Template.Spec.Containers[0].Ports = buildContainerPorts(firstPort, numPorts)
return testConfig.K8sClient.Update(testConfig.Context, deploy)
}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())

waitForDeploymentRollout(testConfig, deploy)

pool := &v1.InferencePool{}
gomega.Eventually(func() error {
err := testConfig.K8sClient.Get(testConfig.Context, key, pool)
if err != nil {
return err
}

pool.Spec.TargetPorts = buildTargetPorts(firstPort, numPorts)

return testConfig.K8sClient.Update(testConfig.Context, pool)
}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())

ginkgo.By("Restarting EPP to force configuration reload")
// We delete the EPP *POD*, not the deployment. The Deployment will recreate it immediately.
// This forces the new EPP process to read the Multi-Port InferencePool from scratch.
eppLabels := client.MatchingLabels{"app": inferExtName}
gomega.Expect(testConfig.K8sClient.DeleteAllOf(testConfig.Context, &corev1.Pod{}, client.InNamespace(testConfig.NsName), eppLabels)).To(gomega.Succeed())

// Wait for the new EPP to be ready
eppDeploy := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: inferExtName, Namespace: testConfig.NsName}}
waitForDeploymentReady(testConfig, eppDeploy)

ginkgo.By("Creating an InferenceObjective resource")
infObjective = newInferenceObjective(testConfig.NsName)
gomega.Expect(testConfig.K8sClient.Create(testConfig.Context, infObjective)).To(gomega.Succeed())
@@ -76,7 +126,7 @@ var _ = ginkgo.Describe("InferencePool", func() {
})

ginkgo.It("Should expose EPP metrics after generating traffic", func() {
verifyMetrics() // TODO(RyanRosario) Needs to be updated.
})
})

@@ -204,33 +254,79 @@ func verifyTrafficRouting() {
} {
ginkgo.By(fmt.Sprintf("Verifying connectivity through the inference extension with %s api and prompt/messages: %v", t.api, t.promptOrMessages))

// Expected ports and InferenceObjective target models.
expectedPort := generateSequence(firstPort, numPorts)
expectedModel := []string{targetModelName}

// Observed ports and InferenceObjective target models.
actualModel := make(map[string]int)
actualPort := make(map[int]int)
// Estimate how many requests are needed to hit every port with high confidence,
// using the coupon collector's problem: the expected number of trials to collect
// all n coupons (ports) is n * H_n, where H_n is the nth harmonic number.
batches := int(math.Ceil(numPorts * harmonicNumber(numPorts)))
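// For numPorts = 8: H_8 ≈ 2.7179, so batches = ceil(8 * 2.7179) = 22.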
// Send curl requests to verify routing to all target ports in the InferencePool.
gomega.Eventually(func() error {
// Send the full batch on every retry attempt so a single pass can observe all ports.
for i := range batches {
uniqueID := time.Now().UnixNano()
dynamicHashValue := fmt.Sprintf("Nonce-%d", uniqueID)
currentPromptOrMessages := t.promptOrMessages // Start with the original

// Check if the payload is a slice of maps (e.g., for /chat/completions)
if originalMessages, ok := currentPromptOrMessages.([]map[string]any); ok {
messagesCopy := make([]map[string]any, len(originalMessages))
for idx, msg := range originalMessages {
msgCopy := make(map[string]any, len(msg))
maps.Copy(msgCopy, msg)
// Inject a unique nonce into the content of *EACH* message
if content, ok := msgCopy["content"].(string); ok {
msgCopy["content"] = fmt.Sprintf("(TestNonce: %s-%d-msg%d) %s", dynamicHashValue, i, idx, content)
}
messagesCopy[idx] = msgCopy
}
currentPromptOrMessages = messagesCopy // Use the modified messages for getCurlCommand
}

curlCmd := getCurlCommand(envoyName, testConfig.NsName, envoyPort, modelName, curlTimeout, t.api, currentPromptOrMessages, false)

resp, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
if err != nil {
return err
}

if !strings.Contains(resp, "200 OK") {
return fmt.Errorf("did not get 200 OK: %s", resp)
}

for _, m := range expectedModel {
if strings.Contains(resp, m) {
actualModel[m] = 0
}
}
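// Assumption: the multi-port test backend echoes its serving port in the x-backend-port response header.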
for _, p := range expectedPort {
if strings.Contains(resp, fmt.Sprintf("x-backend-port: %d", p)) {
actualPort[p] = 0
}
}
}
var gotModel []string
for m := range actualModel {
gotModel = append(gotModel, m)
}
var gotPort []int
for p := range actualPort {
gotPort = append(gotPort, p)
}

// Compare ignoring order.
if !cmp.Equal(gotModel, expectedModel, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
return fmt.Errorf("collecting models... have %v, want %v", gotModel, expectedModel)
}
if !cmp.Equal(gotPort, expectedPort, cmpopts.SortSlices(func(a, b int) bool { return a < b })) {
return fmt.Errorf("collecting ports... have %v, want %v", gotPort, expectedPort)
}

return nil
}, testConfig.ReadyTimeout, curlInterval).Should(gomega.Succeed())
}
@@ -257,28 +353,28 @@ func verifyMetrics() {
"inference_extension_info",
}

// Generate traffic by sending requests through the inference extension.
ginkgo.By("Generating traffic through the inference extension")
curlCmd := getCurlCommand(envoyName, testConfig.NsName, envoyPort, modelName, curlTimeout, "/completions", "Write as if you were a critic: San Francisco", true)

// Run the curl command multiple times to generate some metrics data.
for range 5 {
_, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Modify the curl command to generate some error metrics.
curlCmd[len(curlCmd)-1] = "invalid input"
for range 5 {
_, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", curlCmd)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Now scrape metrics from the EPP endpoint via the curl pod.
ginkgo.By("Scraping metrics from the EPP endpoint")
podIP := findReadyPod().Status.PodIP

// Get the authorization token for reading metrics.
token := ""
gomega.Eventually(func(g gomega.Gomega) {
t, err := getMetricsReaderToken(testConfig.K8sClient)
@@ -287,21 +383,22 @@
token = t
}, testConfig.ExistsTimeout, testConfig.Interval).Should(gomega.Succeed())

// Construct the metric scraping curl command using Pod IP.
metricScrapeCmd := getMetricsScrapeCommand(podIP, token)

ginkgo.By("Verifying that all expected metrics are present.")
gomega.Eventually(func() error {
// Execute the metrics scrape command inside the curl pod.
fmt.Printf("pod IP: %s\n", podIP)
resp, err := testutils.ExecCommandInPod(testConfig, "curl", "curl", metricScrapeCmd)
if err != nil {
return err
}
// Verify that we got a 200 OK response.
if !strings.Contains(resp, "200 OK") {
return fmt.Errorf("did not get 200 OK: %s", resp)
}
// Check if all expected metrics are present in the metrics output.
for _, metric := range expectedMetrics {
if !strings.Contains(resp, metric) {
return fmt.Errorf("expected metric %s not found in metrics output", metric)
@@ -359,6 +456,7 @@ func getMetricsScrapeCommand(podIP, token string) []string {

// getCurlCommand returns the command, as a slice of strings, for curl'ing
// the test model server at the given name, namespace, port, and model name.
// This command gets executed by a dummy pod that communicates with Envoy.
func getCurlCommand(name, ns, port, model string, timeout time.Duration, api string, promptOrMessages any, streaming bool) []string {
body := map[string]any{
"model": model,
@@ -389,10 +487,119 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
"-H",
"Content-Type: application/json",
"-H",
"Cache-Control: no-cache",
"-H",
fmt.Sprintf("%v: inferenceobjective-sample", metadata.ObjectiveKey),
"-H",
fmt.Sprintf("%v: %s", metadata.ModelNameRewriteKey, targetModelName),
"-H",
"Connection: close",
"-d",
string(b),
}
}

// buildContainerPorts constructs a slice of corev1.ContainerPort starting from 'start' with 'count' ports.
func buildContainerPorts(start int, count int) []corev1.ContainerPort {
ports := make([]corev1.ContainerPort, count)
for i := range count {
portNum := int32(start + i)
ports[i] = corev1.ContainerPort{
Name: fmt.Sprintf("http-%d", portNum),
ContainerPort: portNum,
Protocol: corev1.ProtocolTCP,
}
}
return ports
}

// buildTargetPorts constructs a slice of v1.Port starting from 'start' with 'count' ports.
func buildTargetPorts(start int, count int) []v1.Port {
ports := make([]v1.Port, count)
for i := range count {
ports[i] = v1.Port{
Number: v1.PortNumber(start + i),
}
}
return ports
}

// waitForDeploymentRollout waits until the Deployment has completed its update.
// It ensures that the new version is fully rolled out and available.
func waitForDeploymentRollout(tc *testutils.TestConfig, deploy *appsv1.Deployment) {
ginkgo.By(fmt.Sprintf("Waiting for Deployment %s/%s to complete rollout", deploy.Namespace, deploy.Name))

key := types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}

gomega.Eventually(func() error {
currentDeploy := &appsv1.Deployment{}
if err := tc.K8sClient.Get(tc.Context, key, currentDeploy); err != nil {
return err
}

if currentDeploy.Generation > currentDeploy.Status.ObservedGeneration {
return fmt.Errorf("deployment generation not observed yet")
}

desiredReplicas := *currentDeploy.Spec.Replicas

if currentDeploy.Status.UpdatedReplicas < desiredReplicas {
return fmt.Errorf("waiting for updated replicas: %d/%d", currentDeploy.Status.UpdatedReplicas, desiredReplicas)
}

if currentDeploy.Status.AvailableReplicas < desiredReplicas {
return fmt.Errorf("waiting for available replicas: %d/%d", currentDeploy.Status.AvailableReplicas, desiredReplicas)
}

if currentDeploy.Status.Replicas > desiredReplicas {
return fmt.Errorf("waiting for old replicas to terminate: %d > %d", currentDeploy.Status.Replicas, desiredReplicas)
}

return nil
}, testConfig.ReadyTimeout, testConfig.Interval).Should(gomega.Succeed(), "Deployment failed to roll out within timeout")

ginkgo.By("Deployment rollout complete")
}

// waitForDeploymentReady waits for the Deployment to have all replicas ready.
func waitForDeploymentReady(tc *testutils.TestConfig, deploy *appsv1.Deployment) {
ginkgo.By(fmt.Sprintf("waiting for Deployment %s/%s to be ready", deploy.Namespace, deploy.Name))

key := types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}

gomega.Eventually(func() error {
current := &appsv1.Deployment{}
if err := tc.K8sClient.Get(tc.Context, key, current); err != nil {
return err
}

if current.Status.Replicas != current.Status.ReadyReplicas {
return fmt.Errorf("replicas mismatch: expected %d, got %d ready",
current.Status.Replicas, current.Status.ReadyReplicas)
}

if current.Status.ReadyReplicas == 0 {
return fmt.Errorf("no replicas are ready yet")
}

return nil
}, testConfig.ReadyTimeout, testConfig.Interval).Should(gomega.Succeed())
}

// generateSequence generates a sequence of integers starting from 'start' with 'count' numbers.
func generateSequence(start int, count int) []int {
nums := make([]int, count)
for i := range count {
nums[i] = start + i
}
return nums
}

// harmonicNumber returns the nth harmonic number H_n = 1 + 1/2 + ... + 1/n.
func harmonicNumber(n int) float64 {
h := 0.0
for i := 1; i <= n; i++ {
h += 1 / float64(i)
}
return h
}