1+ import json
12import os
23import random
34import string
@@ -65,19 +66,30 @@ def create_namespace(self):
6566 return RuntimeError (e )
6667
6768
def create_new_resource_flavor(self, num_flavors=1):
    """Create ``num_flavors`` randomly named ResourceFlavors and record their names.

    Each name is ``test-resource-flavor-<random suffix>``. The first flavor is
    created with ``default=True`` and every later one with ``default=False``;
    what that flag selects is decided by ``create_resource_flavor``.

    The generated names are stored, in creation order, in
    ``self.resource_flavors`` (the list is reset on every call).

    Args:
        num_flavors: How many flavors to create. Defaults to 1 so that callers
            written against the previous zero-argument signature keep working.
    """
    self.resource_flavors = []
    for index in range(num_flavors):
        flavor_name = f"test-resource-flavor-{random_choice()}"
        # Only the very first flavor is flagged as the default one.
        create_resource_flavor(self, flavor_name, index == 0)
        self.resource_flavors.append(flavor_name)
7176
7277
def create_new_cluster_queue(self, num_queues=1):
    """Create ``num_queues`` randomly named ClusterQueues, one per resource flavor.

    The i-th queue is created against ``self.resource_flavors[i]``, so
    ``self.resource_flavors`` must already hold at least ``num_queues`` names
    (otherwise the lookup raises ``IndexError``).

    The generated names (``test-cluster-queue-<random suffix>``) are stored,
    in creation order, in ``self.cluster_queues`` (reset on every call).

    Args:
        num_queues: How many queues to create. Defaults to 1 so that callers
            written against the previous zero-argument signature keep working.
    """
    self.cluster_queues = []
    for index in range(num_queues):
        queue_name = f"test-cluster-queue-{random_choice()}"
        # Queue i is backed by flavor i.
        create_cluster_queue(self, queue_name, self.resource_flavors[index])
        self.cluster_queues.append(queue_name)
7684
7785
def create_new_local_queue(self, num_queues=1):
    """Create ``num_queues`` randomly named LocalQueues, one per cluster queue.

    The i-th local queue points at ``self.cluster_queues[i]``, so
    ``self.cluster_queues`` must already hold at least ``num_queues`` names
    (otherwise the lookup raises ``IndexError``). Only the first local queue
    is created with ``is_default=True``.

    The generated names (``test-local-queue-<random suffix>``) are stored,
    in creation order, in ``self.local_queues`` (reset on every call).

    Args:
        num_queues: How many queues to create. Defaults to 1 so that callers
            written against the previous zero-argument signature keep working.
    """
    self.local_queues = []
    for index in range(num_queues):
        queue_name = f"test-local-queue-{random_choice()}"
        # Local queue i targets cluster queue i; only the first is the default.
        create_local_queue(self, self.cluster_queues[index], queue_name, index == 0)
        self.local_queues.append(queue_name)
8193
8294
8395def create_namespace_with_name (self , namespace_name ):
@@ -132,7 +144,7 @@ def create_cluster_queue(self, cluster_queue, flavor):
132144 {"name" : "memory" , "nominalQuota" : "36Gi" },
133145 {"name" : "nvidia.com/gpu" , "nominalQuota" : 1 },
134146 ],
135- }
147+ },
136148 ],
137149 }
138150 ],
@@ -161,11 +173,33 @@ def create_cluster_queue(self, cluster_queue, flavor):
161173 self .cluster_queue = cluster_queue
162174
163175
164- def create_resource_flavor (self , flavor ):
176+ def create_resource_flavor (self , flavor , default = True ):
177+ worker_label , worker_value = os .getenv ("WORKER_LABEL" , "worker-1=true" ).split ("=" )
178+ control_label , control_value = os .getenv (
179+ "CONTROL_LABEL" , "ingress-ready=true"
180+ ).split ("=" )
181+ toleration_key = os .getenv (
182+ "TOLERATION_KEY" , "node-role.kubernetes.io/control-plane"
183+ )
184+
185+ node_labels = (
186+ {worker_label : worker_value } if default else {control_label : control_value }
187+ )
188+
165189 resource_flavor_json = {
166190 "apiVersion" : "kueue.x-k8s.io/v1beta1" ,
167191 "kind" : "ResourceFlavor" ,
168192 "metadata" : {"name" : flavor },
193+ "spec" : {
194+ "nodeLabels" : node_labels ,
195+ "tolerations" : [
196+ {
197+ "key" : toleration_key ,
198+ "operator" : "Exists" ,
199+ "effect" : "NoSchedule" ,
200+ }
201+ ],
202+ },
169203 }
170204
171205 try :
@@ -190,14 +224,14 @@ def create_resource_flavor(self, flavor):
190224 self .resource_flavor = flavor
191225
192226
193- def create_local_queue (self , cluster_queue , local_queue ):
227+ def create_local_queue (self , cluster_queue , local_queue , is_default = True ):
194228 local_queue_json = {
195229 "apiVersion" : "kueue.x-k8s.io/v1beta1" ,
196230 "kind" : "LocalQueue" ,
197231 "metadata" : {
198232 "namespace" : self .namespace ,
199233 "name" : local_queue ,
200- "annotations" : {"kueue.x-k8s.io/default-queue" : "true" },
234+ "annotations" : {"kueue.x-k8s.io/default-queue" : str ( is_default ). lower () },
201235 },
202236 "spec" : {"clusterQueue" : cluster_queue },
203237 }
@@ -226,34 +260,77 @@ def create_local_queue(self, cluster_queue, local_queue):
226260 self .local_queue = local_queue
227261
228262
def create_kueue_resources(self, resource_ammount=1):
    """Create the Kueue test resources: flavors, then cluster queues, then local queues.

    ``resource_ammount`` is passed unchanged to each of the three creators, so
    the same number of each resource kind is requested.
    """
    print("creating Kueue resources ...")
    # Order matters: cluster queues index into the flavors, and local queues
    # index into the cluster queues.
    creators = (
        create_new_resource_flavor,
        create_new_cluster_queue,
        create_new_local_queue,
    )
    for creator in creators:
        creator(self, resource_ammount)
