Commit 01bf472

Merge remote-tracking branch 'upstream/main'
2 parents ff817cc + 37a59e4

File tree

3 files changed (+265, -20 lines)


tests/kfto/kfto_training_test.go

Lines changed: 227 additions & 19 deletions
@@ -19,6 +19,7 @@ package kfto
 import (
 	"fmt"
 	"testing"
+	"time"
 
 	kftov1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	. "github.com/onsi/gomega"
@@ -29,15 +30,39 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func TestPyTorchJobWithCuda(t *testing.T) {
-	runKFTOPyTorchJob(t, GetCudaTrainingImage(), "nvidia.com/gpu", 1)
+func TestPyTorchJobSingleNodeSingleGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 0)
 }
 
-func TestPyTorchJobWithROCm(t *testing.T) {
-	runKFTOPyTorchJob(t, GetROCmTrainingImage(), "amd.com/gpu", 1)
+func TestPyTorchJobSingleNodeMultiGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 0)
 }
 
-func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int) {
+func TestPyTorchJobMultiNodeSingleGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 1, 1)
+}
+
+func TestPyTorchJobMultiNodeMultiGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchJob(t, GetCudaTrainingImage(), NVIDIA, 2, 1)
+}
+
+func TestPyTorchJobSingleNodeSingleGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 0)
+}
+
+func TestPyTorchJobSingleNodeMultiGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 0)
+}
+
+func TestPyTorchJobMultiNodeSingleGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 1, 1)
+}
+
+func TestPyTorchJobMultiNodeMultiGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchJob(t, GetROCmTrainingImage(), AMD, 2, 1)
+}
+
+func runKFTOPyTorchJob(t *testing.T, image string, gpu Gpu, numGpus, numberOfWorkerNodes int) {
 	test := With(t)
 
 	// Create a namespace
@@ -54,20 +79,40 @@ func runKFTOPyTorchJob(t *testing.T, image string, gpuLabel string, numGpus int)
 	defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})
 
 	// Create training PyTorch job
-	tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpuLabel, numGpus, outputPvc.Name, image)
+	tuningJob := createKFTOPyTorchJob(test, namespace, *config, gpu, numGpus, numberOfWorkerNodes, outputPvc.Name, image)
 	defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))
 
 	// Make sure the PyTorch job is running
 	test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).
 		Should(WithTransform(PyTorchJobConditionRunning, Equal(corev1.ConditionTrue)))
 
+	// Verify GPU utilization
+	if IsOpenShift(test) && gpu == NVIDIA {
+		trainingPods := GetPods(test, namespace, metav1.ListOptions{LabelSelector: "training.kubeflow.org/job-name=" + tuningJob.GetName()})
+		test.Expect(trainingPods).To(HaveLen(numberOfWorkerNodes + 1)) // +1 accounts for the master pod
+
+		for _, trainingPod := range trainingPods {
+			// Check that GPUs for training pods were utilized recently
+			test.Eventually(OpenShiftPrometheusGpuUtil(test, trainingPod, gpu), 15*time.Minute).
+				Should(
+					And(
+						HaveLen(numGpus),
+						ContainElement(
+							// Check that at least one GPU was utilized at more than 50%
+							HaveField("Value", BeNumerically(">", 50)),
+						),
+					),
+				)
+		}
+		test.T().Log("All GPUs were successfully utilized")
+	}
+
 	// Make sure the PyTorch job succeeded
 	test.Eventually(PyTorchJob(test, namespace, tuningJob.Name), TestTimeoutDouble).Should(WithTransform(PyTorchJobConditionSucceeded, Equal(corev1.ConditionTrue)))
 	test.T().Logf("PytorchJob %s/%s ran successfully", tuningJob.Namespace, tuningJob.Name)
-
 }
 
-func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
+func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap, gpu Gpu, numGpus, numberOfWorkerNodes int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
 	tuningJob := &kftov1.PyTorchJob{
 		TypeMeta: metav1.TypeMeta{
 			APIVersion: corev1.SchemeGroupVersion.String(),
@@ -78,14 +123,33 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 		},
 		Spec: kftov1.PyTorchJobSpec{
 			PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
-				"Master": {
+				kftov1.PyTorchJobReplicaTypeMaster: {
 					Replicas:      Ptr(int32(1)),
 					RestartPolicy: "OnFailure",
 					Template: corev1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app": "kfto-llm",
+							},
+						},
 						Spec: corev1.PodSpec{
+							Affinity: &corev1.Affinity{
+								PodAntiAffinity: &corev1.PodAntiAffinity{
+									RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+										{
+											LabelSelector: &metav1.LabelSelector{
+												MatchLabels: map[string]string{
+													"app": "kfto-llm",
+												},
+											},
+											TopologyKey: "kubernetes.io/hostname",
+										},
+									},
+								},
+							},
 							Tolerations: []corev1.Toleration{
 								{
-									Key:      gpuLabel,
+									Key:      gpu.ResourceLabel,
 									Operator: corev1.TolerationOpExists,
 								},
 							},
@@ -124,12 +188,12 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 							ImagePullPolicy: corev1.PullIfNotPresent,
 							Command: []string{
 								"/bin/bash", "-c",
-								`python /etc/config/hf_llm_training.py \
+								`torchrun /etc/config/hf_llm_training.py \
 								--model_uri /tmp/model/bloom-560m \
 								--model_dir /tmp/model/bloom-560m \
-								--dataset_file /tmp/all_datasets/alpaca_data_hundredth.json \
+								--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
 								--transformer_type AutoModelForCausalLM \
-								--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch"}' \
+								--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
 								--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
 							},
 							Env: []corev1.EnvVar{
@@ -145,6 +209,10 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 									Name:  "TOKENIZERS_PARALLELISM",
 									Value: "false",
 								},
+								{
+									Name:  "NCCL_DEBUG",
+									Value: "INFO",
+								},
 							},
 							VolumeMounts: []corev1.VolumeMount{
 								{
@@ -162,14 +230,14 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 							},
 							Resources: corev1.ResourceRequirements{
 								Requests: corev1.ResourceList{
-									corev1.ResourceCPU:            resource.MustParse("2"),
-									corev1.ResourceMemory:         resource.MustParse("8Gi"),
-									corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)),
+									corev1.ResourceCPU:                     resource.MustParse("2"),
+									corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+									corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
 								},
 								Limits: corev1.ResourceList{
-									corev1.ResourceCPU:            resource.MustParse("2"),
-									corev1.ResourceMemory:         resource.MustParse("8Gi"),
-									corev1.ResourceName(gpuLabel): resource.MustParse(fmt.Sprint(numGpus)),
+									corev1.ResourceCPU:                     resource.MustParse("2"),
+									corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+									corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
 								},
 							},
 							SecurityContext: &corev1.SecurityContext{
@@ -207,6 +275,146 @@ func createKFTOPyTorchJob(test Test, namespace string, config corev1.ConfigMap,
 						},
 					},
 				},
+				kftov1.PyTorchJobReplicaTypeWorker: {
+					Replicas:      Ptr(int32(numberOfWorkerNodes)),
+					RestartPolicy: "OnFailure",
+					Template: corev1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app": "kfto-llm",
+							},
+						},
+						Spec: corev1.PodSpec{
+							Affinity: &corev1.Affinity{
+								PodAntiAffinity: &corev1.PodAntiAffinity{
+									RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
+										{
+											LabelSelector: &metav1.LabelSelector{
+												MatchLabels: map[string]string{
+													"app": "kfto-llm",
+												},
+											},
+											TopologyKey: "kubernetes.io/hostname",
+										},
+									},
+								},
+							},
+							Tolerations: []corev1.Toleration{
+								{
+									Key:      gpu.ResourceLabel,
+									Operator: corev1.TolerationOpExists,
+								},
+							},
+							InitContainers: []corev1.Container{
+								{
+									Name:            "copy-model",
+									Image:           GetBloomModelImage(),
+									ImagePullPolicy: corev1.PullIfNotPresent,
+									VolumeMounts: []corev1.VolumeMount{
+										{
+											Name:      "tmp-volume",
+											MountPath: "/tmp",
+										},
+									},
+									Command: []string{"/bin/sh", "-c"},
+									Args:    []string{"mkdir /tmp/model; cp -r /models/bloom-560m /tmp/model"},
+								},
+								{
+									Name:            "copy-dataset",
+									Image:           GetAlpacaDatasetImage(),
+									ImagePullPolicy: corev1.PullIfNotPresent,
+									VolumeMounts: []corev1.VolumeMount{
+										{
+											Name:      "tmp-volume",
+											MountPath: "/tmp",
+										},
+									},
+									Command: []string{"/bin/sh", "-c"},
+									Args:    []string{"mkdir /tmp/all_datasets; cp -r /dataset/* /tmp/all_datasets;ls /tmp/all_datasets"},
+								},
+							},
+							Containers: []corev1.Container{
+								{
+									Name:            "pytorch",
+									Image:           baseImage,
+									ImagePullPolicy: corev1.PullIfNotPresent,
+									Command: []string{
+										"/bin/bash", "-c",
+										`torchrun /etc/config/hf_llm_training.py \
+										--model_uri /tmp/model/bloom-560m \
+										--model_dir /tmp/model/bloom-560m \
+										--dataset_file /tmp/all_datasets/alpaca_data_tenth.json \
+										--transformer_type AutoModelForCausalLM \
+										--training_parameters '{"output_dir": "/mnt/output", "per_device_train_batch_size": 8, "num_train_epochs": 3, "logging_dir": "/logs", "eval_strategy": "epoch", "save_strategy": "no"}' \
+										--lora_config '{"r": 4, "lora_alpha": 16, "lora_dropout": 0.1, "bias": "none"}'`,
+									},
+									Env: []corev1.EnvVar{
+										{
+											Name:  "HF_HOME",
+											Value: "/tmp/.cache",
+										},
+										{
+											Name:  "TRITON_CACHE_DIR",
+											Value: "/tmp/.triton",
+										},
+										{
+											Name:  "TOKENIZERS_PARALLELISM",
+											Value: "false",
+										},
+										{
+											Name:  "NCCL_DEBUG",
+											Value: "INFO",
+										},
+									},
+									VolumeMounts: []corev1.VolumeMount{
+										{
+											Name:      "config-volume",
+											MountPath: "/etc/config",
+										},
+										{
+											Name:      "tmp-volume",
+											MountPath: "/tmp",
+										},
+									},
+									Resources: corev1.ResourceRequirements{
+										Requests: corev1.ResourceList{
+											corev1.ResourceCPU:                     resource.MustParse("2"),
+											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+										},
+										Limits: corev1.ResourceList{
+											corev1.ResourceCPU:                     resource.MustParse("2"),
+											corev1.ResourceMemory:                  resource.MustParse("8Gi"),
+											corev1.ResourceName(gpu.ResourceLabel): resource.MustParse(fmt.Sprint(numGpus)),
+										},
+									},
+									SecurityContext: &corev1.SecurityContext{
+										RunAsNonRoot:           Ptr(true),
+										ReadOnlyRootFilesystem: Ptr(true),
+									},
+								},
+							},
+							Volumes: []corev1.Volume{
+								{
+									Name: "config-volume",
+									VolumeSource: corev1.VolumeSource{
+										ConfigMap: &corev1.ConfigMapVolumeSource{
+											LocalObjectReference: corev1.LocalObjectReference{
+												Name: config.Name,
+											},
+										},
+									},
+								},
+								{
+									Name: "tmp-volume",
+									VolumeSource: corev1.VolumeSource{
+										EmptyDir: &corev1.EmptyDirVolumeSource{},
+									},
+								},
+							},
+						},
+					},
+				},
 			},
 		},
 	}
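
Note on the polling pattern used above: the GPU-utilization check relies on Gomega's asynchronous form in which Eventually receives a function that takes its own Gomega instance. Assertions made through that inner g fail only the current attempt, which Eventually then retries until the timeout. A minimal, self-contained sketch of the pattern follows; queryGpuUtil is hypothetical and stands in for the Prometheus query wrapped by OpenShiftPrometheusGpuUtil:

package kfto

import (
	"testing"
	"time"

	. "github.com/onsi/gomega"
)

// queryGpuUtil is a hypothetical metric fetch that may fail or return
// incomplete data on early attempts.
func queryGpuUtil() ([]float64, error) {
	return []float64{72.5}, nil
}

func TestGpuUtilPollingSketch(t *testing.T) {
	g := NewWithT(t)
	poll := func(g Gomega) []float64 {
		util, err := queryGpuUtil()
		g.Expect(err).NotTo(HaveOccurred()) // failing here retries the poll instead of aborting the test
		return util
	}
	// Keep polling until at least one GPU reports more than 50% utilization.
	g.Eventually(poll, 1*time.Minute).Should(
		ContainElement(BeNumerically(">", 50)),
	)
}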

tests/kfto/resources/hf_llm_training.py

Lines changed: 5 additions & 1 deletion
@@ -160,8 +160,12 @@ def train_model(model, transformer_type, train_data, eval_data, tokenizer, train
         mlm=False,
     )
 
-    # Train the model.
+    # Train and save the model.
     trainer.train()
+    trainer.save_model()
+    logger.info("parallel_mode: '{0}'".format(trainer.args.parallel_mode))
+    logger.info("is_model_parallel: '{0}'".format(trainer.is_model_parallel))
+    logger.info("model_wrapped: '{0}'".format(trainer.model_wrapped))
 
 
 def parse_arguments():

tests/kfto/support.go

Lines changed: 33 additions & 0 deletions
@@ -18,9 +18,24 @@ package kfto
 
 import (
 	"embed"
+	"time"
 
 	. "github.com/onsi/gomega"
 	. "github.com/project-codeflare/codeflare-common/support"
+	prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
+	prometheusmodel "github.com/prometheus/common/model"
+
+	corev1 "k8s.io/api/core/v1"
+)
+
+type Gpu struct {
+	ResourceLabel                 string
+	PrometheusGpuUtilizationLabel string
+}
+
+var (
+	NVIDIA = Gpu{ResourceLabel: "nvidia.com/gpu", PrometheusGpuUtilizationLabel: "DCGM_FI_DEV_GPU_UTIL"}
+	AMD    = Gpu{ResourceLabel: "amd.com/gpu"}
 )
 
 //go:embed resources/*
@@ -32,3 +47,21 @@ func ReadFile(t Test, fileName string) []byte {
 	t.Expect(err).NotTo(HaveOccurred())
 	return file
 }
+
+func OpenShiftPrometheusGpuUtil(test Test, pod corev1.Pod, gpu Gpu) func(g Gomega) prometheusmodel.Vector {
+	return func(g Gomega) prometheusmodel.Vector {
+		prometheusApiClient := GetOpenShiftPrometheusApiClient(test)
+		result, warnings, err := prometheusApiClient.Query(test.Ctx(), gpu.PrometheusGpuUtilizationLabel, time.Now(), prometheusapiv1.WithTimeout(5*time.Second))
+		g.Expect(err).NotTo(HaveOccurred())
+		g.Expect(warnings).Should(HaveLen(0))
+
+		var util prometheusmodel.Vector
+		for _, sample := range result.(prometheusmodel.Vector) {
+			if string(sample.Metric["exported_namespace"]) == pod.GetNamespace() && string(sample.Metric["exported_pod"]) == pod.GetName() {
+				util = append(util, sample)
+			}
+		}
+
+		return util
+	}
+}
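
Note on the Prometheus query: GetOpenShiftPrometheusApiClient comes from codeflare-common. For context, here is a sketch of how a Prometheus v1 API client is typically built and queried with client_golang. The address is an assumption for illustration (on OpenShift it would be the thanos-querier route, with a bearer token wired in through a custom RoundTripper), not the helper's actual implementation:

package main

import (
	"context"
	"fmt"
	"time"

	prometheusapi "github.com/prometheus/client_golang/api"
	prometheusapiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
	prometheusmodel "github.com/prometheus/common/model"
)

func main() {
	// Assumed address for illustration only.
	client, err := prometheusapi.NewClient(prometheusapi.Config{
		Address: "https://thanos-querier.example.com",
	})
	if err != nil {
		panic(err)
	}
	api := prometheusapiv1.NewAPI(client)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// The same instant-vector query the helper issues for NVIDIA GPUs.
	result, warnings, err := api.Query(ctx, "DCGM_FI_DEV_GPU_UTIL", time.Now())
	if err != nil {
		panic(err)
	}
	if len(warnings) > 0 {
		fmt.Println("warnings:", warnings)
	}
	// Each sample carries labels such as exported_namespace and exported_pod,
	// which OpenShiftPrometheusGpuUtil uses to keep only the target pod's samples.
	for _, sample := range result.(prometheusmodel.Vector) {
		fmt.Printf("%v = %v\n", sample.Metric, sample.Value)
	}
}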
