diff --git a/tests/common/environment.go b/tests/common/environment.go index 896bbb4f8..eaafab52c 100644 --- a/tests/common/environment.go +++ b/tests/common/environment.go @@ -27,6 +27,8 @@ import ( ) const ( + // The environment variable referring to image simulating sleep condition in container + sleepImageEnvVar = "SLEEP_IMAGE" // Name of the authenticated Notebook user notebookUserName = "NOTEBOOK_USER_NAME" // Token of the authenticated Notebook user @@ -154,6 +156,10 @@ func GetHuggingFaceToken(t Test) string { return token } +func GetSleepImage() string { + return lookupEnvOrDefault(sleepImageEnvVar, "gcr.io/k8s-staging-perf-tests/sleep@sha256:8d91ddf9f145b66475efda1a1b52269be542292891b5de2a7fad944052bab6ea") +} + func init() { flag.StringVar(&testTierParam, "testTier", "", "Test tier") } diff --git a/tests/common/support/namespace.go b/tests/common/support/namespace.go index 62dcec097..f122ac4b1 100644 --- a/tests/common/support/namespace.go +++ b/tests/common/support/namespace.go @@ -22,6 +22,7 @@ import ( "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -85,6 +86,23 @@ func GetNamespaceWithName(t Test, namespaceName string) *corev1.Namespace { return namespace } +// CreateOrGetTestNamespaceWithName creates a namespace with the given name if it doesn't exist, +// or returns the existing namespace if it does. This is useful for scenarios where +// the namespace needs to persist across multiple test phases. +func CreateOrGetTestNamespaceWithName(t Test, name string, options ...Option[*corev1.Namespace]) *corev1.Namespace { + t.T().Helper() + namespace, err := t.Client().Core().CoreV1().Namespaces().Get(t.Ctx(), name, metav1.GetOptions{}) + if err == nil { + return namespace + } else if errors.IsNotFound(err) { + t.T().Logf("%s namespace doesn't exist. Creating ...", name) + return CreateTestNamespaceWithName(t, name, options...) + } else { + t.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err) + } + return nil +} + func DeleteTestNamespace(t Test, namespace *corev1.Namespace) { t.T().Helper() propagationPolicy := metav1.DeletePropagationBackground diff --git a/tests/kfto/environment.go b/tests/kfto/environment.go index 4334b3316..79169bf2a 100644 --- a/tests/kfto/environment.go +++ b/tests/kfto/environment.go @@ -25,8 +25,6 @@ const ( bloomModelImageEnvVar = "BLOOM_MODEL_IMAGE" // The environment variable referring to image containing Stanford Alpaca dataset alpacaDatasetImageEnvVar = "ALPACA_DATASET_IMAGE" - // The environment variable referring to image simulating sleep condition in container - sleepImageEnvVar = "SLEEP_IMAGE" ) func GetBloomModelImage() string { @@ -37,10 +35,6 @@ func GetAlpacaDatasetImage() string { return lookupEnvOrDefault(alpacaDatasetImageEnvVar, "quay.io/ksuta/alpaca-dataset@sha256:2e90f631180c7b2c916f9569b914b336b612e8ae86efad82546adc5c9fcbbb8d") } -func GetSleepImage() string { - return lookupEnvOrDefault(sleepImageEnvVar, "gcr.io/k8s-staging-perf-tests/sleep@sha256:8d91ddf9f145b66475efda1a1b52269be542292891b5de2a7fad944052bab6ea") -} - func lookupEnvOrDefault(key, value string) string { if v, ok := os.LookupEnv(key); ok { return v diff --git a/tests/kfto/kfto_kueue_mnist_upgrade_training_test.go b/tests/kfto/kfto_kueue_mnist_upgrade_training_test.go index 903aad7cf..cde0b1120 100644 --- a/tests/kfto/kfto_kueue_mnist_upgrade_training_test.go +++ b/tests/kfto/kfto_kueue_mnist_upgrade_training_test.go @@ -45,7 +45,7 @@ func TestSetupPytorchjob(t *testing.T) { Tags(t, PreUpgrade) test := With(t) - createOrGetUpgradeTestNamespace(test, namespaceName) + CreateOrGetTestNamespaceWithName(test, namespaceName) // Create a ConfigMap with training dataset and configuration mnist := readFile(test, "resources/mnist.py") @@ -298,17 +298,3 @@ func createUpgradePyTorchJob(test Test, namespace, localQueueName string, config return tuningJob } - -func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) { - // Verify that the namespace really exists and return it, create it if doesn't exist yet - namespace, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{}) - if err == nil { - return - } else if errors.IsNotFound(err) { - test.T().Logf("%s namespace doesn't exists. Creating ...", name) - return CreateTestNamespaceWithName(test, name, options...) - } else { - test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err) - } - return -} diff --git a/tests/kfto/kfto_upgrade_sleep_test.go b/tests/kfto/kfto_upgrade_sleep_test.go index 6f9b73142..400ab13b2 100644 --- a/tests/kfto/kfto_upgrade_sleep_test.go +++ b/tests/kfto/kfto_upgrade_sleep_test.go @@ -42,7 +42,7 @@ func TestSetupSleepPytorchjob(t *testing.T) { test := With(t) // Create a namespace - createOrGetUpgradeTestNamespace(test, sleepNamespaceName) + CreateOrGetTestNamespaceWithName(test, sleepNamespaceName) // Create Kueue resources resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{}) diff --git a/tests/trainer/trainer_kueue_upgrade_training_test.go b/tests/trainer/trainer_kueue_upgrade_training_test.go new file mode 100644 index 000000000..6b31948f3 --- /dev/null +++ b/tests/trainer/trainer_kueue_upgrade_training_test.go @@ -0,0 +1,165 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trainer + +import ( + "testing" + + trainerv1alpha1 "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kueuev1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" + kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1" + + . "github.com/opendatahub-io/distributed-workloads/tests/common" + . "github.com/opendatahub-io/distributed-workloads/tests/common/support" + trainerutils "github.com/opendatahub-io/distributed-workloads/tests/trainer/utils" +) + +var ( + upgradeNamespaceName = "test-trainer-upgrade" + resourceFlavorName = "rf-trainer-upgrade" + clusterQueueName = "cq-trainer-upgrade" + localQueueName = "lq-trainer-upgrade" + upgradeTrainJobName = "trainjob-upgrade" +) + +func TestSetupUpgradeTrainJob(t *testing.T) { + Tags(t, PreUpgrade) + test := With(t) + setupKueue(test) + + // Create a namespace with Kueue label + CreateOrGetTestNamespaceWithName(test, upgradeNamespaceName, WithKueueManaged()) + test.T().Logf("Created/retrieved namespace with kueue label: %s", upgradeNamespaceName) + + // Create Kueue resources with StopPolicy + resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName) + appliedResourceFlavor, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue ResourceFlavor %s successfully", appliedResourceFlavor.Name) + + clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec( + kueueacv1beta1.ClusterQueueSpec(). + WithNamespaceSelector(metav1.LabelSelector{}). + WithResourceGroups( + kueueacv1beta1.ResourceGroup().WithCoveredResources( + corev1.ResourceName("cpu"), corev1.ResourceName("memory"), + ).WithFlavors( + kueueacv1beta1.FlavorQuotas(). + WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)). + WithResources( + kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")), + kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("18Gi")), + ), + ), + ). + WithStopPolicy(kueuev1beta1.Hold), + ) + appliedClusterQueue, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue ClusterQueue %s with StopPolicy=Hold successfully", appliedClusterQueue.Name) + + localQueue := kueueacv1beta1.LocalQueue(localQueueName, upgradeNamespaceName). + WithAnnotations(map[string]string{"kueue.x-k8s.io/default-queue": "true"}). + WithSpec( + kueueacv1beta1.LocalQueueSpec().WithClusterQueue(kueuev1beta1.ClusterQueueReference(clusterQueueName)), + ) + appliedLocalQueue, err := test.Client().Kueue().KueueV1beta1().LocalQueues(upgradeNamespaceName).Apply(test.Ctx(), localQueue, metav1.ApplyOptions{FieldManager: "setup-TrainJob", Force: true}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Applied Kueue LocalQueue %s/%s successfully", appliedLocalQueue.Namespace, appliedLocalQueue.Name) + + // Create TrainJob + trainJob := createUpgradeTrainJob(test, upgradeNamespaceName, appliedLocalQueue.Name) + + // Make sure the TrainJob is suspended, waiting for ClusterQueue to be enabled + test.Eventually(TrainJob(test, trainJob.Namespace, upgradeTrainJobName), TestTimeoutShort). + Should(WithTransform(TrainJobConditionSuspended, Equal(metav1.ConditionTrue))) + test.T().Logf("TrainJob %s/%s is suspended, waiting for ClusterQueue to be enabled after upgrade", trainJob.Namespace, upgradeTrainJobName) +} + +func TestRunUpgradeTrainJob(t *testing.T) { + Tags(t, PostUpgrade) + test := With(t) + setupKueue(test) + namespace := GetNamespaceWithName(test, upgradeNamespaceName) + + // Cleanup everything in the end + defer test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavorName, metav1.DeleteOptions{}) + defer test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueueName, metav1.DeleteOptions{}) + defer DeleteTestNamespace(test, namespace) + + // Enable ClusterQueue to process waiting TrainJob + clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(kueueacv1beta1.ClusterQueueSpec().WithStopPolicy(kueuev1beta1.None)) + _, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "application/apply-patch", Force: true}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Enabled ClusterQueue %s by setting StopPolicy to None", clusterQueueName) + + // TrainJob should be started now + test.Eventually(TrainJob(test, upgradeNamespaceName, upgradeTrainJobName), TestTimeoutLong). + Should(WithTransform(TrainJobConditionSuspended, Equal(metav1.ConditionFalse))) + test.T().Logf("TrainJob %s/%s is now running", upgradeNamespaceName, upgradeTrainJobName) + + // Make sure the TrainJob completes successfully + test.Eventually(TrainJob(test, upgradeNamespaceName, upgradeTrainJobName), TestTimeoutLong). + Should(WithTransform(TrainJobConditionComplete, Equal(metav1.ConditionTrue))) + test.T().Logf("TrainJob %s/%s completed successfully after upgrade", upgradeNamespaceName, upgradeTrainJobName) +} + +func createUpgradeTrainJob(test Test, namespace, localQueueName string) *trainerv1alpha1.TrainJob { + // Does TrainJob already exist? + _, err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Get(test.Ctx(), upgradeTrainJobName, metav1.GetOptions{}) + if err == nil { + // If yes then delete it and wait until there are no TrainJobs in the namespace + err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Delete(test.Ctx(), upgradeTrainJobName, metav1.DeleteOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Eventually(TrainJobs(test, namespace), TestTimeoutShort).Should(BeEmpty()) + } else if !errors.IsNotFound(err) { + test.T().Fatalf("Error retrieving TrainJob with name `%s`: %v", upgradeTrainJobName, err) + } + + trainJob := &trainerv1alpha1.TrainJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: upgradeTrainJobName, + Labels: map[string]string{ + "kueue.x-k8s.io/queue-name": localQueueName, + }, + }, + Spec: trainerv1alpha1.TrainJobSpec{ + RuntimeRef: trainerv1alpha1.RuntimeRef{ + Name: trainerutils.DefaultClusterTrainingRuntime, + }, + Trainer: &trainerv1alpha1.Trainer{ + Command: []string{ + "python", + "-c", + "import torch; print(f'PyTorch version: {torch.__version__}'); import time; time.sleep(5); print('Training completed successfully')", + }, + }, + }, + } + + trainJob, err = test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Create(test.Ctx(), trainJob, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Created TrainJob %s/%s successfully", trainJob.Namespace, trainJob.Name) + + return trainJob +} diff --git a/tests/trainer/trainer_trainingruntime_upgrade_test.go b/tests/trainer/trainer_trainingruntime_upgrade_test.go new file mode 100644 index 000000000..710755516 --- /dev/null +++ b/tests/trainer/trainer_trainingruntime_upgrade_test.go @@ -0,0 +1,238 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trainer + +import ( + "strings" + "testing" + + trainerv1alpha1 "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" + . "github.com/onsi/gomega" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" + + . "github.com/opendatahub-io/distributed-workloads/tests/common" + . "github.com/opendatahub-io/distributed-workloads/tests/common/support" +) + +var ( + runtimeNamespaceName = "test-trainer-upgrade-runtime" + customRuntimeName = "custom-sleep-runtime" + runtimesConfigMapNamespace = "default" + runtimesConfigMapName = "all-trainingruntimes-upgrade" + runtimesConfigMapKey = "runtime-names" +) + +func TestSetupTrainingRuntime(t *testing.T) { + Tags(t, PreUpgrade) + test := With(t) + + // Create namespace + CreateOrGetTestNamespaceWithName(test, runtimeNamespaceName) + + // Create custom TrainingRuntime + createCustomTrainingRuntime(test, runtimeNamespaceName) + + // Verify the TrainingRuntime exists + runtime, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(runtimeNamespaceName).Get( + test.Ctx(), customRuntimeName, metav1.GetOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Expect(runtime.Name).To(Equal(customRuntimeName)) + test.T().Logf("Custom TrainingRuntime %s/%s created successfully", runtimeNamespaceName, customRuntimeName) +} + +func TestVerifyTrainingRuntime(t *testing.T) { + Tags(t, PostUpgrade) + test := With(t) + + namespace := GetNamespaceWithName(test, runtimeNamespaceName) + defer DeleteTestNamespace(test, namespace) + + // Verify TrainingRuntime still exists after upgrade by listing all runtimes + runtimes, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(runtimeNamespaceName).List( + test.Ctx(), metav1.ListOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to list TrainingRuntimes") + + var runtimeNames []string + for _, runtime := range runtimes.Items { + runtimeNames = append(runtimeNames, runtime.Name) + } + + test.Expect(runtimeNames).To(ContainElement(customRuntimeName), + "Custom TrainingRuntime should exist after upgrade. Found runtimes: %v", runtimeNames) + test.T().Logf("TrainingRuntime %s/%s is preserved after upgrade", runtimeNamespaceName, customRuntimeName) +} + +func createCustomTrainingRuntime(test Test, namespace string) *trainerv1alpha1.TrainingRuntime { + // Check if runtime already exists + _, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(namespace).Get( + test.Ctx(), customRuntimeName, metav1.GetOptions{}) + if err == nil { + // Delete existing runtime + err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(namespace).Delete( + test.Ctx(), customRuntimeName, metav1.DeleteOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Eventually(func() bool { + _, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(namespace).Get( + test.Ctx(), customRuntimeName, metav1.GetOptions{}) + return errors.IsNotFound(err) + }, TestTimeoutShort).Should(BeTrue()) + } else if !errors.IsNotFound(err) { + test.T().Fatalf("Error retrieving TrainingRuntime: %v", err) + } + + trainingRuntime := &trainerv1alpha1.TrainingRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: customRuntimeName, + Namespace: namespace, + }, + Spec: trainerv1alpha1.TrainingRuntimeSpec{ + Template: trainerv1alpha1.JobSetTemplateSpec{ + Spec: jobsetv1alpha2.JobSetSpec{ + ReplicatedJobs: []jobsetv1alpha2.ReplicatedJob{ + { + Name: "node", + Replicas: 1, + Template: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + BackoffLimit: Ptr(int32(0)), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "trainer", + Image: GetSleepImage(), + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"sleep"}, + Args: []string{"24h"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + runtime, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes(namespace).Create( + test.Ctx(), trainingRuntime, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to create TrainingRuntime") + test.T().Logf("Created custom TrainingRuntime %s/%s", runtime.Namespace, runtime.Name) + + return runtime +} + +// TestSetupAllTrainingRuntimes lists all TrainingRuntimes across the cluster +// and stores their names in a ConfigMap for post-upgrade verification. +func TestSetupAllTrainingRuntimes(t *testing.T) { + Tags(t, PreUpgrade) + test := With(t) + + // List all TrainingRuntimes across all namespaces + runtimes, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes("").List( + test.Ctx(), metav1.ListOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to list all TrainingRuntimes") + + var runtimeFullNames []string + for _, runtime := range runtimes.Items { + runtimeFullNames = append(runtimeFullNames, runtime.Namespace+"/"+runtime.Name) + } + + storeRuntimeNamesInConfigMap(test, runtimeFullNames) + + if len(runtimeFullNames) == 0 { + test.T().Log("No TrainingRuntimes found in cluster before upgrade") + } else { + test.T().Logf("Stored %d TrainingRuntimes in ConfigMap: %v", len(runtimeFullNames), runtimeFullNames) + } +} + +// TestVerifyAllTrainingRuntimes verifies all pre-upgrade TrainingRuntimes still exist. +func TestVerifyAllTrainingRuntimes(t *testing.T) { + Tags(t, PostUpgrade) + test := With(t) + + preUpgradeNames := getRuntimeNamesFromConfigMap(test) + + defer func() { + _ = test.Client().Core().CoreV1().ConfigMaps(runtimesConfigMapNamespace).Delete( + test.Ctx(), runtimesConfigMapName, metav1.DeleteOptions{}) + }() + + if len(preUpgradeNames) == 0 { + test.T().Log("No TrainingRuntimes existed before upgrade, skipping preservation check") + return + } + test.T().Logf("Pre-upgrade TrainingRuntimes: %v", preUpgradeNames) + + runtimes, err := test.Client().Trainer().TrainerV1alpha1().TrainingRuntimes("").List( + test.Ctx(), metav1.ListOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to list all TrainingRuntimes") + + var currentNames []string + for _, runtime := range runtimes.Items { + currentNames = append(currentNames, runtime.Namespace+"/"+runtime.Name) + } + test.T().Logf("Post-upgrade TrainingRuntimes: %v", currentNames) + + for _, preUpgradeName := range preUpgradeNames { + test.Expect(currentNames).To(ContainElement(preUpgradeName), + "TrainingRuntime %s should exist after upgrade. Current runtimes: %v", preUpgradeName, currentNames) + } + test.T().Logf("All %d TrainingRuntimes preserved after upgrade", len(preUpgradeNames)) +} + +func storeRuntimeNamesInConfigMap(test Test, names []string) { + // Delete existing ConfigMap if present + _ = test.Client().Core().CoreV1().ConfigMaps(runtimesConfigMapNamespace).Delete( + test.Ctx(), runtimesConfigMapName, metav1.DeleteOptions{}) + + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: runtimesConfigMapName, + Namespace: runtimesConfigMapNamespace, + }, + Data: map[string]string{ + runtimesConfigMapKey: strings.Join(names, ","), + }, + } + + _, err := test.Client().Core().CoreV1().ConfigMaps(runtimesConfigMapNamespace).Create( + test.Ctx(), configMap, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to create ConfigMap for all runtime names") +} + +func getRuntimeNamesFromConfigMap(test Test) []string { + configMap, err := test.Client().Core().CoreV1().ConfigMaps(runtimesConfigMapNamespace).Get( + test.Ctx(), runtimesConfigMapName, metav1.GetOptions{}) + test.Expect(err).NotTo(HaveOccurred(), "Failed to get ConfigMap with all runtime names") + + namesStr := configMap.Data[runtimesConfigMapKey] + if namesStr == "" { + return []string{} + } + return strings.Split(namesStr, ",") +} diff --git a/tests/trainer/trainer_upgrade_sleep_test.go b/tests/trainer/trainer_upgrade_sleep_test.go new file mode 100644 index 000000000..170c264b1 --- /dev/null +++ b/tests/trainer/trainer_upgrade_sleep_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2025. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package trainer + +import ( + "testing" + + trainerv1alpha1 "github.com/kubeflow/trainer/v2/pkg/apis/trainer/v1alpha1" + . "github.com/onsi/gomega" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + . "github.com/opendatahub-io/distributed-workloads/tests/common" + . "github.com/opendatahub-io/distributed-workloads/tests/common/support" + trainerutils "github.com/opendatahub-io/distributed-workloads/tests/trainer/utils" +) + +var ( + sleepNamespaceName = "test-trainer-upgrade-sleep" + sleepTrainJobName = "trainjob-upgrade-sleep" +) + +func TestSetupSleepTrainJob(t *testing.T) { + Tags(t, PreUpgrade) + test := With(t) + + // Create a namespace + CreateOrGetTestNamespaceWithName(test, sleepNamespaceName) + + // Create sleep TrainJob + createSleepTrainJob(test, sleepNamespaceName) + + // Make sure the TrainJob pod is running, waiting for Trainer upgrade + test.Eventually(GetPods(test, sleepNamespaceName, metav1.ListOptions{}), TestTimeoutDouble). + Should( + And( + HaveLen(1), + ContainElement(WithTransform(podPhase, Equal(corev1.PodRunning))), + ), + ) + test.T().Logf("TrainJob %s/%s pod is running", sleepNamespaceName, sleepTrainJobName) +} + +func TestVerifySleepTrainJob(t *testing.T) { + Tags(t, PostUpgrade) + test := With(t) + namespace := GetNamespaceWithName(test, sleepNamespaceName) + + // Cleanup namespace in the end + defer DeleteTestNamespace(test, namespace) + + // Pod should be still running without restart + test.Expect(GetPods(test, sleepNamespaceName, metav1.ListOptions{})). + Should( + And( + HaveLen(1), + ContainElement(WithTransform(sleepPodRestartCount, BeNumerically("==", 0))), + ), + ) + test.T().Logf("TrainJob %s/%s is still running after upgrade with no pod restarts", sleepNamespaceName, sleepTrainJobName) +} + +func createSleepTrainJob(test Test, namespace string) *trainerv1alpha1.TrainJob { + // Does TrainJob already exist? + _, err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Get(test.Ctx(), sleepTrainJobName, metav1.GetOptions{}) + if err == nil { + // If yes then delete it and wait until there are no TrainJobs in the namespace + err := test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Delete(test.Ctx(), sleepTrainJobName, metav1.DeleteOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.Eventually(TrainJobs(test, namespace), TestTimeoutShort).Should(BeEmpty()) + } else if !errors.IsNotFound(err) { + test.T().Fatalf("Error retrieving TrainJob with name `%s`: %v", sleepTrainJobName, err) + } + + trainJob := &trainerv1alpha1.TrainJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: sleepTrainJobName, + }, + Spec: trainerv1alpha1.TrainJobSpec{ + RuntimeRef: trainerv1alpha1.RuntimeRef{ + Name: trainerutils.DefaultClusterTrainingRuntime, + }, + Trainer: &trainerv1alpha1.Trainer{ + Image: Ptr(GetSleepImage()), + Command: []string{"sleep"}, + Args: []string{"24h"}, + }, + }, + } + + trainJob, err = test.Client().Trainer().TrainerV1alpha1().TrainJobs(namespace).Create(test.Ctx(), trainJob, metav1.CreateOptions{}) + test.Expect(err).NotTo(HaveOccurred()) + test.T().Logf("Created TrainJob %s/%s successfully", trainJob.Namespace, trainJob.Name) + + return trainJob +} + +func sleepPodRestartCount(pod corev1.Pod) int { + return int(pod.Status.ContainerStatuses[0].RestartCount) +} + +func podPhase(pod corev1.Pod) corev1.PodPhase { + return pod.Status.Phase +}