Skip to content

Commit 8f60744

Browse files
committed
amd support for ray clusters
1 parent e2ba556 commit 8f60744

File tree

4 files changed

+56
-47
lines changed

4 files changed

+56
-47
lines changed

test/e2e/deployment_appwrapper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func TestDeploymentAppWrapper(t *testing.T) {
4545
defer func() {
4646
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
4747
}()
48-
clusterQueue := createClusterQueue(test, resourceFlavor, 0)
48+
clusterQueue := createClusterQueue(test, resourceFlavor, 0, "nvidia.com/gpu")
4949
defer func() {
5050
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
5151
}()

test/e2e/job_appwrapper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestBatchJobAppWrapper(t *testing.T) {
4343
defer func() {
4444
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
4545
}()
46-
clusterQueue := createClusterQueue(test, resourceFlavor, 0)
46+
clusterQueue := createClusterQueue(test, resourceFlavor, 0, "nvidia.com/gpu")
4747
defer func() {
4848
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
4949
}()

test/e2e/mnist_pytorch_appwrapper_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func runMnistPyTorchAppWrapper(t *testing.T, accelerator string, numberOfGpus in
5151
defer func() {
5252
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
5353
}()
54-
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
54+
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus, "nvidia.com/gpu")
5555
defer func() {
5656
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
5757
}()

test/e2e/mnist_rayjob_raycluster_test.go

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -40,29 +40,33 @@ import (
4040
// directly managed by Kueue, and asserts successful completion of the training job.
4141

4242
func TestMnistRayJobRayClusterCpu(t *testing.T) {
43-
runMnistRayJobRayCluster(t, "cpu", 0)
43+
runMnistRayJobRayCluster(t, "cpu", 0, "nvidia.com/gpu", GetRayImage())
4444
}
4545

46-
func TestMnistRayJobRayClusterGpu(t *testing.T) {
47-
runMnistRayJobRayCluster(t, "gpu", 1)
46+
func TestMnistRayJobRayClusterCudaGpu(t *testing.T) {
47+
runMnistRayJobRayCluster(t, "gpu", 1, "nvidia.com/gpu", GetRayImage())
4848
}
4949

50-
func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int) {
50+
func TestMnistRayJobRayClusterROCmGpu(t *testing.T) {
51+
runMnistRayJobRayCluster(t, "gpu", 1, "amd.com/gpu", GetRayROCmImage())
52+
}
53+
54+
func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int, gpuResourceName string, rayImage string) {
5155
test := With(t)
5256

5357
// Create a namespace
5458
namespace := test.NewTestNamespace()
5559

5660
// Create Kueue resources
5761
resourceFlavor := CreateKueueResourceFlavor(test, v1beta1.ResourceFlavorSpec{})
58-
defer func() {
59-
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
60-
}()
61-
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
62-
defer func() {
63-
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
64-
}()
65-
CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)
62+
// defer func() {
63+
// _ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
64+
// }()
65+
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus, gpuResourceName)
66+
// defer func() {
67+
// _ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
68+
// }()
69+
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)
6670

6771
// Create MNIST training script
6872
mnist := constructMNISTConfigMap(test, namespace)
@@ -71,17 +75,17 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
7175
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)
7276

7377
// Create RayCluster and assign it to the localqueue
74-
rayCluster := constructRayCluster(test, namespace, mnist, numberOfGpus)
78+
rayCluster := constructRayCluster(test, namespace, localQueue.Name, mnist, numberOfGpus, gpuResourceName, rayImage)
7579
rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Create(test.Ctx(), rayCluster, metav1.CreateOptions{})
7680
test.Expect(err).NotTo(HaveOccurred())
7781
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
7882

7983
test.T().Logf("Waiting for RayCluster %s/%s to be running", rayCluster.Namespace, rayCluster.Name)
80-
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium).
84+
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutLong).
8185
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
8286

8387
// Create RayJob
84-
rayJob := constructRayJob(test, namespace, rayCluster, accelerator, numberOfGpus)
88+
rayJob := constructRayJob(test, namespace, rayCluster, accelerator, numberOfGpus, rayImage)
8589
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
8690
test.Expect(err).NotTo(HaveOccurred())
8791
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
@@ -110,15 +114,15 @@ func runMnistRayJobRayCluster(t *testing.T, accelerator string, numberOfGpus int
110114
}
111115

