@@ -26,6 +26,7 @@ import (
26
26
kueueacv1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
27
27
28
28
corev1 "k8s.io/api/core/v1"
29
+ "k8s.io/apimachinery/pkg/api/errors"
29
30
"k8s.io/apimachinery/pkg/api/resource"
30
31
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
32
@@ -36,20 +37,14 @@ var (
36
37
namespaceName = "test-kfto-upgrade"
37
38
resourceFlavorName = "rf-upgrade"
38
39
clusterQueueName = "cq-upgrade"
40
+ localQueueName = "lq-upgrade"
39
41
pyTorchJobName = "pytorch-upgrade"
40
42
)
41
43
42
44
func TestSetupPytorchjob (t * testing.T ) {
43
45
test := With (t )
44
46
45
- // Create a namespace
46
- namespace := & corev1.Namespace {
47
- ObjectMeta : metav1.ObjectMeta {
48
- Name : namespaceName ,
49
- },
50
- }
51
- _ , err := test .Client ().Core ().CoreV1 ().Namespaces ().Create (test .Ctx (), namespace , metav1.CreateOptions {})
52
- test .Expect (err ).NotTo (HaveOccurred ())
47
+ createOrGetUpgradeTestNamespace (test , namespaceName )
53
48
54
49
// Create a ConfigMap with training dataset and configuration
55
50
configData := map [string ][]byte {
@@ -59,50 +54,43 @@ func TestSetupPytorchjob(t *testing.T) {
59
54
config := CreateConfigMap (test , namespaceName , configData )
60
55
61
56
// Create Kueue resources
62
- resourceFlavor := & kueuev1beta1.ResourceFlavor {
63
- ObjectMeta : metav1.ObjectMeta {
64
- Name : resourceFlavorName ,
65
- },
66
- }
67
- resourceFlavor , err = test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Create (test .Ctx (), resourceFlavor , metav1.CreateOptions {})
57
+ resourceFlavor := kueueacv1beta1 .ResourceFlavor (resourceFlavorName )
58
+ appliedResourceFlavor , err := test .Client ().Kueue ().KueueV1beta1 ().ResourceFlavors ().Apply (test .Ctx (), resourceFlavor , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
68
59
test .Expect (err ).NotTo (HaveOccurred ())
60
+ test .T ().Logf ("Applied Kueue ResourceFlavor %s successfully" , appliedResourceFlavor .Name )
69
61
70
- clusterQueue := & kueuev1beta1.ClusterQueue {
71
- ObjectMeta : metav1.ObjectMeta {
72
- Name : clusterQueueName ,
73
- },
74
- Spec : kueuev1beta1.ClusterQueueSpec {
75
- NamespaceSelector : & metav1.LabelSelector {},
76
- ResourceGroups : []kueuev1beta1.ResourceGroup {
77
- {
78
- CoveredResources : []corev1.ResourceName {corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" )},
79
- Flavors : []kueuev1beta1.FlavorQuotas {
80
- {
81
- Name : kueuev1beta1 .ResourceFlavorReference (resourceFlavor .Name ),
82
- Resources : []kueuev1beta1.ResourceQuota {
83
- {
84
- Name : corev1 .ResourceCPU ,
85
- NominalQuota : resource .MustParse ("8" ),
86
- },
87
- {
88
- Name : corev1 .ResourceMemory ,
89
- NominalQuota : resource .MustParse ("12Gi" ),
90
- },
91
- },
92
- },
93
- },
94
- },
95
- },
96
- StopPolicy : Ptr (kueuev1beta1 .Hold ),
97
- },
98
- }
99
- clusterQueue , err = test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Create (test .Ctx (), clusterQueue , metav1.CreateOptions {})
62
+ clusterQueue := kueueacv1beta1 .ClusterQueue (clusterQueueName ).WithSpec (
63
+ kueueacv1beta1 .ClusterQueueSpec ().
64
+ WithNamespaceSelector (metav1.LabelSelector {}).
65
+ WithResourceGroups (
66
+ kueueacv1beta1 .ResourceGroup ().WithCoveredResources (
67
+ corev1 .ResourceName ("cpu" ), corev1 .ResourceName ("memory" ),
68
+ ).WithFlavors (
69
+ kueueacv1beta1 .FlavorQuotas ().
70
+ WithName (kueuev1beta1 .ResourceFlavorReference (resourceFlavorName )).
71
+ WithResources (
72
+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceCPU ).WithNominalQuota (resource .MustParse ("8" )),
73
+ kueueacv1beta1 .ResourceQuota ().WithName (corev1 .ResourceMemory ).WithNominalQuota (resource .MustParse ("12Gi" )),
74
+ ),
75
+ ),
76
+ ).
77
+ WithStopPolicy (kueuev1beta1 .Hold ),
78
+ )
79
+ appliedClusterQueue , err := test .Client ().Kueue ().KueueV1beta1 ().ClusterQueues ().Apply (test .Ctx (), clusterQueue , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
100
80
test .Expect (err ).NotTo (HaveOccurred ())
81
+ test .T ().Logf ("Applied Kueue ClusterQueue %s successfully" , appliedClusterQueue .Name )
101
82
102
- localQueue := CreateKueueLocalQueue (test , namespaceName , clusterQueue .Name , AsDefaultQueue )
83
+ localQueue := kueueacv1beta1 .LocalQueue (localQueueName , namespaceName ).
84
+ WithAnnotations (map [string ]string {"kueue.x-k8s.io/default-queue" : "true" }).
85
+ WithSpec (
86
+ kueueacv1beta1 .LocalQueueSpec ().WithClusterQueue (kueuev1beta1 .ClusterQueueReference (clusterQueueName )),
87
+ )
88
+ appliedLocalQueue , err := test .Client ().Kueue ().KueueV1beta1 ().LocalQueues (namespaceName ).Apply (test .Ctx (), localQueue , metav1.ApplyOptions {FieldManager : "setup-PyTorchJob" , Force : true })
89
+ test .Expect (err ).NotTo (HaveOccurred ())
90
+ test .T ().Logf ("Applied Kueue LocalQueue %s/%s successfully" , appliedLocalQueue .Namespace , appliedLocalQueue .Name )
103
91
104
92
// Create training PyTorch job
105
- tuningJob := createPyTorchJob (test , namespaceName , localQueue .Name , * config )
93
+ tuningJob := createPyTorchJob (test , namespaceName , appliedLocalQueue .Name , * config )
106
94
107
95
// Make sure the PyTorch job is suspended, waiting for ClusterQueue to be enabled
108
96
test .Eventually (kftocore .PyTorchJob (test , tuningJob .Namespace , pyTorchJobName ), TestTimeoutShort ).
@@ -133,6 +121,17 @@ func TestRunPytorchjob(t *testing.T) {
133
121
}
134
122
135
123
func createPyTorchJob (test Test , namespace , localQueueName string , config corev1.ConfigMap ) * kftov1.PyTorchJob {
124
+ // Does PyTorchJob already exist?
125
+ _ , err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Get (test .Ctx (), pyTorchJobName , metav1.GetOptions {})
126
+ if err == nil {
127
+ // If yes then delete it and wait until there are no PyTorchJobs in the namespace
128
+ err := test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Delete (test .Ctx (), pyTorchJobName , metav1.DeleteOptions {})
129
+ test .Expect (err ).NotTo (HaveOccurred ())
130
+ test .Eventually (kftocore .PyTorchJobs (test , namespace ), TestTimeoutShort ).Should (BeEmpty ())
131
+ } else if ! errors .IsNotFound (err ) {
132
+ test .T ().Fatalf ("Error retrieving PyTorchJob with name `%s`: %v" , pyTorchJobName , err )
133
+ }
134
+
136
135
tuningJob := & kftov1.PyTorchJob {
137
136
ObjectMeta : metav1.ObjectMeta {
138
137
Name : pyTorchJobName ,
@@ -244,9 +243,23 @@ func createPyTorchJob(test Test, namespace, localQueueName string, config corev1
244
243
},
245
244
}
246
245
247
- tuningJob , err : = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
246
+ tuningJob , err = test .Client ().Kubeflow ().KubeflowV1 ().PyTorchJobs (namespace ).Create (test .Ctx (), tuningJob , metav1.CreateOptions {})
248
247
test .Expect (err ).NotTo (HaveOccurred ())
249
248
test .T ().Logf ("Created PytorchJob %s/%s successfully" , tuningJob .Namespace , tuningJob .Name )
250
249
251
250
return tuningJob
252
251
}
252
+
253
+ func createOrGetUpgradeTestNamespace (test Test , name string , options ... Option [* corev1.Namespace ]) (namespace * corev1.Namespace ) {
254
+ // Verify that the namespace really exists and return it, create it if doesn't exist yet
255
+ namespace , err := test .Client ().Core ().CoreV1 ().Namespaces ().Get (test .Ctx (), name , metav1.GetOptions {})
256
+ if err == nil {
257
+ return
258
+ } else if errors .IsNotFound (err ) {
259
+ test .T ().Logf ("%s namespace doesn't exists. Creating ..." , name )
260
+ return CreateTestNamespaceWithName (test , name , options ... )
261
+ } else {
262
+ test .T ().Fatalf ("Error retrieving namespace with name `%s`: %v" , name , err )
263
+ }
264
+ return
265
+ }
0 commit comments