Skip to content

Commit 141c5a8

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents 9ee435e + fcc8fc7 commit 141c5a8

File tree

3 files changed

+82
-55
lines changed

3 files changed

+82
-55
lines changed

tests/kfto/core/support.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ func PyTorchJob(t Test, namespace, name string) func(g Gomega) *kftov1.PyTorchJo
5050
}
5151
}
5252

53+
func PyTorchJobs(t Test, namespace string) func(g Gomega) []kftov1.PyTorchJob {
54+
return func(g Gomega) []kftov1.PyTorchJob {
55+
jobs, err := t.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).List(t.Ctx(), metav1.ListOptions{})
56+
g.Expect(err).NotTo(HaveOccurred())
57+
return jobs.Items
58+
}
59+
}
60+
5361
// PyTorchJobConditionRunning returns the result of PyTorchJobCondition for the
// given job and the kftov1.JobRunning condition type.
func PyTorchJobConditionRunning(job *kftov1.PyTorchJob) corev1.ConditionStatus {
	runningStatus := PyTorchJobCondition(job, kftov1.JobRunning)
	return runningStatus
}

tests/kfto/upgrade/kfto_kueue_sft_upgrade_training_test.go

Lines changed: 60 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
2727

2828
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/api/errors"
2930
"k8s.io/apimachinery/pkg/api/resource"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132

@@ -36,20 +37,14 @@ var (
3637
namespaceName = "test-kfto-upgrade"
3738
resourceFlavorName = "rf-upgrade"
3839
clusterQueueName = "cq-upgrade"
40+
localQueueName = "lq-upgrade"
3941
pyTorchJobName = "pytorch-upgrade"
4042
)
4143

4244
func TestSetupPytorchjob(t *testing.T) {
4345
test := With(t)
4446

45-
// Create a namespace
46-
namespace := &corev1.Namespace{
47-
ObjectMeta: metav1.ObjectMeta{
48-
Name: namespaceName,
49-
},
50-
}
51-
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
52-
test.Expect(err).NotTo(HaveOccurred())
47+
createOrGetUpgradeTestNamespace(test, namespaceName)
5348

5449
// Create a ConfigMap with training dataset and configuration
5550
configData := map[string][]byte{
@@ -59,50 +54,43 @@ func TestSetupPytorchjob(t *testing.T) {
5954
config := CreateConfigMap(test, namespaceName, configData)
6055

6156
// Create Kueue resources
62-
resourceFlavor := &kueuev1beta1.ResourceFlavor{
63-
ObjectMeta: metav1.ObjectMeta{
64-
Name: resourceFlavorName,
65-
},
66-
}
67-
resourceFlavor, err = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Create(test.Ctx(), resourceFlavor, metav1.CreateOptions{})
57+
resourceFlavor := kueueacv1beta1.ResourceFlavor(resourceFlavorName)
58+
appliedResourceFlavor, err := test.Client().Kueue().KueueV1beta1().ResourceFlavors().Apply(test.Ctx(), resourceFlavor, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
6859
test.Expect(err).NotTo(HaveOccurred())
60+
test.T().Logf("Applied Kueue ResourceFlavor %s successfully", appliedResourceFlavor.Name)
6961

70-
clusterQueue := &kueuev1beta1.ClusterQueue{
71-
ObjectMeta: metav1.ObjectMeta{
72-
Name: clusterQueueName,
73-
},
74-
Spec: kueuev1beta1.ClusterQueueSpec{
75-
NamespaceSelector: &metav1.LabelSelector{},
76-
ResourceGroups: []kueuev1beta1.ResourceGroup{
77-
{
78-
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory")},
79-
Flavors: []kueuev1beta1.FlavorQuotas{
80-
{
81-
Name: kueuev1beta1.ResourceFlavorReference(resourceFlavor.Name),
82-
Resources: []kueuev1beta1.ResourceQuota{
83-
{
84-
Name: corev1.ResourceCPU,
85-
NominalQuota: resource.MustParse("8"),
86-
},
87-
{
88-
Name: corev1.ResourceMemory,
89-
NominalQuota: resource.MustParse("12Gi"),
90-
},
91-
},
92-
},
93-
},
94-
},
95-
},
96-
StopPolicy: Ptr(kueuev1beta1.Hold),
97-
},
98-
}
99-
clusterQueue, err = test.Client().Kueue().KueueV1beta1().ClusterQueues().Create(test.Ctx(), clusterQueue, metav1.CreateOptions{})
62+
clusterQueue := kueueacv1beta1.ClusterQueue(clusterQueueName).WithSpec(
63+
kueueacv1beta1.ClusterQueueSpec().
64+
WithNamespaceSelector(metav1.LabelSelector{}).
65+
WithResourceGroups(
66+
kueueacv1beta1.ResourceGroup().WithCoveredResources(
67+
corev1.ResourceName("cpu"), corev1.ResourceName("memory"),
68+
).WithFlavors(
69+
kueueacv1beta1.FlavorQuotas().
70+
WithName(kueuev1beta1.ResourceFlavorReference(resourceFlavorName)).
71+
WithResources(
72+
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceCPU).WithNominalQuota(resource.MustParse("8")),
73+
kueueacv1beta1.ResourceQuota().WithName(corev1.ResourceMemory).WithNominalQuota(resource.MustParse("12Gi")),
74+
),
75+
),
76+
).
77+
WithStopPolicy(kueuev1beta1.Hold),
78+
)
79+
appliedClusterQueue, err := test.Client().Kueue().KueueV1beta1().ClusterQueues().Apply(test.Ctx(), clusterQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
10080
test.Expect(err).NotTo(HaveOccurred())
81+
test.T().Logf("Applied Kueue ClusterQueue %s successfully", appliedClusterQueue.Name)
10182

102-
localQueue := CreateKueueLocalQueue(test, namespaceName, clusterQueue.Name, AsDefaultQueue)
83+
localQueue := kueueacv1beta1.LocalQueue(localQueueName, namespaceName).
84+
WithAnnotations(map[string]string{"kueue.x-k8s.io/default-queue": "true"}).
85+
WithSpec(
86+
kueueacv1beta1.LocalQueueSpec().WithClusterQueue(kueuev1beta1.ClusterQueueReference(clusterQueueName)),
87+
)
88+
appliedLocalQueue, err := test.Client().Kueue().KueueV1beta1().LocalQueues(namespaceName).Apply(test.Ctx(), localQueue, metav1.ApplyOptions{FieldManager: "setup-PyTorchJob", Force: true})
89+
test.Expect(err).NotTo(HaveOccurred())
90+
test.T().Logf("Applied Kueue LocalQueue %s/%s successfully", appliedLocalQueue.Namespace, appliedLocalQueue.Name)
10391

10492
// Create training PyTorch job
105-
tuningJob := createPyTorchJob(test, namespaceName, localQueue.Name, *config)
93+
tuningJob := createPyTorchJob(test, namespaceName, appliedLocalQueue.Name, *config)
10694

10795
// Make sure the PyTorch job is suspended, waiting for ClusterQueue to be enabled
10896
test.Eventually(kftocore.PyTorchJob(test, tuningJob.Namespace, pyTorchJobName), TestTimeoutShort).
@@ -133,6 +121,17 @@ func TestRunPytorchjob(t *testing.T) {
133121
}
134122

135123
func createPyTorchJob(test Test, namespace, localQueueName string, config corev1.ConfigMap) *kftov1.PyTorchJob {
124+
// Does PyTorchJob already exist?
125+
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), pyTorchJobName, metav1.GetOptions{})
126+
if err == nil {
127+
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
128+
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), pyTorchJobName, metav1.DeleteOptions{})
129+
test.Expect(err).NotTo(HaveOccurred())
130+
test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
131+
} else if !errors.IsNotFound(err) {
132+
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", pyTorchJobName, err)
133+
}
134+
136135
tuningJob := &kftov1.PyTorchJob{
137136
ObjectMeta: metav1.ObjectMeta{
138137
Name: pyTorchJobName,
@@ -244,9 +243,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
244243
},
245244
}
246245

247-
tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
246+
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
248247
test.Expect(err).NotTo(HaveOccurred())
249248
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)
250249

