
Commit 1d33fba

kueue-tests
1 parent 538d345

5 files changed (+521, -24 lines)

src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 2 additions & 8 deletions
@@ -346,12 +346,9 @@ def _build_head_container(self) -> V1Container:
                 self.head_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
         )
 
-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container
 
     def _build_worker_container(self) -> V1Container:
@@ -373,12 +370,9 @@ def _build_worker_container(self) -> V1Container:
                 self.worker_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
         )
 
-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container
 
     def _build_resource_requirements(
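The change above folds the env-var handling into the V1Container constructor instead of mutating the container afterwards; both forms produce the same object. A minimal sketch of the pattern, assuming a plain dict of env vars (illustrative only, not the SDK's actual builder code):

from kubernetes.client import V1Container, V1EnvVar

envs = {"FOO": "bar"}  # illustrative only; the SDK reads self.envs

# Old style: construct the container, then mutate it when env vars are present
container = V1Container(name="head", image="ray:latest")
if envs:
    container.env = [V1EnvVar(name=k, value=v) for k, v in envs.items()]

# New style: pass env (or None) directly at construction time
container = V1Container(
    name="head",
    image="ray:latest",
    env=[V1EnvVar(name=k, value=v) for k, v in envs.items()] if envs else None,
)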

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 9 additions & 0 deletions
@@ -62,6 +62,7 @@ def __init__(
         shutdown_after_job_finishes: Optional[bool] = None,
         ttl_seconds_after_finished: int = 0,
         active_deadline_seconds: Optional[int] = None,
+        local_queue: Optional[str] = None,
     ):
         """
         Initialize a RayJob instance.
@@ -108,6 +109,7 @@ def __init__(
         self.runtime_env = runtime_env
         self.ttl_seconds_after_finished = ttl_seconds_after_finished
         self.active_deadline_seconds = active_deadline_seconds
+        self.local_queue = local_queue
 
         # Auto-set shutdown_after_job_finishes based on cluster_config presence
         # If cluster_config is provided, we want to clean up the cluster after job finishes
@@ -232,9 +234,16 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
                 "entrypoint": self.entrypoint,
                 "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
                 "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
+                "enableInTreeAutoscaling": False,
             },
         }
 
+        # Add Kueue label if local_queue is specified
+        if self.local_queue:
+            rayjob_cr["metadata"]["labels"] = {
+                "kueue.x-k8s.io/queue-name": self.local_queue
+            }
+
         # Add active deadline if specified
         if self.active_deadline_seconds:
             rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds
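The new local_queue argument labels the generated RayJob custom resource with kueue.x-k8s.io/queue-name so that Kueue admits it through the named LocalQueue. A minimal usage sketch (queue name, namespace, and entrypoint are placeholders for illustration, not values from this commit):

from codeflare_sdk import RayJob, ManagedClusterConfig

job = RayJob(
    job_name="demo-job",
    namespace="demo-namespace",  # placeholder namespace
    cluster_config=ManagedClusterConfig(num_workers=1),
    entrypoint='python -c "import ray; ray.init()"',
    local_queue="demo-local-queue",  # adds kueue.x-k8s.io/queue-name to metadata.labels
)
job.submit()  # the submitted CR carries the queue-name label and is gated by Kueue admission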
Lines changed: 295 additions & 0 deletions
@@ -0,0 +1,295 @@
import pytest
import sys
import os
from time import sleep

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from support import *

from codeflare_sdk import RayJob, ManagedClusterConfig
import kubernetes.client.rest
from python_client.kuberay_job_api import RayjobApi
from python_client.kuberay_cluster_api import RayClusterApi


@pytest.mark.openshift
class TestRayJobLifecycledClusterKueue:
    """Test RayJob with Kueue integration and auto-created cluster lifecycle management."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_with_kueue_integration(self):
        """
        Test RayJob submission with Kueue queue management, including:
        1. Job submission to Kueue queue
        2. Waiting for Kueue admission
        3. Job execution and completion
        4. Automatic cluster cleanup after job deletion

        Note: This test does NOT test manual suspend/resume as that conflicts
        with Kueue's queue management.
        """
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)

        ray_image = get_ray_image()
        self.job_api = RayjobApi()
        job_name = "kueue-managed-job"

        cluster_config = ManagedClusterConfig(
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            head_memory_requests=4,
            head_memory_limits=6,
            num_workers=1,
            worker_cpu_requests="500m",
            worker_cpu_limits="500m",
            worker_memory_requests=4,
            worker_memory_limits=5,
            image=ray_image,
        )

        rayjob = RayJob(
            job_name=job_name,
            namespace=self.namespace,
            cluster_config=cluster_config,
            entrypoint="python -c \"import ray; ray.init(); print('Kueue-managed RayJob completed successfully')\"",
            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
            shutdown_after_job_finishes=True,
            local_queue=self.local_queues[0],
        )

        try:
            # 1. Submit job to Kueue queue
            print(f"Submitting RayJob to Kueue queue: {self.local_queues[0]}")
            assert rayjob.submit() == job_name

            # 2. Check if job is suspended or immediately admitted by Kueue
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            is_suspended = job_cr.get("spec", {}).get("suspend", False)

            if is_suspended:
                print("✓ Job is queued and suspended by Kueue (waiting for resources)")
                # 3. Wait for Kueue to admit the job
                print("Waiting for Kueue to admit the job...")
                admitted = wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                )
                assert admitted, "Job was not admitted by Kueue within timeout"
                print("✓ Job admitted by Kueue")
            else:
                print("✓ Job was immediately admitted by Kueue (resources available)")

            # 4. Wait for job to reach running state
            print("Waiting for job to start running...")
            assert self.job_api.wait_until_job_running(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not reach running state after Kueue admission"
            print("✓ Job is running")

            # 5. Verify RayCluster was created (KubeRay adds random suffix to cluster name)
            cluster_api = RayClusterApi()
            # List all RayClusters in the namespace since KubeRay adds a suffix
            clusters = cluster_api.list_ray_clusters(
                k8s_namespace=rayjob.namespace, async_req=False
            )

            # Find the cluster that starts with our job name
            found_cluster = None
            for cluster in clusters.get("items", []):
                cluster_name = cluster.get("metadata", {}).get("name", "")
                if cluster_name.startswith(f"{rayjob.name}-raycluster"):
                    found_cluster = cluster
                    break

            assert (
                found_cluster is not None
            ), f"RayCluster not found for RayJob {rayjob.name}"
            print(
                f"✓ RayCluster created successfully: {found_cluster['metadata']['name']}"
            )

            # 6. Wait for job completion
            print("Waiting for job to complete...")
            assert self.job_api.wait_until_job_finished(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not complete"

            # Verify final job status
            final_status = self.job_api.get_job_status(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            print(
                f"✓ Job completed with status: {final_status.get('jobDeploymentStatus')}"
            )

        finally:
            # 7. Delete the job and verify cleanup
            print("Cleaning up...")
            assert rayjob.delete()
            self.verify_cluster_cleanup(rayjob)
            print("✓ Cleanup complete")

    def test_rayjob_kueue_with_preemption(self):
        """
        Test RayJob behavior when using Kueue with potential preemption scenarios.
        This tests that manual suspend/resume still works even with Kueue management.
        """
        self.setup_method()
        create_namespace(self)
        # Create Kueue resources with limited quota to force suspension
        create_limited_kueue_resources(self)

        ray_image = get_ray_image()
        self.job_api = RayjobApi()
        job_name = "kueue-job"

        cluster_config = ManagedClusterConfig(
            head_cpu_requests="500m",
            head_cpu_limits="500m",
            head_memory_requests=4,
            head_memory_limits=6,
            num_workers=1,
            worker_cpu_requests="500m",
            worker_cpu_limits="500m",
            worker_memory_requests=4,
            worker_memory_limits=6,
            image=ray_image,
        )

        rayjob = RayJob(
            job_name=job_name,
            namespace=self.namespace,
            cluster_config=cluster_config,
            entrypoint="python -c \"import ray; import time; ray.init(); time.sleep(30); print('Job completed')\"",
            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
            shutdown_after_job_finishes=True,
            local_queue=self.local_queues[0],
        )

        try:
            # 1. Submit job
            assert rayjob.submit() == job_name

            # 2. Check if job is suspended or immediately admitted by Kueue
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            is_suspended = job_cr.get("spec", {}).get("suspend", False)

            if is_suspended:
                print("Job is queued and suspended by Kueue (waiting for resources)")
                assert wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                ), "Job was not admitted by Kueue"
                print("✓ Job admitted by Kueue")
            else:
                print("✓ Job was immediately admitted by Kueue (resources available)")

            # 3. Wait for job to be running
            assert self.job_api.wait_until_job_running(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not reach running state"
            print("✓ Job is running")

            # 4. Manually suspend the job (simulating preemption or manual intervention)
            print("Manually suspending the job...")
            assert rayjob.stop(), "Job stop failed"

            # Verify suspension
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            assert job_cr["spec"]["suspend"] is True, "Job suspend not set to true"

            # Wait for suspended state
            assert self._wait_for_job_status(
                rayjob, "Suspended", timeout=30
            ), "Job did not reach Suspended state"
            print("✓ Job manually suspended")

            # 5. Resume the job
            print("Resuming the job...")
            assert rayjob.resubmit(), "Job resubmit failed"

            # Note: With Kueue, the job might go back to the queue and need re-admission
            # Check if Kueue re-queued it
            job_cr = self.job_api.get_job(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            if job_cr.get("spec", {}).get("suspend", False):
                print("Job re-queued by Kueue, waiting for re-admission...")
                assert wait_for_kueue_admission(
                    self, self.job_api, rayjob.name, rayjob.namespace, timeout=120
                ), "Job was not re-admitted by Kueue"

            # 6. Wait for job to complete
            assert self.job_api.wait_until_job_finished(
                name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300
            ), "Job did not complete after resume"
            print("✓ Job completed after manual suspend/resume")

        finally:
            assert rayjob.delete()
            self.verify_cluster_cleanup(rayjob)

    def _wait_for_job_status(
        self,
        rayjob: RayJob,
        expected_status: str,
        timeout: int = 30,
    ) -> bool:
        """Wait for a job to reach a specific deployment status."""
        elapsed_time = 0
        check_interval = 2

        while elapsed_time < timeout:
            status = self.job_api.get_job_status(
                name=rayjob.name, k8s_namespace=rayjob.namespace
            )
            if status and status.get("jobDeploymentStatus") == expected_status:
                return True

            sleep(check_interval)
            elapsed_time += check_interval

        return False

    def verify_cluster_cleanup(self, rayjob: RayJob, timeout: int = 60):
        """Verify that the cluster created by the RayJob has been cleaned up."""
        elapsed_time = 0
        check_interval = 5
        cluster_api = RayClusterApi()

        while elapsed_time < timeout:
            # List all RayClusters in the namespace
            clusters = cluster_api.list_ray_clusters(
                k8s_namespace=rayjob.namespace, async_req=False
            )

            # Check if any cluster exists that starts with our job name
            found = False
            for cluster in clusters.get("items", []):
                cluster_name = cluster.get("metadata", {}).get("name", "")
                if cluster_name.startswith(f"{rayjob.name}-raycluster"):
                    found = True
                    break

            if not found:
                # No cluster found, cleanup successful
                return

            sleep(check_interval)
            elapsed_time += check_interval

        raise TimeoutError(
            f"RayCluster for job '{rayjob.name}' was not cleaned up within {timeout} seconds"
        )
