Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/common/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
)

const (
// The environment variable referring to image simulating sleep condition in container
sleepImageEnvVar = "SLEEP_IMAGE"
// Name of the authenticated Notebook user
notebookUserName = "NOTEBOOK_USER_NAME"
// Token of the authenticated Notebook user
Expand Down Expand Up @@ -154,6 +156,10 @@ func GetHuggingFaceToken(t Test) string {
return token
}

func GetSleepImage() string {
return lookupEnvOrDefault(sleepImageEnvVar, "gcr.io/k8s-staging-perf-tests/sleep@sha256:8d91ddf9f145b66475efda1a1b52269be542292891b5de2a7fad944052bab6ea")
}

func init() {
flag.StringVar(&testTierParam, "testTier", "", "Test tier")
}
Expand Down
18 changes: 18 additions & 0 deletions tests/common/support/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -85,6 +86,23 @@ func GetNamespaceWithName(t Test, namespaceName string) *corev1.Namespace {
return namespace
}

// CreateOrGetTestNamespaceWithName creates a namespace with the given name if it doesn't exist,
// or returns the existing namespace if it does. This is useful for scenarios where
// the namespace needs to persist across multiple test phases.
func CreateOrGetTestNamespaceWithName(t Test, name string, options ...Option[*corev1.Namespace]) *corev1.Namespace {
t.T().Helper()
namespace, err := t.Client().Core().CoreV1().Namespaces().Get(t.Ctx(), name, metav1.GetOptions{})
if err == nil {
return namespace
} else if errors.IsNotFound(err) {
t.T().Logf("%s namespace doesn't exist. Creating ...", name)
return CreateTestNamespaceWithName(t, name, options...)
} else {
t.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err)
}
return nil
}

func DeleteTestNamespace(t Test, namespace *corev1.Namespace) {
t.T().Helper()
propagationPolicy := metav1.DeletePropagationBackground
Expand Down
6 changes: 0 additions & 6 deletions tests/kfto/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const (
bloomModelImageEnvVar = "BLOOM_MODEL_IMAGE"
// The environment variable referring to image containing Stanford Alpaca dataset
alpacaDatasetImageEnvVar = "ALPACA_DATASET_IMAGE"
// The environment variable referring to image simulating sleep condition in container
sleepImageEnvVar = "SLEEP_IMAGE"
)

func GetBloomModelImage() string {
Expand All @@ -37,10 +35,6 @@ func GetAlpacaDatasetImage() string {
return lookupEnvOrDefault(alpacaDatasetImageEnvVar, "quay.io/ksuta/alpaca-dataset@sha256:2e90f631180c7b2c916f9569b914b336b612e8ae86efad82546adc5c9fcbbb8d")
}

func GetSleepImage() string {
return lookupEnvOrDefault(sleepImageEnvVar, "gcr.io/k8s-staging-perf-tests/sleep@sha256:8d91ddf9f145b66475efda1a1b52269be542292891b5de2a7fad944052bab6ea")
}

func lookupEnvOrDefault(key, value string) string {
if v, ok := os.LookupEnv(key); ok {
return v
Expand Down
16 changes: 1 addition & 15 deletions tests/kfto/kfto_kueue_mnist_upgrade_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSetupPytorchjob(t *testing.T) {
Tags(t, PreUpgrade)
test := With(t)

createOrGetUpgradeTestNamespace(test, namespaceName)
CreateOrGetTestNamespaceWithName(test, namespaceName)

// Create a ConfigMap with training dataset and configuration
mnist := readFile(test, "resources/mnist.py")
Expand Down Expand Up @@ -298,17 +298,3 @@ func createUpgradePyTorchJob(test Test, namespace, localQueueName string, config

return tuningJob
}

func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) {
// Verify that the namespace really exists and return it, create it if doesn't exist yet
namespace, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{})
if err == nil {
return
} else if errors.IsNotFound(err) {
test.T().Logf("%s namespace doesn't exists. Creating ...", name)
return CreateTestNamespaceWithName(test, name, options...)
} else {
test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err)
}
return
}
2 changes: 1 addition & 1 deletion tests/kfto/kfto_upgrade_sleep_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestSetupSleepPytorchjob(t *testing.T) {
test := With(t)

// Create a namespace
createOrGetUpgradeTestNamespace(test, sleepNamespaceName)
CreateOrGetTestNamespaceWithName(test, sleepNamespaceName)

// Create Kueue resources
resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{})
Expand Down
165 changes: 165 additions & 0 deletions tests/trainer/trainer_kueue_upgrade_training_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package trainer

import (
"testing"

trainerv1alpha1 "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kueuev1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"
kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"

. "github.com/opendatahub-io/distributed-workloads/tests/common"
. "github.com/opendatahub-io/distributed-workloads/tests/common/support"
trainerutils "github.com/opendatahub-io/distributed-workloads/tests/trainer/utils"
)

var (
upgradeNamespaceName = "test-trainer-upgrade"
resourceFlavorName = "rf-trainer-upgrade"
clusterQueueName = "cq-trainer-upgrade"
localQueueName = "lq-trainer-upgrade"
upgradeTrainJobName = "trainjob-upgrade"
)

func TestSetupUpgradeTrainJob(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about another test case - same like this one, with the difference of using specific TrainingRuntime (i.e. torch_distributed_cuda128_torch29_py312.yaml, runtime name would have to be fetched somehow so we don't have to hardcode it here).

The reason for this additional test is to verify that runtime which is not updated still works properly. This test would have a main meaning for upgrades between stable versions, where default training runtime is updated, but specific training runtime isn't. It is good to verify that updated Trainer v2 operator still works properly with old training runtime.

WDYT?

Tags(t, PreUpgrade)
test := With(t)
setupKueue(test)

// Create a namespace with Kueue label
CreateOrGetTestNamespaceWithName(test, upgradeNamespaceName, WithKueueManaged())
test.T().Logf("Created/retrieved namespace with kueue label: %s", upgradeNamespaceName)

// Create Kueue resources with StopPolicy
resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName)
appliedResourceFlavor, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue ResourceFlavor %s successfully", appliedResourceFlavor.Name)

clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(
kueueacv1beta1.ClusterQueueSpec().
WithNamespaceSelector(metav1.LabelSelector{}).
WithResourceGroups(
kueueacv1beta1.ResourceGroup().WithCoveredResources(
corev1.ResourceName("cpu"), corev1.ResourceName("memory"),
).WithFlavors(
kueueacv1beta1.FlavorQuotas().
WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)).
WithResources(
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")),
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("18Gi")),
),
),
).
WithStopPolicy(kueuev1beta1.Hold),
)
appliedClusterQueue, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue ClusterQueue %s with StopPolicy=Hold successfully", appliedClusterQueue.Name)

