Commit 82dfbfa

kueue-tests
1 parent 538d345 commit 82dfbfa

File tree

5 files changed: +638 -22 lines changed


src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 3 additions & 13 deletions
@@ -131,10 +131,8 @@ class ManagedClusterConfig:
         accelerator_configs:
             A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
             Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
-        local_queue:
-            The name of the queue to use for the cluster.
         annotations:
-            A dictionary of annotations to apply to the cluster.
+            A dictionary of annotations to apply to the Job.
         volumes:
             A list of V1Volume objects to add to the Cluster
         volume_mounts:
@@ -161,7 +159,6 @@ class ManagedClusterConfig:
     accelerator_configs: Dict[str, str] = field(
         default_factory=lambda: DEFAULT_ACCELERATORS.copy()
     )
-    local_queue: Optional[str] = None
     annotations: Dict[str, str] = field(default_factory=dict)
     volumes: list[V1Volume] = field(default_factory=list)
     volume_mounts: list[V1VolumeMount] = field(default_factory=list)
@@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
         """
         ray_cluster_spec = {
             "rayVersion": RAY_VERSION,
-            "enableInTreeAutoscaling": False,
             "headGroupSpec": self._build_head_group_spec(),
             "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
         }
@@ -346,12 +342,9 @@ def _build_head_container(self) -> V1Container:
                 self.head_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
         )

-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container

     def _build_worker_container(self) -> V1Container:
@@ -373,12 +366,9 @@ def _build_worker_container(self) -> V1Container:
                 self.worker_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
         )

-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container

     def _build_resource_requirements(

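The env handling in both container builders is now a single inline conditional instead of a post-construction assignment. A minimal sketch of the equivalent logic, assuming envs is an optional dict of variable names to values (the standalone helper below is illustrative only, not part of the diff):

    from typing import Dict, List, Optional
    from kubernetes.client import V1Container, V1EnvVar

    def build_env(envs: Optional[Dict[str, str]]) -> Optional[List[V1EnvVar]]:
        # Same effect as the inlined expression: None when no envs are set,
        # otherwise a list of V1EnvVar objects for the container spec.
        if not envs:
            return None
        return [V1EnvVar(name=k, value=v) for k, v in envs.items()]

    # Hypothetical usage: env is resolved at construction time.
    container = V1Container(name="ray-head", image="example-image", env=build_env({"FOO": "bar"}))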
src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 14 additions & 0 deletions
@@ -22,6 +22,7 @@
 import re
 import ast
 from typing import Dict, Any, Optional, Tuple
+from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
 from codeflare_sdk.common.utils.constants import MOUNT_PATH
 from kubernetes import client
 from ...common.kubernetes_cluster.auth import get_api_client
@@ -62,6 +63,7 @@ def __init__(
         shutdown_after_job_finishes: Optional[bool] = None,
         ttl_seconds_after_finished: int = 0,
         active_deadline_seconds: Optional[int] = None,
+        local_queue: Optional[str] = None,
     ):
         """
         Initialize a RayJob instance.
@@ -108,6 +110,7 @@ def __init__(
         self.runtime_env = runtime_env
         self.ttl_seconds_after_finished = ttl_seconds_after_finished
         self.active_deadline_seconds = active_deadline_seconds
+        self.local_queue = local_queue

         # Auto-set shutdown_after_job_finishes based on cluster_config presence
         # If cluster_config is provided, we want to clean up the cluster after job finishes
@@ -232,9 +235,20 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
                 "entrypoint": self.entrypoint,
                 "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
                 "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
+                "enableInTreeAutoscaling": False,
             },
         }

+        # Configure Kueue label
+        if self.local_queue:
+            rayjob_cr["metadata"]["labels"] = {
+                "kueue.x-k8s.io/queue-name": self.local_queue
+            }
+        else:
+            rayjob_cr["metadata"]["labels"] = {
+                "kueue.x-k8s.io/queue-name": get_default_kueue_name(self.namespace)
+            }
+
         # Add active deadline if specified
         if self.active_deadline_seconds:
             rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds
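With local_queue now living on RayJob rather than ManagedClusterConfig, a submission against a specific Kueue LocalQueue might look like the sketch below; the queue and namespace names are placeholders, and omitting local_queue falls back to get_default_kueue_name(namespace) when the label is built:

    from codeflare_sdk import RayJob, ManagedClusterConfig

    # Placeholder names for illustration only.
    job = RayJob(
        job_name="kueue-example",
        namespace="demo-ns",
        cluster_config=ManagedClusterConfig(num_workers=1),
        entrypoint="python -c \"import ray; ray.init(); print('done')\"",
        local_queue="team-a-queue",  # sets the kueue.x-k8s.io/queue-name label on the RayJob CR
    )
    job.submit()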
Lines changed: 295 additions & 0 deletions
@@ -0,0 +1,295 @@

import pytest
import sys
import os
from time import sleep

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from support import *

from codeflare_sdk import RayJob, ManagedClusterConfig
import kubernetes.client.rest
from python_client.kuberay_job_api import RayjobApi
from python_client.kuberay_cluster_api import RayClusterApi


@pytest.mark.openshift
class TestRayJobLifecycledClusterKueue:
    """Test RayJob with Kueue integration and auto-created cluster lifecycle management."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_with_kueue_integration(self):
        """
        Test RayJob submission with Kueue queue management, including:
        1. Job submission to Kueue queue
        2. Waiting for Kueue admission
        3. Job execution and completion
        4. Automatic cluster cleanup after job deletion

        Note: This test does NOT test manual suspend/resume as that conflicts
        with Kueue's queue management.
        """
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)

        ray_image = get_ray_image()
        self.job_api = RayjobApi()
        job_name = "kueue-managed-job"

        cluster_config = ManagedClusterConfig(
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            head_memory_requests=4,
            head_memory_limits=6,
            num_workers=1,
            worker_cpu_requests="500m",
            worker_cpu_limits="500m",
            worker_memory_requests=4,
            worker_memory_limits=5,
            image=ray_image,
        )

        rayjob = RayJob(
            job_name=job_name,
            namespace=self.namespace,
            cluster_config=cluster_config,
            entrypoint="python -c \"import ray; ray.init(); print('Kueue-managed RayJob completed successfully')\"",
            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
            shutdown_after_job_finishes=True,
            local_queue=self.local_queues[0],
        )

        try:
            # 1. Submit job to Kueue queue
            print(f"Submitting RayJob to Kueue queue: {self.local_queues[0]}")
            assert rayjob.submit() == job_name

            # 2. Check if job is suspended or immediately admitted by Kueue
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            is_suspended = job_cr.get("spec", {}).get("suspend", False)

            if is_suspended:
                print("✓ Job is queued and suspended by Kueue (waiting for resources)")
                # 3. Wait for Kueue to admit the job
                print("Waiting for Kueue to admit the job...")
                admitted = wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                )
                assert admitted, "Job was not admitted by Kueue within timeout"
                print("✓ Job admitted by Kueue")
            else:
                print("✓ Job was immediately admitted by Kueue (resources available)")

            # 4. Wait for job to reach running state
            print("Waiting for job to start running...")
            assert self.job_api.wait_until_job_running(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not reach running state after Kueue admission"
            print("✓ Job is running")

            # 5. Verify RayCluster was created (KubeRay adds random suffix to cluster name)
            cluster_api = RayClusterApi()
            # List all RayClusters in the namespace since KubeRay adds a suffix
            clusters = cluster_api.list_ray_clusters(
                k8s_namespace=rayjob.namespace, async_req=False
            )

            # Find the cluster that starts with our job name
            found_cluster = None
            for cluster in clusters.get("items", []):
                cluster_name = cluster.get("metadata", {}).get("name", "")
                if cluster_name.startswith(f"{rayjob.name}-raycluster"):
                    found_cluster = cluster
                    break

            assert (
                found_cluster is not None
            ), f"RayCluster not found for RayJob {rayjob.name}"
            print(
                f"✓ RayCluster created successfully: {found_cluster['metadata']['name']}"
            )

            # 6. Wait for job completion
            print("Waiting for job to complete...")
            assert self.job_api.wait_until_job_finished(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not complete"

            # Verify final job status
            final_status = self.job_api.get_job_status(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            print(
                f"✓ Job completed with status: {final_status.get('jobDeploymentStatus')}"
            )

        finally:
            # 7. Delete the job and verify cleanup
            print("Cleaning up...")
            assert rayjob.delete()
            self.verify_cluster_cleanup(rayjob)
            print("✓ Cleanup complete")

    def test_rayjob_kueue_with_preemption(self):
        """
        Test RayJob behavior when using Kueue with potential preemption scenarios.
        This tests that manual suspend/resume still works even with Kueue management.
        """
        self.setup_method()
        create_namespace(self)
        # Create Kueue resources with limited quota to force suspension
        create_limited_kueue_resources(self)

        ray_image = get_ray_image()
        self.job_api = RayjobApi()
        job_name = "kueue-job"

        cluster_config = ManagedClusterConfig(
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            head_memory_requests=4,
            head_memory_limits=6,
            num_workers=1,
            worker_cpu_requests="500m",
            worker_cpu_limits="500m",
            worker_memory_requests=4,
            worker_memory_limits=6,
            image=ray_image,
        )

        rayjob = RayJob(
            job_name=job_name,
            namespace=self.namespace,
            cluster_config=cluster_config,
            entrypoint="python -c \"import ray; import time; ray.init(); time.sleep(30); print('Job completed')\"",
            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
            shutdown_after_job_finishes=True,
            local_queue=self.local_queues[0],
        )

        try:
            # 1. Submit job
            assert rayjob.submit() == job_name

            # 2. Check if job is suspended or immediately admitted by Kueue
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            is_suspended = job_cr.get("spec", {}).get("suspend", False)

            if is_suspended:
                print("Job is queued and suspended by Kueue (waiting for resources)")
                assert wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                ), "Job was not admitted by Kueue"
                print("✓ Job admitted by Kueue")
            else:
                print("✓ Job was immediately admitted by Kueue (resources available)")

            # 3. Wait for job to be running
            assert self.job_api.wait_until_job_running(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not reach running state"
            print("✓ Job is running")

            # 4. Manually suspend the job (simulating preemption or manual intervention)
            print("Manually suspending the job...")
            assert rayjob.stop(), "Job stop failed"

            # Verify suspension
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            assert job_cr["spec"]["suspend"] is True, "Job suspend not set to true"

            # Wait for suspended state
            assert self._wait_for_job_status(
                rayjob, "Suspended", timeout=30
            ), "Job did not reach Suspended state"
            print("✓ Job manually suspended")

            # 5. Resume the job
            print("Resuming the job...")
            assert rayjob.resubmit(), "Job resubmit failed"

            # Note: With Kueue, the job might go back to the queue and need re-admission
            # Check if Kueue re-queued it
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            if job_cr.get("spec", {}).get("suspend", False):
                print("Job re-queued by Kueue, waiting for re-admission...")
                assert wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                ), "Job was not re-admitted by Kueue"

            # 6. Wait for job to complete
            assert self.job_api.wait_until_job_finished(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not complete after resume"
            print("✓ Job completed after manual suspend/resume")

        finally:
            assert rayjob.delete()
            self.verify_cluster_cleanup(rayjob)

    def _wait_for_job_status(
        self,
        rayjob: RayJob,
        expected_status: str,
        timeout: int = 30,
    ) -> bool:
        """Wait for a job to reach a specific deployment status."""
        elapsed_time = 0
        check_interval = 2

        while elapsed_time < timeout:
            status = self.job_api.get_job_status(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            if status and status.get("jobDeploymentStatus") == expected_status:
                return True

            sleep(check_interval)
            elapsed_time += check_interval

        return False

    def verify_cluster_cleanup(self, rayjob: RayJob, timeout: int = 60):
        """Verify that the cluster created by the RayJob has been cleaned up."""
        elapsed_time = 0
        check_interval = 5
        cluster_api = RayClusterApi()

        while elapsed_time < timeout:
            # List all RayClusters in the namespace
            clusters = cluster_api.list_ray_clusters(
                k8s_namespace=rayjob.namespace, async_req=False
            )

            # Check if any cluster exists that starts with our job name
            found = False
            for cluster in clusters.get("items", []):
                cluster_name = cluster.get("metadata", {}).get("name", "")
                if cluster_name.startswith(f"{rayjob.name}-raycluster"):
                    found = True
                    break

            if not found:
                # No cluster found, cleanup successful
                return

            sleep(check_interval)
            elapsed_time += check_interval

        raise TimeoutError(
            f"RayCluster for job '{rayjob.name}' was not cleaned up within {timeout} seconds"
        )
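The tests rely on wait_for_kueue_admission from the shared support module, whose implementation is not part of this commit. As a rough illustration of one way such a check can work (Kueue lifts spec.suspend once it admits the RayJob), a hypothetical poller might look like:

    from time import sleep

    def poll_for_admission(job_api, name, namespace, timeout=120, interval=5):
        """Illustrative only; not the support module's implementation."""
        elapsed = 0
        while elapsed < timeout:
            cr = job_api.get_job(name=name, k8s_namespace=namespace)
            # Kueue admission unsuspends the RayJob, so suspend flips to False.
            if not cr.get("spec", {}).get("suspend", False):
                return True
            sleep(interval)
            elapsed += interval
        return False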