251250
return tuningJob
252251
}
252+
253+
func createOrGetUpgradeTestNamespace(test Test, name string, options ...Option[*corev1.Namespace]) (namespace *corev1.Namespace) {
254+
// Verify that the namespace really exists and return it, create it if doesn't exist yet
255+
namespace, err := test.Client().Core().CoreV1().Namespaces().Get(test.Ctx(), name, metav1.GetOptions{})
256+
if err == nil {
257+
return
258+
} else if errors.IsNotFound(err) {
259+
test.T().Logf("%s namespace doesn't exists. Creating ...", name)
260+
return CreateTestNamespaceWithName(test, name, options...)
261+
} else {
262+
test.T().Fatalf("Error retrieving namespace with name `%s`: %v", name, err)
263+
}
264+
return
265+
}

tests/kfto/upgrade/kfto_sft_upgrade_sleep_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
. "github.com/project-codeflare/codeflare-common/support"
2525

2626
corev1 "k8s.io/api/core/v1"
27+
"k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2829

2930
kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
@@ -38,13 +39,7 @@ func TestSetupSleepPytorchjob(t *testing.T) {
3839
test := With(t)
3940

4041
// Create a namespace
41-
namespace := &corev1.Namespace{
42-
ObjectMeta: metav1.ObjectMeta{
43-
Name: sleepNamespaceName,
44-
},
45-
}
46-
_, err := test.Client().Core().CoreV1().Namespaces().Create(test.Ctx(), namespace, metav1.CreateOptions{})
47-
test.Expect(err).NotTo(HaveOccurred())
42+
createOrGetUpgradeTestNamespace(test, sleepNamespaceName)
4843

4944
// Create training PyTorch job
5045
createSleepPyTorchJob(test, sleepNamespaceName)
@@ -76,6 +71,17 @@ func TestVerifySleepPytorchjob(t *testing.T) {
7671
}
7772

7873
func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
74+
// Does PyTorchJob already exist?
75+
_, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Get(test.Ctx(), sleepPyTorchJobName, metav1.GetOptions{})
76+
if err == nil {
77+
// If yes then delete it and wait until there are no PyTorchJobs in the namespace
78+
err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), sleepPyTorchJobName, metav1.DeleteOptions{})
79+
test.Expect(err).NotTo(HaveOccurred())
80+
test.Eventually(kftocore.PyTorchJobs(test, namespace), TestTimeoutShort).Should(BeEmpty())
81+
} else if !errors.IsNotFound(err) {
82+
test.T().Fatalf("Error retrieving PyTorchJob with name `%s`: %v", sleepPyTorchJobName, err)
83+
}
84+
7985
tuningJob := &kftov1.PyTorchJob{
8086
ObjectMeta: metav1.ObjectMeta{
8187
Name: sleepPyTorchJobName,
@@ -102,7 +108,7 @@ func createSleepPyTorchJob(test Test, namespace string) *kftov1.PyTorchJob {
102108
},
103109
}
104110

105-
tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
111+
tuningJob, err = test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
106112
test.Expect(err).NotTo(HaveOccurred())
107113
test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)
108114

0 commit comments

Comments
 (0)