Skip to content

Commit 018f034

Browse files
committed
kueue-tests
1 parent 538d345 commit 018f034

File tree

9 files changed

+878
-1033
lines changed

9 files changed

+878
-1033
lines changed

src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 3 additions & 13 deletions
Original file line number · Diff line number · Diff line change
@@ -131,10 +131,8 @@ class ManagedClusterConfig:
131131
accelerator_configs:
132132
A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
133133
Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
134-
local_queue:
135-
The name of the queue to use for the cluster.
136134
annotations:
137-
A dictionary of annotations to apply to the cluster.
135+
A dictionary of annotations to apply to the Job.
138136
volumes:
139137
A list of V1Volume objects to add to the Cluster
140138
volume_mounts:
@@ -161,7 +159,6 @@ class ManagedClusterConfig:
161159
accelerator_configs: Dict[str, str] = field(
162160
default_factory=lambda: DEFAULT_ACCELERATORS.copy()
163161
)
164-
local_queue: Optional[str] = None
165162
annotations: Dict[str, str] = field(default_factory=dict)
166163
volumes: list[V1Volume] = field(default_factory=list)
167164
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
@@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
248245
"""
249246
ray_cluster_spec = {
250247
"rayVersion": RAY_VERSION,
251-
"enableInTreeAutoscaling": False,
252248
"headGroupSpec": self._build_head_group_spec(),
253249
"workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
254250
}
@@ -346,12 +342,9 @@ def _build_head_container(self) -> V1Container:
346342
self.head_accelerators,
347343
),
348344
volume_mounts=self._generate_volume_mounts(),
345+
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
349346
)
350347

351-
# Add environment variables if specified
352-
if hasattr(self, "envs") and self.envs:
353-
container.env = self._build_env_vars()
354-
355348
return container
356349

357350
def _build_worker_container(self) -> V1Container:
@@ -373,12 +366,9 @@ def _build_worker_container(self) -> V1Container:
373366
self.worker_accelerators,
374367
),
375368
volume_mounts=self._generate_volume_mounts(),
369+
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
376370
)
377371

378-
# Add environment variables if specified
379-
if hasattr(self, "envs") and self.envs:
380-
container.env = self._build_env_vars()
381-
382372
return container
383373

384374
def _build_resource_requirements(

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 42 additions & 20 deletions
Original file line number · Diff line number · Diff line change
@@ -22,6 +22,7 @@
2222
import re
2323
import ast
2424
from typing import Dict, Any, Optional, Tuple
25+
from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
2526
from codeflare_sdk.common.utils.constants import MOUNT_PATH
2627
from kubernetes import client
2728
from ...common.kubernetes_cluster.auth import get_api_client
@@ -59,9 +60,9 @@ def __init__(
5960
cluster_config: Optional[ManagedClusterConfig] = None,
6061
namespace: Optional[str] = None,
6162
runtime_env: Optional[Dict[str, Any]] = None,
62-
shutdown_after_job_finishes: Optional[bool] = None,
6363
ttl_seconds_after_finished: int = 0,
6464
active_deadline_seconds: Optional[int] = None,
65+
local_queue: Optional[str] = None,
6566
):
6667
"""
6768
Initialize a RayJob instance.
@@ -73,12 +74,11 @@ def __init__(
7374
cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
7475
namespace: The Kubernetes namespace (auto-detected if not specified)
7576
runtime_env: Ray runtime environment configuration (optional)
76-
shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
7777
ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
7878
active_deadline_seconds: Maximum time the job can run before being terminated (optional)
79+
local_queue: The Kueue LocalQueue to submit the job to (optional)
7980
8081
Note:
81-
shutdown_after_job_finishes is automatically detected but can be overridden:
8282
- True if cluster_config is provided (new cluster will be cleaned up)
8383
- False if cluster_name is provided (existing cluster will not be shut down)
8484
- User can explicitly set this value to override auto-detection
@@ -108,17 +108,7 @@ def __init__(
108108
self.runtime_env = runtime_env
109109
self.ttl_seconds_after_finished = ttl_seconds_after_finished
110110
self.active_deadline_seconds = active_deadline_seconds
111-
112-
# Auto-set shutdown_after_job_finishes based on cluster_config presence
113-
# If cluster_config is provided, we want to clean up the cluster after job finishes
114-
# If using existing cluster, we don't want to shut it down
115-
# User can override this behavior by explicitly setting shutdown_after_job_finishes
116-
if shutdown_after_job_finishes is not None:
117-
self.shutdown_after_job_finishes = shutdown_after_job_finishes
118-
elif cluster_config is not None:
119-
self.shutdown_after_job_finishes = True
120-
else:
121-
self.shutdown_after_job_finishes = False
111+
self.local_queue = local_queue
122112

123113
if namespace is None:
124114
detected_namespace = get_current_namespace()
@@ -177,18 +167,41 @@ def submit(self) -> str:
177167
if scripts:
178168
self._handle_script_volumes_for_existing_cluster(scripts, result)
179169

180-
if self.shutdown_after_job_finishes:
181-
logger.info(
182-
f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
183-
)
184170
return self.name
185171
else:
186172
raise RuntimeError(f"Failed to submit RayJob {self.name}")
187173

188174
def stop(self):
189175
"""
190-
Suspend the Ray job.
176+
Suspend the Ray job. For Kueue-managed RayJobs, suspension is not supported.
191177
"""
178+
try:
179+
# Check if this is a Kueue-managed RayJob
180+
job_cr = self._api.get_job(name=self.name, k8s_namespace=self.namespace)
181+
is_kueue_managed = "kueue.x-k8s.io/queue-name" in job_cr.get(
182+
"metadata", {}
183+
).get("labels", {})
184+
185+
if is_kueue_managed:
186+
# Provide a user-friendly message for Kueue-managed jobs
187+
queue_name = job_cr["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]
188+
logger.warning(
189+
f"RayJob '{self.name}' is managed by Kueue (queue: {queue_name}). "
190+
f"Kueue-managed RayJobs cannot be suspended/stopped. "
191+
f"To terminate this job, use job.delete() which will remove the job and free up resources."
192+
)
193+
print(f"\n⚠️ Cannot stop Kueue-managed RayJob '{self.name}'")
194+
print(f" This job is controlled by Kueue queue '{queue_name}'.")
195+
print(f" To terminate it, use: job.delete()")
196+
print(
197+
f" This will delete the RayJob and free up cluster resources.\n"
198+
)
199+
return False
200+
except Exception as e:
201+
logger.debug(f"Could not check if job is Kueue-managed: {e}")
202+
# Continue with normal suspension attempt
203+
204+
# For non-Kueue jobs or if we couldn't determine, try normal suspension
192205
stopped = self._api.suspend_job(name=self.name, k8s_namespace=self.namespace)
193206
if stopped:
194207
logger.info(f"Successfully stopped the RayJob {self.name}")
@@ -230,11 +243,20 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
230243
},
231244
"spec": {
232245
"entrypoint": self.entrypoint,
233-
"shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
234246
"ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
247+
"shutdownAfterJobFinishes": self._cluster_config is not None,
248+
"enableInTreeAutoscaling": False,
235249
},
236250
}
237251

252+
labels = {}
253+
if self.local_queue:
254+
labels["kueue.x-k8s.io/queue-name"] = self.local_queue
255+
else:
256+
labels["kueue.x-k8s.io/queue-name"] = get_default_kueue_name(self.namespace)
257+
258+
rayjob_cr["metadata"]["labels"] = labels
259+
238260
# Add active deadline if specified
239261
if self.active_deadline_seconds:
240262
rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds

0 commit comments

Comments (0)