2222import re
2323import ast
2424from typing import Dict , Any , Optional , Tuple
25+ from codeflare_sdk .common .kueue .kueue import get_default_kueue_name
2526from codeflare_sdk .common .utils .constants import MOUNT_PATH
2627from kubernetes import client
2728from ...common .kubernetes_cluster .auth import get_api_client
@@ -59,9 +60,9 @@ def __init__(
5960 cluster_config : Optional [ManagedClusterConfig ] = None ,
6061 namespace : Optional [str ] = None ,
6162 runtime_env : Optional [Dict [str , Any ]] = None ,
62- shutdown_after_job_finishes : Optional [bool ] = None ,
6363 ttl_seconds_after_finished : int = 0 ,
6464 active_deadline_seconds : Optional [int ] = None ,
65+ local_queue : Optional [str ] = None ,
6566 ):
6667 """
6768 Initialize a RayJob instance.
@@ -73,12 +74,11 @@ def __init__(
7374 cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
7475 namespace: The Kubernetes namespace (auto-detected if not specified)
7576 runtime_env: Ray runtime environment configuration (optional)
76- shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
7777 ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
7878 active_deadline_seconds: Maximum time the job can run before being terminated (optional)
79+ local_queue: The Kueue LocalQueue to submit the job to (optional)
7980
8081 Note:
81- shutdown_after_job_finishes is automatically detected but can be overridden:
8282 - True if cluster_config is provided (new cluster will be cleaned up)
8383 - False if cluster_name is provided (existing cluster will not be shut down)
8484 - User can explicitly set this value to override auto-detection
@@ -108,17 +108,7 @@ def __init__(
108108 self .runtime_env = runtime_env
109109 self .ttl_seconds_after_finished = ttl_seconds_after_finished
110110 self .active_deadline_seconds = active_deadline_seconds
111-
112- # Auto-set shutdown_after_job_finishes based on cluster_config presence
113- # If cluster_config is provided, we want to clean up the cluster after job finishes
114- # If using existing cluster, we don't want to shut it down
115- # User can override this behavior by explicitly setting shutdown_after_job_finishes
116- if shutdown_after_job_finishes is not None :
117- self .shutdown_after_job_finishes = shutdown_after_job_finishes
118- elif cluster_config is not None :
119- self .shutdown_after_job_finishes = True
120- else :
121- self .shutdown_after_job_finishes = False
111+ self .local_queue = local_queue
122112
123113 if namespace is None :
124114 detected_namespace = get_current_namespace ()
@@ -177,10 +167,6 @@ def submit(self) -> str:
177167 if scripts :
178168 self ._handle_script_volumes_for_existing_cluster (scripts , result )
179169
180- if self .shutdown_after_job_finishes :
181- logger .info (
182- f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
183- )
184170 return self .name
185171 else :
186172 raise RuntimeError (f"Failed to submit RayJob { self .name } " )
@@ -230,11 +216,32 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
230216 },
231217 "spec" : {
232218 "entrypoint" : self .entrypoint ,
233- "shutdownAfterJobFinishes" : self .shutdown_after_job_finishes ,
234219 "ttlSecondsAfterFinished" : self .ttl_seconds_after_finished ,
220+ "shutdownAfterJobFinishes" : self ._cluster_config is not None ,
235221 },
236222 }
237223
224+ labels = {}
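225+ # When cluster_config is provided (a new cluster is created), label the RayJob with a Kueue queue: the explicit local_queue if set, otherwise the namespace's default LocalQueue, otherwise the literal name "default"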
226+ if self ._cluster_config is not None :
227+ if self .local_queue :
228+ labels ["kueue.x-k8s.io/queue-name" ] = self .local_queue
229+ else :
230+ default_queue = get_default_kueue_name (self .namespace )
231+ if default_queue :
232+ labels ["kueue.x-k8s.io/queue-name" ] = default_queue
233+ else :
234+ # No default queue found, use "default" as fallback
235+ labels ["kueue.x-k8s.io/queue-name" ] = "default"
236+ logger .warning (
237+ f"No default Kueue LocalQueue found in namespace '{ self .namespace } '. "
238+ f"Using 'default' as the queue name. If a LocalQueue named 'default' "
239+ f"does not exist, the RayJob submission will fail. "
240+ f"To fix this, please explicitly specify the 'local_queue' parameter."
241+ )
242+
243+ rayjob_cr ["metadata" ]["labels" ] = labels
244+
238245 # Add active deadline if specified
239246 if self .active_deadline_seconds :
240247 rayjob_cr ["spec" ]["activeDeadlineSeconds" ] = self .active_deadline_seconds
0 commit comments