Commit 4d8a8f3

test(RHOAIENG-26485): Add new SDK e2e tests which cover Job creation for existing cluster
1 parent 5a77f7b commit 4d8a8f3

2 files changed, 272 additions and 0 deletions
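Both new test files exercise the same core flow: bring up a RayCluster with the SDK, then submit a RayJob that targets that cluster by name instead of provisioning its own. The snippet below is only an orientation sketch with placeholder names ("demo", "demo-ns", "demo-job"); the actual tests in this commit additionally create Kueue resources, configure accelerators and images, and poll job status until completion.

from codeflare_sdk import Cluster, ClusterConfiguration
from codeflare_sdk.ray.rayjobs import RayJob

# Orientation sketch only; names and sizes are placeholders mirroring the tests below.
cluster = Cluster(
    ClusterConfiguration(
        name="demo",
        namespace="demo-ns",
        num_workers=1,
        worker_cpu_requests="500m",
        worker_cpu_limits=1,
        worker_memory_requests=1,
        worker_memory_limits=4,
    )
)
cluster.apply()       # create the RayCluster resource
cluster.wait_ready()  # block until the cluster reports ready

job = RayJob(
    job_name="demo-job",
    cluster_name="demo",                # reuse the existing cluster by name
    namespace="demo-ns",
    entrypoint="python -c \"import ray; ray.init(); print('ok')\"",
    shutdown_after_job_finishes=False,  # do not tear down the shared cluster
)
assert job.submit() == "demo-job"       # submit() returns the job name on success

cluster.down()  # manual cleanup, since the job will not remove the cluster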
Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
from time import sleep

from codeflare_sdk import Cluster, ClusterConfiguration
from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

import pytest

from support import *


# This test creates a Ray cluster and then submits a RayJob against the existing cluster on a Kind cluster
@pytest.mark.kind
class TestRayJobExistingClusterKind:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_ray_cluster_sdk_kind(self):
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_kind(accelerator="cpu")

    @pytest.mark.nvidia_gpu
    def test_rayjob_ray_cluster_sdk_kind_nvidia_gpu(self):
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_kind(accelerator="gpu", number_of_gpus=1)

    def run_rayjob_against_existing_cluster_kind(
        self, accelerator, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
    ):
        cluster_name = "existing-cluster"
        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                num_workers=1,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests="500m",
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                worker_extended_resource_requests={gpu_resource_name: number_of_gpus},
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.apply()
        cluster.status()
        cluster.wait_ready()
        cluster.status()
        cluster.details()

        print(f"✅ Ray cluster '{cluster_name}' is ready")

        # Test RayJob submission against the existing cluster
        self.assert_rayjob_submit_against_existing_cluster(cluster, accelerator, number_of_gpus)

        # Cleanup - manually tear down the cluster since the job won't do it
        print("🧹 Cleaning up Ray cluster")
        cluster.down()

    def assert_rayjob_submit_against_existing_cluster(self, cluster, accelerator, number_of_gpus):
        """
        Test RayJob submission against an existing Ray cluster.
        """
        cluster_name = cluster.config.name
        job_name = f"mnist-rayjob-{accelerator}"

        print(f"🚀 Testing RayJob submission against existing cluster '{cluster_name}'")

        # Create a RayJob targeting the existing cluster
        rayjob = RayJob(
            job_name=job_name,
            cluster_name=cluster_name,
            namespace=self.namespace,
            entrypoint="python mnist.py",
            runtime_env={
                "working_dir": "./tests/e2e/",
                "pip": "./tests/e2e/mnist_pip_requirements.txt",
                "env_vars": get_setup_env_variables(ACCELERATOR=accelerator),
            },
            shutdown_after_job_finishes=False,  # Don't shut down the existing cluster
        )

        # Submit the job
        submission_result = rayjob.submit()
        assert submission_result == job_name, f"Job submission failed, expected {job_name}, got {submission_result}"
        print(f"✅ Successfully submitted RayJob '{job_name}' against existing cluster")

        # Monitor the job status until completion
        self.monitor_rayjob_completion(rayjob, timeout=900)

        print(f"✅ RayJob '{job_name}' completed successfully against existing cluster!")

    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
        """
        Monitor a RayJob until it completes or fails.

        Args:
            rayjob: The RayJob instance to monitor
            timeout: Maximum time to wait in seconds (default: 15 minutes)
        """
        print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")

        elapsed_time = 0
        check_interval = 10  # Check every 10 seconds

        while elapsed_time < timeout:
            status, ready = rayjob.status(print_to_console=True)

            # Check whether the job has completed (either successfully or failed)
            if status == CodeflareRayJobStatus.COMPLETE:
                print(f"✅ RayJob '{rayjob.name}' completed successfully!")
                return
            elif status == CodeflareRayJobStatus.FAILED:
                raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
            elif status == CodeflareRayJobStatus.RUNNING:
                print(f"🏃 RayJob '{rayjob.name}' is still running...")
            elif status == CodeflareRayJobStatus.UNKNOWN:
                print(f"❓ RayJob '{rayjob.name}' status is unknown")

            # Wait before the next check
            sleep(check_interval)
            elapsed_time += check_interval

        # Reaching this point means the job timed out
        final_status, _ = rayjob.status(print_to_console=True)
        raise TimeoutError(
            f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
            f"Final status: {final_status}"
        )
Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
import pytest
from time import sleep

from codeflare_sdk import (
    Cluster,
    ClusterConfiguration,
    TokenAuthentication,
)
from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus

from support import *


# This test creates a Ray cluster and then submits a RayJob against the existing cluster on OpenShift
@pytest.mark.openshift
class TestRayJobExistingClusterOauth:
    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)
        delete_kueue_resources(self)

    def test_rayjob_against_existing_cluster_oauth(self):
        self.setup_method()
        create_namespace(self)
        create_kueue_resources(self)
        self.run_rayjob_against_existing_cluster_oauth()

    def run_rayjob_against_existing_cluster_oauth(self):
        ray_image = get_ray_image()

        auth = TokenAuthentication(
            token=run_oc_command(["whoami", "--show-token=true"]),
            server=run_oc_command(["whoami", "--show-server=true"]),
            skip_tls=True,
        )
        auth.login()

        cluster_name = "existing-cluster"

        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                num_workers=1,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests=1,
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                image=ray_image,
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.apply()
        cluster.status()
        cluster.wait_ready()
        cluster.status()
        cluster.details()

        print(f"✅ Ray cluster '{cluster_name}' is ready")

        job_name = "existing-cluster-rayjob"

        rayjob = RayJob(
            job_name=job_name,
            cluster_name=cluster_name,
            namespace=self.namespace,
            entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
            runtime_env={
                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
            },
            shutdown_after_job_finishes=False,
        )

        # Submit the job
        print(f"🚀 Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'")
        submission_result = rayjob.submit()
        assert submission_result == job_name, f"Job submission failed, expected {job_name}, got {submission_result}"
        print(f"✅ Successfully submitted RayJob '{job_name}'")

        # Monitor the job status until completion
        self.monitor_rayjob_completion(rayjob)

        # Cleanup - manually tear down the cluster since the job won't do it
        print("🧹 Cleaning up Ray cluster")
        cluster.down()

    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 300):
        """
        Monitor a RayJob until it completes or fails.

        Args:
            rayjob: The RayJob instance to monitor
            timeout: Maximum time to wait in seconds (default: 5 minutes)
        """
        print(f"⏳ Monitoring RayJob '{rayjob.name}' status...")

        elapsed_time = 0
        check_interval = 10  # Check every 10 seconds

        while elapsed_time < timeout:
            status, ready = rayjob.status(print_to_console=True)

            # Check whether the job has completed (either successfully or failed)
            if status == CodeflareRayJobStatus.COMPLETE:
                print(f"✅ RayJob '{rayjob.name}' completed successfully!")
                return
            elif status == CodeflareRayJobStatus.FAILED:
                raise AssertionError(f"❌ RayJob '{rayjob.name}' failed!")
            elif status == CodeflareRayJobStatus.RUNNING:
                print(f"🏃 RayJob '{rayjob.name}' is still running...")
            elif status == CodeflareRayJobStatus.UNKNOWN:
                print(f"❓ RayJob '{rayjob.name}' status is unknown")

            # Wait before the next check
            sleep(check_interval)
            elapsed_time += check_interval

        # Reaching this point means the job timed out
        final_status, _ = rayjob.status(print_to_console=True)
        raise TimeoutError(
            f"⏰ RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
            f"Final status: {final_status}"
        )
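A note on running these suites: they are selected by pytest markers (kind, openshift, nvidia_gpu). The invocation below is a hedged sketch using the stock pytest API; the repository's own make or poetry targets may wrap this differently, and the tests/e2e path is inferred from the working_dir used above rather than stated in this commit.

import sys

import pytest

if __name__ == "__main__":
    # Run the Kind-based suite only; swap the marker for "openshift" against an OAuth-enabled cluster.
    sys.exit(pytest.main(["-v", "-m", "kind", "tests/e2e"]))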
