Commit ff2bfb3

Merge remote-tracking branch 'upstream/main'
2 parents f88d22a + a5a6d39

4 files changed: +560 -0 lines
Lines changed: 306 additions & 0 deletions
@@ -0,0 +1,306 @@
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kfto

import (
    "bytes"
    "fmt"
    "testing"

    kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
    . "github.com/onsi/gomega"
    . "github.com/project-codeflare/codeflare-common/support"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPyTorchJobMnistCpu(t *testing.T) {
    runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistWithCuda(t *testing.T) {
    runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistWithROCm(t *testing.T) {
    runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
}

func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) {
    test := With(t)

    // Create a namespace
    namespace := test.NewTestNamespace()

    mnist := ReadFile(test, "resources/mnist.py")
    requirementsFileName := ReadFile(test, requirementsFile)

    if numGpus > 0 {
        mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
    } else {
        mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
    }
    config := CreateConfigMap(test, namespace.Name, map[string][]byte{
        // MNIST training script and its requirements, mounted into the job pods
        "mnist.py":         mnist,
        "requirements.txt": requirementsFileName,
    })

    outputPvc := CreatePersistentVolumeClaim(test, namespace.Name, "50Gi", corev1.ReadWriteOnce)
    defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

    // Create training PyTorch job
    tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image)
    defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

    // Make sure the PyTorch job is running
    test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).
        Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))

    // Make sure the PyTorch job succeeded
    test.Eventually(PyTorchJob(test, namespace.Name, tuningJob.Name), TestTimeoutDouble).
        Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
    test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
    var useGPU = false
    var backend string

    if numGpus > 0 {
        useGPU = true
        backend = "nccl"
    } else {
        backend = "gloo"
    }

    tuningJob := &kftov1.PyTorchJob{
        TypeMeta: metav1.TypeMeta{
            APIVersion: kftov1.SchemeGroupVersion.String(),
            Kind:       "PyTorchJob",
        },
        ObjectMeta: metav1.ObjectMeta{
            GenerateName: "kfto-mnist-",
        },
        Spec: kftov1.PyTorchJobSpec{
            PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
                "Master": {
                    Replicas:      Ptr(int32(1)),
                    RestartPolicy: kftov1.RestartPolicyOnFailure,
                    Template: corev1.PodTemplateSpec{
                        ObjectMeta: metav1.ObjectMeta{
                            Labels: map[string]string{
                                "app": "kfto-mnist",
                            },
                        },
                        Spec: corev1.PodSpec{
                            // Require each job pod to run on a different node
                            Affinity: &corev1.Affinity{
                                PodAntiAffinity: &corev1.PodAntiAffinity{
                                    RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
                                        {
                                            LabelSelector: &metav1.LabelSelector{
                                                MatchLabels: map[string]string{
                                                    "app": "kfto-mnist",
                                                },
                                            },
                                            TopologyKey: "kubernetes.io/hostname",
                                        },
                                    },
                                },
                            },
                            Containers: []corev1.Container{
                                {
                                    Name:            "pytorch",
                                    Image:           baseImage,
                                    ImagePullPolicy: corev1.PullIfNotPresent,
                                    Command: []string{
                                        "/bin/bash", "-c",
                                        fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
                                            pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
                                            python /mnt/files/mnist.py --epochs 3 --save-model --output-path /mnt/output --backend %s`, backend),
                                    },
                                    VolumeMounts: []corev1.VolumeMount{
                                        {
                                            Name:      config.Name,
                                            MountPath: "/mnt/files",
                                        },
                                        {
                                            Name:      "tmp-volume",
                                            MountPath: "/tmp",
                                        },
                                        {
                                            Name:      "output-volume",
                                            MountPath: "/mnt/output",
                                        },
                                    },
                                    Resources: corev1.ResourceRequirements{
                                        Limits: corev1.ResourceList{
                                            corev1.ResourceCPU:    resource.MustParse("1"),
                                            corev1.ResourceMemory: resource.MustParse("6Gi"),
                                        },
                                    },
                                },
                            },
                            Volumes: []corev1.Volume{
                                {
                                    Name: config.Name,
                                    VolumeSource: corev1.VolumeSource{
                                        ConfigMap: &corev1.ConfigMapVolumeSource{
                                            LocalObjectReference: corev1.LocalObjectReference{
                                                Name: config.Name,
                                            },
                                        },
                                    },
                                },
                                {
                                    Name: "tmp-volume",
                                    VolumeSource: corev1.VolumeSource{
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
                                    },
                                },
                                {
                                    Name: "output-volume",
                                    VolumeSource: corev1.VolumeSource{
                                        PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
                                            ClaimName: outputPvcName,
                                        },
                                    },
                                },
                            },
                            RestartPolicy: corev1.RestartPolicyOnFailure,
                        },
                    },
                },
                "Worker": {
                    Replicas:      Ptr(int32(workerReplicas)),
                    RestartPolicy: kftov1.RestartPolicyOnFailure,
                    Template: corev1.PodTemplateSpec{
                        ObjectMeta: metav1.ObjectMeta{
                            Labels: map[string]string{
                                "app": "kfto-mnist",
                            },
                        },
                        Spec: corev1.PodSpec{
                            // Same anti-affinity rule as the Master replica
                            Affinity: &corev1.Affinity{
                                PodAntiAffinity: &corev1.PodAntiAffinity{
                                    RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
                                        {
                                            LabelSelector: &metav1.LabelSelector{
                                                MatchLabels: map[string]string{
                                                    "app": "kfto-mnist",
                                                },
                                            },
                                            TopologyKey: "kubernetes.io/hostname",
                                        },
                                    },
                                },
                            },
                            Containers: []corev1.Container{
                                {
                                    Name:            "pytorch",
                                    Image:           baseImage,
                                    ImagePullPolicy: corev1.PullIfNotPresent,
                                    Command: []string{
                                        "/bin/bash", "-c",
                                        fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
                                            pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
                                            python /mnt/files/mnist.py --epochs 3 --save-model --backend %s`, backend),
                                    },
                                    VolumeMounts: []corev1.VolumeMount{
                                        {
                                            Name:      config.Name,
                                            MountPath: "/mnt/files",
                                        },
                                        {
                                            Name:      "tmp-volume",
                                            MountPath: "/tmp",
                                        },
                                    },
                                    Resources: corev1.ResourceRequirements{
                                        Limits: corev1.ResourceList{
                                            corev1.ResourceCPU:    resource.MustParse("1"),
                                            corev1.ResourceMemory: resource.MustParse("6Gi"),
                                        },
                                    },
                                },
                            },
                            Volumes: []corev1.Volume{
                                {
                                    Name: config.Name,
                                    VolumeSource: corev1.VolumeSource{
                                        ConfigMap: &corev1.ConfigMapVolumeSource{
                                            LocalObjectReference: corev1.LocalObjectReference{
                                                Name: config.Name,
                                            },
                                        },
                                    },
                                },
                                {
                                    Name: "tmp-volume",
                                    VolumeSource: corev1.VolumeSource{
                                        EmptyDir: &corev1.EmptyDirVolumeSource{},
                                    },
                                },
                            },
                            RestartPolicy: corev1.RestartPolicyOnFailure,
                        },
                    },
                },
            },
        },
    }

    if useGPU {
        // Update resource lists for GPU (NVIDIA/ROCm) usecase
        tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
        tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))

        tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
            {
                Name:  "NCCL_DEBUG",
                Value: "INFO",
            },
        }
        tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
            {
                Name:  "NCCL_DEBUG",
                Value: "INFO",
            },
        }

        // Update tolerations
        tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Tolerations = []corev1.Toleration{
            {
                Key:      gpuLabel,
                Operator: corev1.TolerationOpExists,
            },
        }
        tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Tolerations = []corev1.Toleration{
            {
                Key:      gpuLabel,
                Operator: corev1.TolerationOpExists,
            },
        }
    }

    tuningJob, err := test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Create(test.Ctx(), tuningJob, metav1.CreateOptions{})
    test.Expect(err).NotTo(HaveOccurred())
    test.T().Logf("Created PytorchJob %s/%s successfully", tuningJob.Namespace, tuningJob.Name)

    return tuningJob
}
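
Note: runKFTOPyTorchMnistJob is parameterized by GPU count, worker replica count, GPU resource label, training image, and requirements file, so additional variants only need a new one-line test function. A minimal sketch of such a variant (the test name and worker count below are hypothetical, not part of this commit):

func TestPyTorchJobMnistCpuMultiWorker(t *testing.T) {
    // Hypothetical CPU-only variant with four workers; like TestPyTorchJobMnistCpu above,
    // it reuses the CUDA training image and the CPU requirements file.
    runKFTOPyTorchMnistJob(t, 0, 4, "", GetCudaTrainingImage(), "resources/requirements.txt")
}

Only the arguments differ between such cases; the PyTorchJob construction in createKFTOPyTorchMnistJob stays the same.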
