Commit 05162fb

RHOAIENG-34085: Fix ConfigMap mount
1 parent f2a4cd0 commit 05162fb

4 files changed: +159 −74 lines changed

src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 1 addition & 0 deletions
@@ -247,6 +247,7 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
         """
         ray_cluster_spec = {
             "rayVersion": RAY_VERSION,
+            "enableInTreeAutoscaling": False,  # Required for Kueue-managed jobs
             "headGroupSpec": self._build_head_group_spec(),
             "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
         }
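
Why the new flag matters: Kueue admits jobs against its own resource quotas, so the in-tree Ray autoscaler has to stay off; otherwise two controllers would resize the same worker groups. A minimal sketch of the spec shape this method now returns (every value besides enableInTreeAutoscaling is an illustrative placeholder, not the SDK's real output):

# Sketch of build_ray_cluster_spec's result; placeholder values only.
ray_cluster_spec = {
    "rayVersion": "2.x.y",             # stands in for the RAY_VERSION constant
    "enableInTreeAutoscaling": False,  # Kueue owns admission and quota, so the
                                       # in-tree autoscaler must not resize groups
    "headGroupSpec": {},               # built by _build_head_group_spec()
    "workerGroupSpecs": [{}],          # built by _build_worker_group_spec(cluster_name)
}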

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 38 additions & 50 deletions
@@ -149,6 +149,14 @@ def submit(self) -> str:
 
         self._validate_ray_version_compatibility()
 
+        # Extract scripts to check if we need ConfigMaps
+        scripts = self._extract_script_files_from_entrypoint()
+
+        # Pre-declare ConfigMap in cluster config for new clusters
+        if scripts and self._cluster_config:
+            configmap_name = f"{self.name}-scripts"
+            self._cluster_config.add_script_volumes(configmap_name, MOUNT_PATH)
+
         rayjob_cr = self._build_rayjob_cr()
 
         logger.info(f"Submitting RayJob {self.name} to Kuberay operator")
@@ -157,20 +165,41 @@ def submit(self) -> str:
         if result:
             logger.info(f"Successfully submitted RayJob {self.name}")
 
-            # Handle script files after RayJob creation so we can set owner reference
-            if self._cluster_config is not None:
-                scripts = self._extract_script_files_from_entrypoint()
-                if scripts:
-                    self._handle_script_volumes_for_new_cluster(scripts, result)
-            elif self._cluster_name:
-                scripts = self._extract_script_files_from_entrypoint()
-                if scripts:
-                    self._handle_script_volumes_for_existing_cluster(scripts, result)
+            # Create ConfigMap with owner reference after RayJob exists
+            if scripts:
+                self._create_script_configmap(scripts, result)
 
             return self.name
         else:
             raise RuntimeError(f"Failed to submit RayJob {self.name}")
 
+    def _create_script_configmap(
+        self, scripts: Dict[str, str], rayjob_result: Dict[str, Any]
+    ):
+        """
+        Create ConfigMap with owner reference for script files.
+
+        For new clusters: ConfigMap volume was pre-declared, just create it.
+        For existing clusters: Create ConfigMap and patch the cluster.
+        """
+        # Get a config builder for utility methods
+        config_builder = (
+            self._cluster_config if self._cluster_config else ManagedClusterConfig()
+        )
+
+        # Validate and build ConfigMap spec
+        config_builder.validate_configmap_size(scripts)
+        configmap_spec = config_builder.build_script_configmap_spec(
+            job_name=self.name, namespace=self.namespace, scripts=scripts
+        )
+
+        # Create ConfigMap with owner reference
+        configmap_name = self._create_configmap_from_spec(configmap_spec, rayjob_result)
+
+        # For existing clusters, update the cluster with volumes
+        if self._cluster_name and not self._cluster_config:
+            self._update_existing_cluster_for_scripts(configmap_name, config_builder)
+
     def stop(self):
         """
         Suspend the Ray job.
@@ -488,47 +517,6 @@ def _find_local_imports(
         except (SyntaxError, ValueError) as e:
             logger.debug(f"Could not parse imports from {script_path}: {e}")
 
-    def _handle_script_volumes_for_new_cluster(
-        self, scripts: Dict[str, str], rayjob_result: Dict[str, Any] = None
-    ):
-        """Handle script volumes for new clusters (uses ManagedClusterConfig)."""
-        # Validate ConfigMap size before creation
-        self._cluster_config.validate_configmap_size(scripts)
-
-        # Build ConfigMap spec using config.py
-        configmap_spec = self._cluster_config.build_script_configmap_spec(
-            job_name=self.name, namespace=self.namespace, scripts=scripts
-        )
-
-        # Create ConfigMap via Kubernetes API with owner reference
-        configmap_name = self._create_configmap_from_spec(configmap_spec, rayjob_result)
-
-        # Add volumes to cluster config (config.py handles spec building)
-        self._cluster_config.add_script_volumes(
-            configmap_name=configmap_name, mount_path=MOUNT_PATH
-        )
-
-    def _handle_script_volumes_for_existing_cluster(
-        self, scripts: Dict[str, str], rayjob_result: Dict[str, Any] = None
-    ):
-        """Handle script volumes for existing clusters (updates RayCluster CR)."""
-        # Create config builder for utility methods
-        config_builder = ManagedClusterConfig()
-
-        # Validate ConfigMap size before creation
-        config_builder.validate_configmap_size(scripts)
-
-        # Build ConfigMap spec using config.py
-        configmap_spec = config_builder.build_script_configmap_spec(
-            job_name=self.name, namespace=self.namespace, scripts=scripts
-        )
-
-        # Create ConfigMap via Kubernetes API with owner reference
-        configmap_name = self._create_configmap_from_spec(configmap_spec, rayjob_result)
-
-        # Update existing RayCluster
-        self._update_existing_cluster_for_scripts(configmap_name, config_builder)
-
     def _create_configmap_from_spec(
         self, configmap_spec: Dict[str, Any], rayjob_result: Dict[str, Any] = None
     ) -> str:
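
The owner reference is what ties the ConfigMap's lifetime to the RayJob; the e2e test below asserts its exact fields. A hedged sketch of what a helper like _create_configmap_from_spec presumably does with the official Kubernetes Python client (the metadata layout of rayjob_result is an assumption taken from the unit test's submit_result fixture, not confirmed SDK internals):

from kubernetes import client

def create_owned_configmap(configmap_spec: dict, rayjob_result: dict) -> str:
    """Create a ConfigMap owned by a RayJob so Kubernetes garbage collection
    deletes it together with the job. Sketch only; assumes rayjob_result
    carries metadata.name and metadata.uid from the created RayJob CR."""
    owner = client.V1OwnerReference(
        api_version="ray.io/v1",
        kind="RayJob",
        name=rayjob_result["metadata"]["name"],
        uid=rayjob_result["metadata"]["uid"],
        controller=True,            # exactly one controller per dependent object
        block_owner_deletion=True,  # owner cannot vanish before the dependent
    )
    body = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(
            name=configmap_spec["metadata"]["name"],
            labels=configmap_spec["metadata"].get("labels"),
            owner_references=[owner],
        ),
        data=configmap_spec["data"],
    )
    client.CoreV1Api().create_namespaced_config_map(
        namespace=configmap_spec["metadata"]["namespace"], body=body
    )
    return body.metadata.name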

src/codeflare_sdk/ray/rayjobs/test_rayjob.py

Lines changed: 25 additions & 15 deletions
@@ -586,6 +586,8 @@ def test_build_ray_cluster_spec_function():
 
     spec = cluster_config.build_ray_cluster_spec("test-cluster")
     assert "rayVersion" in spec
+    assert "enableInTreeAutoscaling" in spec
+    assert spec["enableInTreeAutoscaling"] is False  # Required for Kueue
     assert "headGroupSpec" in spec
     assert "workerGroupSpecs" in spec
 
@@ -1304,11 +1306,13 @@ def func2(): pass
         os.chdir(original_cwd)
 
 
-def test_script_handling_timing_after_rayjob_submission(
-    mocker, auto_mock_setup, tmp_path
-):
-    """Test that script handling happens after RayJob is submitted (not before)."""
-    mock_api_instance = auto_mock_setup["rayjob_api"]
+def test_script_handling_kubernetes_best_practice_flow(mocker, tmp_path):
+    """Test the Kubernetes best practice flow: pre-declare volume, submit, create ConfigMap."""
+    mocker.patch("kubernetes.config.load_kube_config")
+
+    mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi")
+    mock_api_instance = MagicMock()
+    mock_api_class.return_value = mock_api_instance
 
     submit_result = {
         "metadata": {
@@ -1319,9 +1323,8 @@ def test_script_handling_timing_after_rayjob_submission(
     }
     mock_api_instance.submit_job.return_value = submit_result
 
-    mock_handle_new = mocker.patch.object(
-        RayJob, "_handle_script_volumes_for_new_cluster"
-    )
+    mock_create_cm = mocker.patch.object(RayJob, "_create_script_configmap")
+    mock_add_volumes = mocker.patch.object(ManagedClusterConfig, "add_script_volumes")
 
     # RayClusterApi is already mocked by auto_mock_setup
 
@@ -1330,17 +1333,22 @@ def test_script_handling_timing_after_rayjob_submission(
 
     call_order = []
 
+    def track_add_volumes(*args, **kwargs):
+        call_order.append("add_volumes")
+        # Should be called with ConfigMap name
+        assert args[0] == "test-job-scripts"
+
     def track_submit(*args, **kwargs):
         call_order.append("submit_job")
         return submit_result
 
-    def track_handle_scripts(*args, **kwargs):
-        call_order.append("handle_scripts")
-        assert len(args) >= 2
+    def track_create_cm(*args, **kwargs):
+        call_order.append("create_configmap")
         assert args[1] == submit_result  # rayjob_result should be second arg
 
+    mock_add_volumes.side_effect = track_add_volumes
     mock_api_instance.submit_job.side_effect = track_submit
-    mock_handle_new.side_effect = track_handle_scripts
+    mock_create_cm.side_effect = track_create_cm
 
     original_cwd = os.getcwd()
     try:
@@ -1359,12 +1367,14 @@ def track_handle_scripts(*args, **kwargs):
     finally:
        os.chdir(original_cwd)
 
-    assert call_order == ["submit_job", "handle_scripts"]
+    # Verify the order: add volumes → submit → create ConfigMap
+    assert call_order == ["add_volumes", "submit_job", "create_configmap"]
 
+    mock_add_volumes.assert_called_once()
    mock_api_instance.submit_job.assert_called_once()
-    mock_handle_new.assert_called_once()
+    mock_create_cm.assert_called_once()
 
-    mock_handle_new.assert_called_with({"test.py": "print('test')"}, submit_result)
+    mock_create_cm.assert_called_with({"test.py": "print('test')"}, submit_result)
 
 
 def test_rayjob_submit_with_scripts_new_cluster(auto_mock_setup, tmp_path):
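
Taken together, the user-facing flow these tests pin down looks roughly like this (a hedged sketch: ManagedClusterConfig arguments are elided, and train.py stands in for any local script referenced by the entrypoint):

from codeflare_sdk import RayJob, ManagedClusterConfig

cluster_config = ManagedClusterConfig()  # resource arguments elided for brevity

job = RayJob(
    job_name="demo-job",
    namespace="demo-ns",
    cluster_config=cluster_config,
    entrypoint="python train.py",  # local script gets packaged into a ConfigMap
)

# submit() now (1) pre-declares the "demo-job-scripts" ConfigMap volume in the
# cluster spec, (2) creates the RayJob CR, then (3) creates the ConfigMap with
# the RayJob as controller owner, so it is garbage-collected with the job.
job.submit()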

tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py

Lines changed: 95 additions & 9 deletions
@@ -2,11 +2,14 @@
 import sys
 import os
 from time import sleep
+import tempfile
 
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 from support import *
 
 from codeflare_sdk import RayJob, ManagedClusterConfig
+
+from kubernetes import client
 from python_client.kuberay_job_api import RayjobApi
 from python_client.kuberay_cluster_api import RayClusterApi
 
@@ -22,7 +25,7 @@ def teardown_method(self):
         delete_kueue_resources(self)
 
     def test_lifecycled_kueue_managed(self):
-        """Test RayJob with Kueue-managed lifecycled cluster."""
+        """Test RayJob with Kueue-managed lifecycled cluster with ConfigMap validation."""
         self.setup_method()
         create_namespace(self)
         create_kueue_resources(self)
@@ -46,17 +49,36 @@ def test_lifecycled_kueue_managed(self):
             worker_memory_limits=resources["worker_memory_limits"],
         )
 
-        rayjob = RayJob(
-            job_name=job_name,
-            namespace=self.namespace,
-            cluster_config=cluster_config,
-            entrypoint="python -c \"import ray; ray.init(); print('Kueue job done')\"",
-            runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
-            local_queue=self.local_queues[0],
-        )
+        # Create a temporary script file to test ConfigMap functionality
+        with tempfile.NamedTemporaryFile(
+            mode="w", suffix=".py", delete=False, dir=os.getcwd()
+        ) as script_file:
+            script_file.write(
+                """
+import ray
+ray.init()
+print('Kueue job with ConfigMap done')
+ray.shutdown()
+"""
+            )
+            script_file.flush()
+            script_filename = os.path.basename(script_file.name)
 
         try:
+            rayjob = RayJob(
+                job_name=job_name,
+                namespace=self.namespace,
+                cluster_config=cluster_config,
+                entrypoint=f"python {script_filename}",
+                runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")},
+                local_queue=self.local_queues[0],
+            )
+
             assert rayjob.submit() == job_name
+
+            # Verify ConfigMap was created with owner reference
+            self.verify_configmap_with_owner_reference(rayjob)
+
             assert self.job_api.wait_until_job_running(
                 name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=600
             )
@@ -70,6 +92,12 @@ def test_lifecycled_kueue_managed(self):
             except Exception:
                 pass  # Job might already be deleted
             verify_rayjob_cluster_cleanup(cluster_api, rayjob.name, rayjob.namespace)
+            # Clean up the temporary script file
+            if "script_filename" in locals():
+                try:
+                    os.remove(script_filename)
+                except:
+                    pass
 
     def test_lifecycled_kueue_resource_queueing(self):
         """Test Kueue resource queueing with lifecycled clusters."""
@@ -161,3 +189,61 @@ def test_lifecycled_kueue_resource_queueing(self):
             )
         except:
             pass
+
+    def verify_configmap_with_owner_reference(self, rayjob: RayJob):
+        """Verify that the ConfigMap was created with proper owner reference to the RayJob."""
+        v1 = client.CoreV1Api()
+        configmap_name = f"{rayjob.name}-scripts"
+
+        try:
+            # Get the ConfigMap
+            configmap = v1.read_namespaced_config_map(
+                name=configmap_name, namespace=rayjob.namespace
+            )
+
+            # Verify ConfigMap exists
+            assert configmap is not None, f"ConfigMap {configmap_name} not found"
+
+            # Verify it contains the script
+            assert configmap.data is not None, "ConfigMap has no data"
+            assert len(configmap.data) > 0, "ConfigMap data is empty"
+
+            # Verify owner reference
+            assert (
+                configmap.metadata.owner_references is not None
+            ), "ConfigMap has no owner references"
+            assert (
+                len(configmap.metadata.owner_references) > 0
+            ), "ConfigMap owner references list is empty"
+
+            owner_ref = configmap.metadata.owner_references[0]
+            assert (
+                owner_ref.api_version == "ray.io/v1"
+            ), f"Wrong API version: {owner_ref.api_version}"
+            assert owner_ref.kind == "RayJob", f"Wrong kind: {owner_ref.kind}"
+            assert owner_ref.name == rayjob.name, f"Wrong owner name: {owner_ref.name}"
+            assert (
+                owner_ref.controller is True
+            ), "Owner reference controller not set to true"
+            assert (
+                owner_ref.block_owner_deletion is True
+            ), "Owner reference blockOwnerDeletion not set to true"
+
+            # Verify labels
+            assert configmap.metadata.labels.get("ray.io/job-name") == rayjob.name
+            assert (
+                configmap.metadata.labels.get("app.kubernetes.io/managed-by")
+                == "codeflare-sdk"
+            )
+            assert (
+                configmap.metadata.labels.get("app.kubernetes.io/component")
+                == "rayjob-scripts"
+            )
+
+            print(f"✓ ConfigMap {configmap_name} verified with proper owner reference")
+
+        except client.rest.ApiException as e:
+            if e.status == 404:
+                raise AssertionError(f"ConfigMap {configmap_name} not found")
+            else:
+                raise e
