diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 54bf48e8..ef02bc3d 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -128,6 +128,62 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
 			deleteObjects(modelServers)
 		})
 	})
+
+	ginkgo.When("Scaling up and down the model servers", func() {
+		ginkgo.It("work should be distributed across all model servers", func() {
+			modelServers := createModelServers(false, false, 1, 0, 0)
+
+			epp := createEndPointPicker(scaleConfig)
+
+			prefillPods, decodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
+			gomega.Expect(prefillPods).Should(gomega.BeEmpty())
+			gomega.Expect(decodePods).Should(gomega.HaveLen(1))
+
+			var nsHdr, podHdr string
+			for range 5 {
+				nsHdr, podHdr = runCompletion(simplePrompt, modelName)
+				gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
+				gomega.Expect(podHdr).Should(gomega.Equal(decodePods[0]))
+			}
+
+			scaleDeployment(modelServers, 1)
+
+			scaledUpPrefillPods, scaledUpDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
+			gomega.Expect(scaledUpPrefillPods).Should(gomega.BeEmpty())
+			gomega.Expect(scaledUpDecodePods).Should(gomega.HaveLen(2))
+
+			time.Sleep(time.Second)
+
+			var scaledNsHdr, scaledPodHdr string
+			for range 20 {
+				scaledNsHdr, scaledPodHdr = runCompletion(extraPrompt, modelName)
+				gomega.Expect(scaledNsHdr).Should(gomega.Equal(nsName))
+				gomega.Expect(scaledPodHdr).Should(gomega.BeElementOf(scaledUpDecodePods))
+				if scaledPodHdr != podHdr {
+					break
+				}
+			}
+			gomega.Expect(scaledPodHdr).ShouldNot(gomega.Equal(podHdr))
+
+			scaleDeployment(modelServers, -1)
+
+			scaledDownPrefillPods, scaledDownDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
+			gomega.Expect(scaledDownPrefillPods).Should(gomega.BeEmpty())
+			gomega.Expect(scaledDownDecodePods).Should(gomega.HaveLen(1))
+			gomega.Expect(scaledDownDecodePods[0]).Should(gomega.BeElementOf(scaledUpDecodePods))
+
+			time.Sleep(time.Second)
+
+			for range 5 {
+				nsHdr, podHdr = runCompletion(simplePrompt, modelName)
+				gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
+				gomega.Expect(podHdr).Should(gomega.Equal(scaledDownDecodePods[0]))
+			}
+
+			deleteObjects(epp)
+			deleteObjects(modelServers)
+		})
+	})
 })
 
 // createModelServers creates the model server resources used for testing from the given filePaths.
@@ -339,3 +395,15 @@ schedulingProfiles:
   - pluginRef: precise-prefix-cache-scorer
     weight: 10
 `
+
+// EPP configuration for running scale model server test
+const scaleConfig = `apiVersion: inference.networking.x-k8s.io/v1alpha1
+kind: EndpointPickerConfig
+plugins:
+- type: max-score-picker
+- type: single-profile-handler
+schedulingProfiles:
+- name: default
+  plugins:
+  - pluginRef: max-score-picker
+`
diff --git a/test/e2e/utils_test.go b/test/e2e/utils_test.go
index a291f926..d067c1ea 100644
--- a/test/e2e/utils_test.go
+++ b/test/e2e/utils_test.go
@@ -15,15 +15,49 @@ import (
 	rbacv1 "k8s.io/api/rbac/v1"
 	apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	apilabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
 	testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
 )
 
+const (
+	deploymentKind = "deployment"
+)
+
+func scaleDeployment(objects []string, increment int) {
+	k8sCfg := config.GetConfigOrDie()
+	client, err := kubernetes.NewForConfig(k8sCfg)
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	direction := "up"
+	absIncrement := increment
+	if increment < 0 {
+		direction = "down"
+		absIncrement = -increment
+	}
+
+	for _, kindAndName := range objects {
+		split := strings.Split(kindAndName, "/")
+		if strings.ToLower(split[0]) == deploymentKind {
+			ginkgo.By(fmt.Sprintf("Scaling the deployment %s %s by %d", split[1], direction, absIncrement))
+			scale, err := client.AppsV1().Deployments(nsName).GetScale(ctx, split[1], v1.GetOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+			scale.Spec.Replicas += int32(increment)
+			_, err = client.AppsV1().Deployments(nsName).UpdateScale(ctx, split[1], scale, v1.UpdateOptions{})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			time.Sleep(time.Second)
+		}
+	}
+	podsInDeploymentsReady(objects)
+}
+
 func createObjsFromYaml(docs []string) []string {
 	objNames := []string{}
 
@@ -102,7 +136,7 @@ func getClientObject(kind string) client.Object {
 		return &corev1.ConfigMap{}
 	case "customresourcedefinition":
 		return &apiextv1.CustomResourceDefinition{}
-	case "deployment":
+	case deploymentKind:
 		return &appsv1.Deployment{}
 	case "inferencepool":
 		return &v1alpha2.InferencePool{}
@@ -181,7 +215,7 @@ func podsInDeploymentsReady(objects []string) {
 	}
 	for _, kindAndName := range objects {
 		split := strings.Split(kindAndName, "/")
-		if strings.ToLower(split[0]) == "deployment" {
+		if strings.ToLower(split[0]) == deploymentKind {
 			ginkgo.By(fmt.Sprintf("Waiting for pods of %s to be ready", split[1]))
 			gomega.Eventually(helper, readyTimeout, interval).WithArguments(split[1]).Should(gomega.BeTrue())
 		}
diff --git a/test/scripts/run_e2e.sh b/test/scripts/run_e2e.sh
index add66dc3..5018c484 100755
--- a/test/scripts/run_e2e.sh
+++ b/test/scripts/run_e2e.sh
@@ -4,7 +4,7 @@
 export EPP_TAG="${EPP_TAG:-dev}"
 
 # Set a default VLLM_SIMULATOR_TAG if not provided
-export VLLM_SIMULATOR_TAG="${VLLM_SIMULATOR_TAG:-v0.4.0}"
+export VLLM_SIMULATOR_TAG="${VLLM_SIMULATOR_TAG:-v0.5.0}"
 
 # Set the default routing side car image tag
 export ROUTING_SIDECAR_TAG="${ROUTING_SIDECAR_TAG:-v0.2.0}"
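The scaleDeployment helper introduced above adjusts replica counts through the Deployment scale subresource in client-go. The standalone sketch below restates that GetScale/UpdateScale pattern outside the Ginkgo helpers, assuming an already-constructed kubernetes.Interface; the package name, bumpReplicas, and its parameters are illustrative only and are not part of this change.

// Hypothetical package; a minimal sketch of the scale-subresource pattern, not part of the patch above.
package e2eutil

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bumpReplicas changes a Deployment's replica count by delta using the same
// GetScale/UpdateScale calls that scaleDeployment issues, returning an error
// instead of asserting with gomega.
func bumpReplicas(ctx context.Context, cs kubernetes.Interface, namespace, name string, delta int32) error {
	// Read the current scale subresource of the Deployment.
	scale, err := cs.AppsV1().Deployments(namespace).GetScale(ctx, name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("get scale for %s/%s: %w", namespace, name, err)
	}

	// Apply the increment (a negative delta scales down) and write it back.
	scale.Spec.Replicas += delta
	if _, err := cs.AppsV1().Deployments(namespace).UpdateScale(ctx, name, scale, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("update scale for %s/%s: %w", namespace, name, err)
	}
	return nil
}

Unlike scaleDeployment, which calls podsInDeploymentsReady to block until the new replicas are ready, this sketch leaves waiting for rollout completion to the caller.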