@@ -27,6 +27,7 @@ import (
27
27
corev1 "k8s.io/api/core/v1"
28
28
"k8s.io/apimachinery/pkg/api/resource"
29
29
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
+ "sigs.k8s.io/kueue/apis/kueue/v1beta1"
30
31
31
32
. "github.com/opendatahub-io/distributed-workloads/tests/common"
32
33
. "github.com/opendatahub-io/distributed-workloads/tests/common/support"
@@ -82,10 +83,65 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
82
83
"requirements.txt" : requirementsFileName ,
83
84
})
84
85
86
+ // Create Kueue resources
87
+ resourceFlavor := CreateKueueResourceFlavor (test , v1beta1.ResourceFlavorSpec {})
88
+ defer test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Delete (test .Ctx (), resourceFlavor .Name , metav1.DeleteOptions {})
89
+ cqSpec := v1beta1.ClusterQueueSpec {
90
+ NamespaceSelector : & metav1.LabelSelector {},
91
+ ResourceGroups : []v1beta1.ResourceGroup {
92
+ {
93
+ CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" )},
94
+ Flavors : []v1beta1.FlavorQuotas {
95
+ {
96
+ Name : v1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
97
+ Resources : []v1beta1.ResourceQuota {
98
+ {
99
+ Name : corev1 .ResourceCPU ,
100
+ NominalQuota : resource .MustParse ("8" ),
101
+ },
102
+ {
103
+ Name : corev1 .ResourceMemory ,
104
+ NominalQuota : resource .MustParse ("18Gi" ),
105
+ },
106
+ },
107
+ },
108
+ },
109
+ },
110
+ },
111
+ }
112
+
113
+ if accelerator .IsGpu () {
114
+ numGpus := (workerReplicas + 1 ) * numProcPerNode
115
+ cqSpec .ResourceGroups [0 ].CoveredResources = append (
116
+ cqSpec .ResourceGroups [0 ].CoveredResources ,
117
+ corev1 .ResourceName (accelerator .ResourceLabel ),
118
+ )
119
+ cqSpec .ResourceGroups [0 ].Flavors [0 ].Resources = append (
120
+ cqSpec .ResourceGroups [0 ].Flavors [0 ].Resources ,
121
+ v1beta1.ResourceQuota {
122
+ Name : corev1 .ResourceName (accelerator .ResourceLabel ),
123
+ NominalQuota : resource .MustParse (fmt .Sprint (numGpus )),
124
+ },
125
+ )
126
+ }
127
+
128
+ clusterQueue := CreateKueueClusterQueue (test , cqSpec )
129
+ defer test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Delete (test .Ctx (), clusterQueue .Name , metav1.DeleteOptions {})
130
+ localQueue := CreateKueueLocalQueue (test , namespace .Name , clusterQueue .Name , AsDefaultQueue )
131
+
85
132
// Create training PyTorch job
86
- tuningJob := createKFTOPyTorchMnistJob (test , namespace .Name , * config , accelerator , workerReplicas , numProcPerNode , image )
133
+ tuningJob := createKFTOPyTorchMnistJob (test , namespace .Name , * config , accelerator , workerReplicas , numProcPerNode , image , localQueue )
87
134
defer test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace .Name ).Delete (test .Ctx (), tuningJob .Name , * metav1 .NewDeleteOptions (0 ))
88
135
136
+ // Make sure the Workload is created and running
137
+ test .Eventually (GetKueueWorkloads (test , namespace .Name ), TestTimeoutMedium ).
138
+ Should (
139
+ And (
140
+ HaveLen (1 ),
141
+ ContainElement (WithTransform (KueueWorkloadAdmitted , BeTrueBecause ("Workload failed to be admitted" ))),
142
+ ),
143
+ )
144
+
89
145
// Make sure the PyTorch job is running
90
146
test .Eventually (PyTorchJob (test , namespace .Name , tuningJob .Name ), TestTimeoutDouble ).
91
147
Should (WithTransform (PyTorchJobConditionRunning , Equal (corev1 .ConditionTrue )))
@@ -96,7 +152,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, accelerator Accelerator, image string,
96
152
97
153
}
98
154
99
- func createKFTOPyTorchMnistJob (test Test , namespace string , config corev1.ConfigMap , accelerator Accelerator , workerReplicas int , numProcPerNode int , baseImage string ) * kftov1.PyTorchJob {
155
+ func createKFTOPyTorchMnistJob (test Test , namespace string , config corev1.ConfigMap , accelerator Accelerator , workerReplicas int , numProcPerNode int , baseImage string , localQueue * v1beta1. LocalQueue ) * kftov1.PyTorchJob {
100
156
var backend string
101
157
if accelerator .IsGpu () {
102
158
backend = "nccl"
@@ -117,6 +173,9 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
117
173
},
118
174
ObjectMeta : metav1.ObjectMeta {
119
175
GenerateName : "kfto-mnist-" ,
176
+ Labels : map [string ]string {
177
+ "kueue.x-k8s.io/queue-name" : localQueue .Name ,
178
+ },
120
179
},
121
180
Spec : kftov1.PyTorchJobSpec {
122
181
PyTorchReplicaSpecs : map [kftov1.ReplicaType ]* kftov1.ReplicaSpec {
@@ -177,11 +236,11 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
177
236
Resources : corev1.ResourceRequirements {
178
237
Requests : corev1.ResourceList {
179
238
corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
180
- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
239
+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
181
240
},
182
241
Limits : corev1.ResourceList {
183
242
corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
184
- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
243
+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
185
244
},
186
245
},
187
246
},
@@ -273,11 +332,11 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
273
332
Resources : corev1.ResourceRequirements {
274
333
Requests : corev1.ResourceList {
275
334
corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
276
- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
335
+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
277
336
},
278
337
Limits : corev1.ResourceList {
279
338
corev1 .ResourceCPU : resource .MustParse (fmt .Sprintf ("%d" , numProcPerNode )),
280
- corev1 .ResourceMemory : resource .MustParse ("6Gi " ),
339
+ corev1 .ResourceMemory : resource .MustParse ("4Gi " ),
281
340
},
282
341
},
283
342
},
0 commit comments