234268
235269
def delete_kueue_resources(self):
    """Best-effort deletion of the ClusterQueues and ResourceFlavors recorded on ``self``.

    Deletes every name in ``self.cluster_queues`` and then every name in
    ``self.resource_flavors`` through ``self.custom_api``. A failure on one
    object is printed and does not stop the remaining deletions.

    If either list was never assigned (for example because setup failed before
    the resources were created) it is treated as empty, so teardown does not
    raise ``AttributeError``.
    """
    # Cluster queues go first, before the flavors they were created with.
    for cq in getattr(self, "cluster_queues", None) or []:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="clusterqueues",
                version="v1beta1",
                name=cq,
            )
            print(f"\n'{cq}' cluster-queue deleted")
        except Exception as e:
            # Deliberately broad: cleanup must keep going whatever the API raises.
            print(f"\nError deleting cluster-queue '{cq}' : {e}")

    for flavor in getattr(self, "resource_flavors", None) or []:
        try:
            self.custom_api.delete_cluster_custom_object(
                group="kueue.x-k8s.io",
                plural="resourceflavors",
                version="v1beta1",
                name=flavor,
            )
            print(f"'{flavor}' resource-flavor deleted")
        except Exception as e:
            # Deliberately broad: cleanup must keep going whatever the API raises.
            print(f"\nError deleting resource-flavor '{flavor}': {e}")
296+
297+
def get_pod_node(self, namespace, name):
    """Return the node name of the first pod labelled ``ray.io/cluster=<name>``.

    Pods are listed in ``namespace`` through ``self.api_instance``; only the
    first item of the result is inspected.

    Raises:
        ValueError: if no pod matches the label selector, or if the first
            matching pod has no node name set.
    """
    matching_pods = self.api_instance.list_namespaced_pod(
        namespace, label_selector=f"ray.io/cluster={name}"
    )
    if not matching_pods.items:
        raise ValueError(
            f"Unable to retrieve node name for pod '{name}' in namespace '{namespace}'"
        )

    assigned_node = matching_pods.items[0].spec.node_name
    if assigned_node is not None:
        return assigned_node
    raise ValueError(f"No node selected for pod '{name}' in namespace '{namespace}'")
314+
315+
def get_flavor_spec(self, flavor_name):
    """Fetch the ResourceFlavor named ``flavor_name`` and return the API response as-is.

    The object is read through ``self.custom_api`` from the
    ``kueue.x-k8s.io/v1beta1`` ``resourceflavors`` resource.

    Raises:
        client.exceptions.ApiException: re-raised after a diagnostic message
            is printed (a dedicated message is used for HTTP 404).
    """
    try:
        flavor_object = self.custom_api.get_cluster_custom_object(
            group="kueue.x-k8s.io",
            version="v1beta1",
            plural="resourceflavors",
            name=flavor_name,
        )
    except client.exceptions.ApiException as api_error:
        # Report the failure, then let the caller see the original exception.
        if api_error.status != 404:
            print(f"Error retrieving ResourceFlavor '{flavor_name}': {api_error}")
        else:
            print(f"ResourceFlavor '{flavor_name}' not found.")
        raise
    else:
        return flavor_object
331+
332+
def get_nodes_by_label(self, node_labels):
    """Return the names of the nodes that match every label in ``node_labels``.

    ``node_labels`` is a mapping of label key to value; it is turned into a
    comma-separated ``key=value`` selector and passed to
    ``self.api_instance.list_node``.
    """
    selector_terms = []
    for key, value in node_labels.items():
        selector_terms.append(f"{key}={value}")

    node_list = self.api_instance.list_node(label_selector=",".join(selector_terms))

    names = []
    for node in node_list.items:
        names.append(node.metadata.name)
    return names
0 commit comments