112116
func TestMnistRayJobRayClusterAppWrapperCpu(t *testing.T) {
113-
runMnistRayJobRayClusterAppWrapper(t, "cpu", 0)
117+
runMnistRayJobRayClusterAppWrapper(t, "cpu", 0, "nvidia.com/gpu", GetRayImage())
114118
}
115119

116-
func TestMnistRayJobRayClusterAppWrapperGpu(t *testing.T) {
117-
runMnistRayJobRayClusterAppWrapper(t, "gpu", 1)
120+
func TestMnistRayJobRayClusterAppWrapperCudaGpu(t *testing.T) {
121+
runMnistRayJobRayClusterAppWrapper(t, "gpu", 1, "nvidia.com/gpu", GetRayImage())
118122
}
119123

120124
// Same as TestMNISTRayJobRayCluster, except the RayCluster is wrapped in an AppWrapper
121-
func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, numberOfGpus int) {
125+
func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, numberOfGpus int, gpuResourceName string, rayImage string) {
122126
test := With(t)
123127

124128
// Create a namespace
@@ -129,7 +133,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
129133
defer func() {
130134
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
131135
}()
132-
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus)
136+
clusterQueue := createClusterQueue(test, resourceFlavor, numberOfGpus, gpuResourceName)
133137
defer func() {
134138
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
135139
}()
@@ -142,7 +146,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
142146
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)
143147

144148
// Create RayCluster, wrap in AppWrapper and assign to localqueue
145-
rayCluster := constructRayCluster(test, namespace, mnist, numberOfGpus)
149+
rayCluster := constructRayCluster(test, namespace, localQueue.Name, mnist, numberOfGpus, gpuResourceName, rayImage)
146150
raw := Raw(test, rayCluster)
147151
raw = RemoveCreationTimestamp(test, raw)
148152

@@ -175,15 +179,15 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
175179
test.T().Logf("Created AppWrapper %s/%s successfully", aw.Namespace, aw.Name)
176180

177181
test.T().Logf("Waiting for AppWrapper %s/%s to be running", aw.Namespace, aw.Name)
178-
test.Eventually(AppWrappers(test, namespace), TestTimeoutMedium).
182+
test.Eventually(AppWrappers(test, namespace), TestTimeoutLong).
179183
Should(ContainElement(WithTransform(AppWrapperPhase, Equal(mcadv1beta2.AppWrapperRunning))))
180184

181185
test.T().Logf("Waiting for RayCluster %s/%s to be running", rayCluster.Namespace, rayCluster.Name)
182-
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium).
186+
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutLong).
183187
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
184188

185189
// Create RayJob
186-
rayJob := constructRayJob(test, namespace, rayCluster, accelerator, numberOfGpus)
190+
rayJob := constructRayJob(test, namespace, rayCluster, accelerator, numberOfGpus, rayImage)
187191
rayJob, err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Create(test.Ctx(), rayJob, metav1.CreateOptions{})
188192
test.Expect(err).NotTo(HaveOccurred())
189193
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
@@ -208,7 +212,7 @@ func runMnistRayJobRayClusterAppWrapper(t *testing.T, accelerator string, number
208212
test.Expect(err).NotTo(HaveOccurred())
209213

210214
test.T().Logf("Waiting for AppWrapper %s/%s to be deleted", aw.Namespace, aw.Name)
211-
test.Eventually(AppWrappers(test, namespace), TestTimeoutShort).Should(BeEmpty())
215+
test.Eventually(AppWrappers(test, namespace), TestTimeoutMedium).Should(BeEmpty())
212216
}
213217

