68 changes: 68 additions & 0 deletions test/e2e/e2e_test.go
@@ -128,6 +128,62 @@ var _ = ginkgo.Describe("Run end to end tests", ginkgo.Ordered, func() {
deleteObjects(modelServers)
})
})

ginkgo.When("Scaling up and down the model servers", func() {
ginkgo.It("work should be distributed across all model servers", func() {
Collaborator:
nit: if using ginkgo.It, the description should probably read naturally after the "It", e.g.:

Suggested change
- ginkgo.It("work should be distributed across all model servers", func() {
+ ginkgo.It("should distribute inference requests across all model servers", func() {

modelServers := createModelServers(false, false, 1, 0, 0)

epp := createEndPointPicker(scaleConfig)

prefillPods, decodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
gomega.Expect(prefillPods).Should(gomega.BeEmpty())
Collaborator:
what is the goal of validating that there are no prefill Pods?
Can this check fail and, if so, how would a failure affect the test, if at all?

gomega.Expect(decodePods).Should(gomega.HaveLen(1))

var nsHdr, podHdr string
for range 5 {
nsHdr, podHdr = runCompletion(simplePrompt, modelName)
gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
gomega.Expect(podHdr).Should(gomega.Equal(decodePods[0]))
}

scaleDeployment(modelServers, 1)

scaledUpPrefillPods, scaledUpDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
gomega.Expect(scaledUpPrefillPods).Should(gomega.BeEmpty())
Collaborator:
ditto

gomega.Expect(scaledUpDecodePods).Should(gomega.HaveLen(2))

time.Sleep(time.Second)
Comment on lines +153 to +155
Collaborator:
might it be worthwhile to check that the pods are in a ready state rather than relying on the 1s sleep?
Or maybe there's a different reason for sleeping...?
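A possible shape for that, as a sketch only (it reuses getModelServerPods plus the readyTimeout/interval constants already defined in this test package; whether pod count is the right readiness signal here is an assumption):

// Sketch: poll until both decode pods are reported, instead of relying on a
// fixed one-second sleep (assumes getModelServerPods only returns pods that
// are actually serving).
gomega.Eventually(func() int {
	_, pods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
	return len(pods)
}, readyTimeout, interval).Should(gomega.Equal(2))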


var scaledNsHdr, scaledPodHdr string
for range 20 {
scaledNsHdr, scaledPodHdr = runCompletion(extraPrompt, modelName)
gomega.Expect(scaledNsHdr).Should(gomega.Equal(nsName))
gomega.Expect(scaledPodHdr).Should(gomega.BeElementOf(scaledUpDecodePods))
if scaledPodHdr != podHdr {
break
}
}
gomega.Expect(scaledPodHdr).ShouldNot(gomega.Equal(podHdr))

scaleDeployment(modelServers, -1)

scaledDownPrefillPods, scaledDownDecodePods := getModelServerPods(podSelector, prefillSelector, decodeSelector)
gomega.Expect(scaledDownPrefillPods).Should(gomega.BeEmpty())
Collaborator:
ditto

gomega.Expect(scaledDownDecodePods).Should(gomega.HaveLen(1))
gomega.Expect(scaledDownDecodePods[0]).Should(gomega.BeElementOf(scaledUpDecodePods))

time.Sleep(time.Second)
Collaborator:
same. An Expect on a condition (with Eventually...) would be quicker than the fixed sleep.
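For instance, a sketch of the Eventually variant (assumes that polling runCompletion is an acceptable condition at this point in the test):

// Sketch: keep issuing completions until one is served by the remaining
// decode pod, rather than sleeping for a fixed second.
gomega.Eventually(func() string {
	_, pod := runCompletion(simplePrompt, modelName)
	return pod
}, readyTimeout, interval).Should(gomega.Equal(scaledDownDecodePods[0]))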


for range 5 {
nsHdr, podHdr = runCompletion(simplePrompt, modelName)
gomega.Expect(nsHdr).Should(gomega.Equal(nsName))
gomega.Expect(podHdr).Should(gomega.Equal(scaledDownDecodePods[0]))
}

deleteObjects(epp)
deleteObjects(modelServers)
})
})
})

// createModelServers creates the model server resources used for testing from the given filePaths.
@@ -339,3 +395,15 @@ schedulingProfiles:
- pluginRef: precise-prefix-cache-scorer
weight: 10
`

// EPP configuration for running scale model server test
const scaleConfig = `apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- type: max-score-picker
- type: single-profile-handler
schedulingProfiles:
- name: default
plugins:
- pluginRef: max-score-picker
`
38 changes: 36 additions & 2 deletions test/e2e/utils_test.go
@@ -15,15 +15,49 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
apilabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2"
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
)

const (
deploymentKind = "deployment"
)

func scaleDeployment(objects []string, increment int) {
k8sCfg := config.GetConfigOrDie()
client, err := kubernetes.NewForConfig(k8sCfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
direction := "up"
absIncrement := increment
if increment < 0 {
direction = "down"
absIncrement = -increment
}

for _, kindAndName := range objects {
split := strings.Split(kindAndName, "/")
if strings.ToLower(split[0]) == deploymentKind {
ginkgo.By(fmt.Sprintf("Scaling the deployment %s %s by %d", split[1], direction, absIncrement))
scale, err := client.AppsV1().Deployments(nsName).GetScale(ctx, split[1], v1.GetOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

scale.Spec.Replicas += int32(increment)
_, err = client.AppsV1().Deployments(nsName).UpdateScale(ctx, split[1], scale, v1.UpdateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Collaborator:
the scale change can have succeeded, but the replica count may not have increased yet, or the new pods may not be ready (or deleted) yet...
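One way to close that gap, sketched with the client and variables already in scope (treating Status.ReadyReplicas reaching the new target as "done" is an assumption):

// Sketch: wait for the deployment to report the new replica count as ready,
// instead of assuming the scale operation has settled after one second.
gomega.Eventually(func() (int32, error) {
	dep, err := client.AppsV1().Deployments(nsName).Get(ctx, split[1], v1.GetOptions{})
	if err != nil {
		return 0, err
	}
	return dep.Status.ReadyReplicas, nil
}, readyTimeout, interval).Should(gomega.Equal(scale.Spec.Replicas))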

time.Sleep(time.Second)
}
}
podsInDeploymentsReady(objects)
}

func createObjsFromYaml(docs []string) []string {
objNames := []string{}

@@ -102,7 +136,7 @@ func getClientObject(kind string) client.Object {
return &corev1.ConfigMap{}
case "customresourcedefinition":
return &apiextv1.CustomResourceDefinition{}
case "deployment":
case deploymentKind:
return &appsv1.Deployment{}
case "inferencepool":
return &v1alpha2.InferencePool{}
@@ -181,7 +215,7 @@ func podsInDeploymentsReady(objects []string) {
}
for _, kindAndName := range objects {
split := strings.Split(kindAndName, "/")
-	if strings.ToLower(split[0]) == "deployment" {
+	if strings.ToLower(split[0]) == deploymentKind {
ginkgo.By(fmt.Sprintf("Waiting for pods of %s to be ready", split[1]))
gomega.Eventually(helper, readyTimeout, interval).WithArguments(split[1]).Should(gomega.BeTrue())
}
2 changes: 1 addition & 1 deletion test/scripts/run_e2e.sh
@@ -4,7 +4,7 @@
export EPP_TAG="${EPP_TAG:-dev}"

# Set a default VLLM_SIMULATOR_TAG if not provided
-export VLLM_SIMULATOR_TAG="${VLLM_SIMULATOR_TAG:-v0.4.0}"
+export VLLM_SIMULATOR_TAG="${VLLM_SIMULATOR_TAG:-v0.5.0}"

# Set the default routing side car image tag
export ROUTING_SIDECAR_TAG="${ROUTING_SIDECAR_TAG:-v0.2.0}"