localQueue := kueueacv1beta1.LocalQueue(localQueueName, upgradeNamespaceName).
WithAnnotations(map[string]string{"kueue.x-k8s.io/default-queue": "true"}).
WithSpec(
kueueacv1beta1.LocalQueueSpec().WithClusterQueue(kueuev1beta1.ClusterQueueReference(clusterQueueName)),
)
appliedLocalQueue, err := test.Client().Kueue().KueueV1beta1().LocalQueues(upgradeNamespaceName).Apply(test.Ctx(), localQueue, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Applied Kueue LocalQueue %s/%s successfully", appliedLocalQueue.Namespace, appliedLocalQueue.Name)

// Create TrainJob
trainJob := createUpgradeTrainJob(test, upgradeNamespaceName, appliedLocalQueue.Name)

// Make sure the TrainJob is suspended, waiting for ClusterQueue to be enabled
test.Eventually(TrainJob(test, trainJob.Namespace, upgradeTrainJobName), TestTimeoutShort).
Should(WithTransform(TrainJobConditionSuspended, Equal(metav1.ConditionTrue)))
test.T().Logf("TrainJob %s/%s is suspended, waiting for ClusterQueue to be enabled after upgrade", trainJob.Namespace, upgradeTrainJobName)
}

func TestRunUpgradeTrainJob(t *testing.T) {
Tags(t, PostUpgrade)
test := With(t)
setupKueue(test)
namespace := GetNamespaceWithName(test, upgradeNamespaceName)

// Cleanup everything in the end
defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavorName, metav1.DeleteOptions{})
defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueueName, metav1.DeleteOptions{})
defer DeleteTestNamespace(test, namespace)

// Enable ClusterQueue to process waiting TrainJob
clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(kueueacv1beta1.ClusterQueueSpec().WithStopPolicy(kueuev1beta1.None))
_, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "application/apply-patch", Force: true})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Enabled ClusterQueue %s by setting StopPolicy to None", clusterQueueName)

// TrainJob should be started now
test.Eventually(TrainJob(test, upgradeNamespaceName, upgradeTrainJobName), TestTimeoutLong).
Should(WithTransform(TrainJobConditionSuspended, Equal(metav1.ConditionFalse)))
test.T().Logf("TrainJob %s/%s is now running", upgradeNamespaceName, upgradeTrainJobName)

// Make sure the TrainJob completes successfully
test.Eventually(TrainJob(test, upgradeNamespaceName, upgradeTrainJobName), TestTimeoutLong).
Should(WithTransform(TrainJobConditionComplete, Equal(metav1.ConditionTrue)))
test.T().Logf("TrainJob %s/%s completed successfully after upgrade", upgradeNamespaceName, upgradeTrainJobName)
}

func createUpgradeTrainJob(test Test, namespace, localQueueName string) *trainerv1alpha1.TrainJob {
// Does TrainJob already exist?
_, err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Get(test.Ctx(), upgradeTrainJobName, metav1.GetOptions{})
if err == nil {
// If yes then delete it and wait until there are no TrainJobs in the namespace
err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Delete(test.Ctx(), upgradeTrainJobName, metav1.DeleteOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.Eventually(TrainJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
} else if !errors.IsNotFound(err) {
test.T().Fatalf("Error retrieving TrainJob with name `%s`: %v", upgradeTrainJobName, err)
}

trainJob := &trainerv1alpha1.TrainJob{
ObjectMeta: metav1.ObjectMeta{
Name: upgradeTrainJobName,
Labels: map[string]string{
"kueue.x-k8s.io/queue-name": localQueueName,
},
},
Spec: trainerv1alpha1.TrainJobSpec{
RuntimeRef: trainerv1alpha1.RuntimeRef{
Name: trainerutils.DefaultClusterTrainingRuntime,
},
Trainer: &trainerv1alpha1.Trainer{
Command: []string{
"python",
"-c",
"import torch; print(f'PyTorch version: {torch.__version__}'); import time; time.sleep(5); print('Training completed successfully')",
},
},
},
}

trainJob, err = test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Create(test.Ctx(), trainJob, metav1.CreateOptions{})
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created TrainJob %s/%s successfully", trainJob.Namespace, trainJob.Name)

return trainJob
}
Loading
Loading