2222import re
2323import ast
2424from typing import Dict , Any , Optional , Tuple
25+ from codeflare_sdk .common .kueue .kueue import get_default_kueue_name
2526from codeflare_sdk .common .utils .constants import MOUNT_PATH
2627from kubernetes import client
2728from ...common .kubernetes_cluster .auth import get_api_client
@@ -59,9 +60,9 @@ def __init__(
5960 cluster_config : Optional [ManagedClusterConfig ] = None ,
6061 namespace : Optional [str ] = None ,
6162 runtime_env : Optional [Dict [str , Any ]] = None ,
62- shutdown_after_job_finishes : Optional [bool ] = None ,
6363 ttl_seconds_after_finished : int = 0 ,
6464 active_deadline_seconds : Optional [int ] = None ,
65+ local_queue : Optional [str ] = None ,
6566 ):
6667 """
6768 Initialize a RayJob instance.
@@ -73,12 +74,11 @@ def __init__(
7374 cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
7475 namespace: The Kubernetes namespace (auto-detected if not specified)
7576 runtime_env: Ray runtime environment configuration (optional)
76- shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
7777 ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
7878 active_deadline_seconds: Maximum time the job can run before being terminated (optional)
79+ local_queue: The Kueue LocalQueue to submit the job to (optional)
7980
8081 Note:
81- shutdown_after_job_finishes is automatically detected but can be overridden:
8282 - True if cluster_config is provided (new cluster will be cleaned up)
8383 - False if cluster_name is provided (existing cluster will not be shut down)
8484 - User can explicitly set this value to override auto-detection
@@ -108,17 +108,7 @@ def __init__(
108108 self .runtime_env = runtime_env
109109 self .ttl_seconds_after_finished = ttl_seconds_after_finished
110110 self .active_deadline_seconds = active_deadline_seconds
111-
112- # Auto-set shutdown_after_job_finishes based on cluster_config presence
113- # If cluster_config is provided, we want to clean up the cluster after job finishes
114- # If using existing cluster, we don't want to shut it down
115- # User can override this behavior by explicitly setting shutdown_after_job_finishes
116- if shutdown_after_job_finishes is not None :
117- self .shutdown_after_job_finishes = shutdown_after_job_finishes
118- elif cluster_config is not None :
119- self .shutdown_after_job_finishes = True
120- else :
121- self .shutdown_after_job_finishes = False
111+ self .local_queue = local_queue
122112
123113 if namespace is None :
124114 detected_namespace = get_current_namespace ()
@@ -177,10 +167,6 @@ def submit(self) -> str:
177167 if scripts :
178168 self ._handle_script_volumes_for_existing_cluster (scripts , result )
179169
180- if self .shutdown_after_job_finishes :
181- logger .info (
182- f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
183- )
184170 return self .name
185171 else :
186172 raise RuntimeError (f"Failed to submit RayJob { self .name } " )
@@ -230,11 +216,20 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
230216 },
231217 "spec" : {
232218 "entrypoint" : self .entrypoint ,
233- "shutdownAfterJobFinishes" : self .shutdown_after_job_finishes ,
234219 "ttlSecondsAfterFinished" : self .ttl_seconds_after_finished ,
220+ "shutdownAfterJobFinishes" : self ._cluster_config is not None ,
221+ "enableInTreeAutoscaling" : False ,
235222 },
236223 }
237224
225+ labels = {}
226+ if self .local_queue :
227+ labels ["kueue.x-k8s.io/queue-name" ] = self .local_queue
228+ else :
229+ labels ["kueue.x-k8s.io/queue-name" ] = get_default_kueue_name (self .namespace )
230+
231+ rayjob_cr ["metadata" ]["labels" ] = labels
232+
238233 # Add active deadline if specified
239234 if self .active_deadline_seconds :
240235 rayjob_cr ["spec" ]["activeDeadlineSeconds" ] = self .active_deadline_seconds
0 commit comments