214218
// Verifying https://github.com/project-codeflare/codeflare-operator/issues/649
@@ -223,11 +227,11 @@ func TestRayClusterImagePullSecret(t *testing.T) {
223227
defer func() {
224228
_ = test.Client().Kueue().KueueV1beta1().ResourceFlavors().Delete(test.Ctx(), resourceFlavor.Name, metav1.DeleteOptions{})
225229
}()
226-
clusterQueue := createClusterQueue(test, resourceFlavor, 0)
230+
clusterQueue := createClusterQueue(test, resourceFlavor, 0, "nvidia.com/gpu")
227231
defer func() {
228232
_ = test.Client().Kueue().KueueV1beta1().ClusterQueues().Delete(test.Ctx(), clusterQueue.Name, metav1.DeleteOptions{})
229233
}()
230-
CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)
234+
localQueue := CreateKueueLocalQueue(test, namespace.Name, clusterQueue.Name, AsDefaultQueue)
231235

232236
// Create MNIST training script
233237
mnist := constructMNISTConfigMap(test, namespace)
@@ -236,14 +240,14 @@ func TestRayClusterImagePullSecret(t *testing.T) {
236240
test.T().Logf("Created ConfigMap %s/%s successfully", mnist.Namespace, mnist.Name)
237241

238242
// Create RayCluster with imagePullSecret and assign it to the localqueue
239-
rayCluster := constructRayCluster(test, namespace, mnist, 0)
243+
rayCluster := constructRayCluster(test, namespace, localQueue.Name, mnist, 0, "nvidia.com/gpu", GetRayImage())
240244
rayCluster.Spec.HeadGroupSpec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "custom-pull-secret"}}
241245
rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Create(test.Ctx(), rayCluster, metav1.CreateOptions{})
242246
test.Expect(err).NotTo(HaveOccurred())
243247
test.T().Logf("Created RayCluster %s/%s successfully", rayCluster.Namespace, rayCluster.Name)
244248

245249
test.T().Logf("Waiting for RayCluster %s/%s to be running", rayCluster.Namespace, rayCluster.Name)
246-
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium).
250+
test.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutLong).
247251
Should(WithTransform(RayClusterState, Equal(rayv1.Ready)))
248252
}
249253

@@ -266,7 +270,7 @@ func constructMNISTConfigMap(test Test, namespace *corev1.Namespace) *corev1.Con
266270
}
267271
}
268272

269-
func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.ConfigMap, numberOfGpus int) *rayv1.RayCluster {
273+
func constructRayCluster(_ Test, namespace *corev1.Namespace, localQueueName string, mnist *corev1.ConfigMap, numberOfGpus int, gpuResourceName string, rayImage string) *rayv1.RayCluster {
270274
return &rayv1.RayCluster{
271275
TypeMeta: metav1.TypeMeta{
272276
APIVersion: rayv1.GroupVersion.String(),
@@ -275,6 +279,9 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
275279
ObjectMeta: metav1.ObjectMeta{
276280
Name: "raycluster",
277281
Namespace: namespace.Name,
282+
Labels: map[string]string{
283+
"kueue.x-k8s.io/queue-name": localQueueName,
284+
},
278285
},
279286
Spec: rayv1.RayClusterSpec{
280287
RayVersion: GetRayVersion(),
@@ -287,7 +294,7 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
287294
Containers: []corev1.Container{
288295
{
289296
Name: "ray-head",
290-
Image: GetRayImage(),
297+
Image: rayImage,
291298
Ports: []corev1.ContainerPort{
292299
{
293300
ContainerPort: 6379,
@@ -335,14 +342,14 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
335342
Spec: corev1.PodSpec{
336343
Tolerations: []corev1.Toleration{
337344
{
338-
Key: "nvidia.com/gpu",
345+
Key: gpuResourceName,
339346
Operator: corev1.TolerationOpExists,
340347
},
341348
},
342349
Containers: []corev1.Container{
343350
{
344351
Name: "ray-worker",
345-
Image: GetRayImage(),
352+
Image: rayImage,
346353
Lifecycle: &corev1.Lifecycle{
347354
PreStop: &corev1.LifecycleHandler{
348355
Exec: &corev1.ExecAction{
@@ -352,14 +359,14 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
352359
},
353360
Resources: corev1.ResourceRequirements{
354361
Requests: corev1.ResourceList{
355-
corev1.ResourceCPU: resource.MustParse("250m"),
356-
corev1.ResourceMemory: resource.MustParse("1G"),
357-
"nvidia.com/gpu": resource.MustParse(fmt.Sprint(numberOfGpus)),
362+
corev1.ResourceCPU: resource.MustParse("250m"),
363+
corev1.ResourceMemory: resource.MustParse("1G"),
364+
corev1.ResourceName(gpuResourceName): resource.MustParse(fmt.Sprint(numberOfGpus)),
358365
},
359366
Limits: corev1.ResourceList{
360-
corev1.ResourceCPU: resource.MustParse("2"),
361-
corev1.ResourceMemory: resource.MustParse("4G"),
362-
"nvidia.com/gpu": resource.MustParse(fmt.Sprint(numberOfGpus)),
367+
corev1.ResourceCPU: resource.MustParse("2"),
368+
corev1.ResourceMemory: resource.MustParse("4G"),
369+
corev1.ResourceName(gpuResourceName): resource.MustParse(fmt.Sprint(numberOfGpus)),
363370
},
364371
},
365372
VolumeMounts: []corev1.VolumeMount{
@@ -390,7 +397,7 @@ func constructRayCluster(_ Test, namespace *corev1.Namespace, mnist *corev1.Conf
390397
}
391398
}
392399

393-
func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster, accelerator string, numberOfGpus int) *rayv1.RayJob {
400+
func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayCluster, accelerator string, numberOfGpus int, rayImage string) *rayv1.RayJob {
394401
return &rayv1.RayJob{
395402
TypeMeta: metav1.TypeMeta{
396403
APIVersion: rayv1.GroupVersion.String(),
@@ -404,6 +411,8 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
404411
Entrypoint: "python /home/ray/jobs/mnist.py",
405412
RuntimeEnvYAML: `
406413
pip:
414+
--extra-index-url https://download.pytorch.org/whl/rocm6.1
415+
- torch==2.4.0+rocm6.1
407416
- pytorch_lightning==2.4.0
408417
- torchmetrics==1.6.0
409418
- torchvision==0.20.1
@@ -422,7 +431,7 @@ func constructRayJob(_ Test, namespace *corev1.Namespace, rayCluster *rayv1.RayC
422431
RestartPolicy: corev1.RestartPolicyNever,
423432
Containers: []corev1.Container{
424433
{
425-
Image: GetRayImage(),
434+
Image: rayImage,
426435
Name: "rayjob-submitter-pod",
427436
},
428437
},
@@ -477,12 +486,12 @@ func getRayDashboardURL(test Test, namespace, rayClusterName string) string {
477486
}
478487

479488
// Create ClusterQueue
480-
func createClusterQueue(test Test, resourceFlavor *v1beta1.ResourceFlavor, numberOfGpus int) *v1beta1.ClusterQueue {
489+
func createClusterQueue(test Test, resourceFlavor *v1beta1.ResourceFlavor, numberOfGpus int, gpuResourceName string) *v1beta1.ClusterQueue {
481490
cqSpec := v1beta1.ClusterQueueSpec{
482491
NamespaceSelector: &metav1.LabelSelector{},
483492
ResourceGroups: []v1beta1.ResourceGroup{
484493
{
485-
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory"), corev1.ResourceName("nvidia.com/gpu")},
494+
CoveredResources: []corev1.ResourceName{corev1.ResourceName("cpu"), corev1.ResourceName("memory"), corev1.ResourceName(gpuResourceName)},
486495
Flavors: []v1beta1.FlavorQuotas{
487496
{
488497
Name: v1beta1.ResourceFlavorReference(resourceFlavor.Name),
@@ -496,7 +505,7 @@ func createClusterQueue(test Test, resourceFlavor *v1beta1.ResourceFlavor, numbe
496505
NominalQuota: resource.MustParse("12Gi"),
497506
},
498507
{
499-
Name: corev1.ResourceName("nvidia.com/gpu"),
508+
Name: corev1.ResourceName(gpuResourceName),
500509
NominalQuota: resource.MustParse(fmt.Sprint(numberOfGpus)),
501510
},
502511
},

0 commit comments

Comments
 (0)