Skip to content

Commit e9669e2

Browse files
committed
task(RHOAIENG-33283): Zip working directory
1 parent ef92ba7 commit e9669e2

File tree

8 files changed

+1074
-1390
lines changed

8 files changed

+1074
-1390
lines changed

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 83 additions & 471 deletions
Large diffs are not rendered by default.
Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
from __future__ import annotations # Postpone evaluation of annotations
2+
3+
import logging
4+
import os
5+
import re
6+
import yaml
7+
import zipfile
8+
import base64
9+
import io
10+
from typing import Dict, Any, Optional, List, TYPE_CHECKING
11+
from codeflare_sdk.common.utils.constants import MOUNT_PATH
12+
from kubernetes import client
13+
from ray.runtime_env import RuntimeEnv
14+
15+
from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig
16+
from ...common.kubernetes_cluster.auth import get_api_client
17+
18+
# Use TYPE_CHECKING to avoid circular import at runtime
19+
if TYPE_CHECKING:
20+
from codeflare_sdk.ray.rayjobs.rayjob import RayJob
21+
22+
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Regex pattern for finding Python files in entrypoint commands
# Matches paths like: test.py, ./test.py, dir/test.py, my-dir/test.py
# (optionally preceded by "python "); the capture group is the file path.
PYTHON_FILE_PATTERN = r"(?:python\s+)?([./\w/-]+\.py)"

# Path where working_dir will be unzipped on submitter pod
UNZIP_PATH = "/tmp/rayjob-working-dir"
30+
31+
32+
def _normalize_runtime_env(
33+
runtime_env: Optional[RuntimeEnv],
34+
) -> Optional[Dict[str, Any]]:
35+
if runtime_env is None:
36+
return None
37+
return runtime_env.to_dict()
38+
39+
40+
def extract_all_local_files(job: RayJob) -> Optional[Dict[str, str]]:
    """
    Prepare local files for ConfigMap upload.

    - Local working_dir in runtime_env: the whole directory is zipped.
    - Remote working_dir URL: nothing is extracted (Ray fetches it directly).
    - No working_dir: a single local entrypoint file is extracted, if present.

    Returns:
        One of:
        - {"working_dir.zip": <base64-encoded zip>} for a zipped directory
        - {"<filename>": <content>, ...} for a single entrypoint file
        - None for a remote working_dir or when there are no local files
    """
    # Work on a plain dict view of the runtime environment.
    runtime_env_dict = _normalize_runtime_env(job.runtime_env) or {}

    if "working_dir" in runtime_env_dict:
        working_dir = runtime_env_dict["working_dir"]

        if not os.path.isdir(working_dir):
            # Remote URI (e.g. GitHub, S3) - leave it for Ray to fetch.
            logger.info(
                f"Remote working_dir detected: {working_dir}. "
                "Skipping local file extraction - using remote source."
            )
            return None

        # Local directory - bundle everything into one base64-encoded zip entry
        # (ConfigMap values must be text).
        logger.info(f"Zipping local working_dir: {working_dir}")
        zip_data = _zip_directory(working_dir)
        if zip_data:
            encoded = base64.b64encode(zip_data).decode("utf-8")
            return {"working_dir.zip": encoded}

    # No usable working_dir - fall back to a single entrypoint file (or None).
    return _extract_single_entrypoint_file(job)
89+
90+
91+
def _zip_directory(directory_path: str) -> Optional[bytes]:
92+
"""
93+
Zip entire directory preserving structure.
94+
95+
Args:
96+
directory_path: Path to directory to zip
97+
98+
Returns:
99+
Bytes of zip file, or None on error
100+
"""
101+
try:
102+
# Create in-memory zip file
103+
zip_buffer = io.BytesIO()
104+
105+
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf:
106+
# Walk through directory and add all files
107+
for root, dirs, files in os.walk(directory_path):
108+
for file in files:
109+
file_path = os.path.join(root, file)
110+
# Calculate relative path from directory_path
111+
arcname = os.path.relpath(file_path, directory_path)
112+
zipf.write(file_path, arcname)
113+
logger.debug(f"Added to zip: {arcname}")
114+
115+
zip_data = zip_buffer.getvalue()
116+
logger.info(
117+
f"Successfully zipped directory: {directory_path} ({len(zip_data)} bytes)"
118+
)
119+
return zip_data
120+
121+
except (IOError, OSError) as e:
122+
logger.error(f"Failed to zip directory {directory_path}: {e}")
123+
return None
124+
125+
126+
def _extract_single_entrypoint_file(job: RayJob) -> Optional[Dict[str, str]]:
127+
"""
128+
Extract single Python file from entrypoint if no working_dir specified.
129+
130+
Returns a dict with metadata about the file path structure so we can
131+
preserve it when mounting via ConfigMap.
132+
133+
Args:
134+
job: RayJob instance
135+
136+
Returns:
137+
Dict with special format: {"__entrypoint_path__": path, "filename": content}
138+
This allows us to preserve directory structure when mounting
139+
"""
140+
if not job.entrypoint:
141+
return None
142+
143+
# Look for Python file in entrypoint
144+
matches = re.findall(PYTHON_FILE_PATTERN, job.entrypoint)
145+
146+
for file_path in matches:
147+
# Check if it's a local file
148+
if os.path.isfile(file_path):
149+
try:
150+
with open(file_path, "r") as f:
151+
content = f.read()
152+
153+
# Use basename as key (ConfigMap keys can't have slashes)
154+
# But store the full path for later use in ConfigMap item.path
155+
filename = os.path.basename(file_path)
156+
relative_path = file_path.lstrip("./")
157+
158+
logger.info(f"Extracted single entrypoint file: {file_path}")
159+
160+
# Return special format with metadata
161+
return {"__entrypoint_path__": relative_path, filename: content}
162+
163+
except (IOError, OSError) as e:
164+
logger.warning(f"Could not read entrypoint file {file_path}: {e}")
165+
166+
return None
167+
168+
169+
def process_runtime_env(
    job: RayJob, files: Optional[Dict[str, str]] = None
) -> Optional[str]:
    """
    Process the runtime_env field to handle env_vars, pip dependencies, and
    working_dir.

    Args:
        job: RayJob whose runtime_env is processed.
        files: Files prepared by extract_all_local_files(), if any.

    Returns:
        Processed runtime environment as a YAML string, or None if no
        processing was needed.
    """
    runtime_env_dict = _normalize_runtime_env(job.runtime_env)

    processed_env: Dict[str, Any] = {}

    # Environment variables pass through unchanged.
    if runtime_env_dict and "env_vars" in runtime_env_dict:
        processed_env["env_vars"] = runtime_env_dict["env_vars"]
        logger.info(
            f"Added {len(runtime_env_dict['env_vars'])} environment variables to runtime_env"
        )

    # Pip dependencies may be a list, a requirements.txt path, or a dict.
    if runtime_env_dict and "pip" in runtime_env_dict:
        pip_deps = process_pip_dependencies(job, runtime_env_dict["pip"])
        if pip_deps:
            processed_env["pip"] = pip_deps

    # Handle working_dir. The fallback branch used to re-test
    # `not runtime_env_dict or "working_dir" not in runtime_env_dict`, which is
    # exactly the negation of this condition - a plain elif is equivalent.
    if runtime_env_dict and "working_dir" in runtime_env_dict:
        working_dir = runtime_env_dict["working_dir"]
        if os.path.isdir(working_dir):
            # Local working directory - will be zipped and unzipped to
            # UNZIP_PATH by submitter pod.
            processed_env["working_dir"] = UNZIP_PATH
            logger.info(
                f"Local working_dir will be zipped, mounted, and unzipped to: {UNZIP_PATH}"
            )
        else:
            # Remote URI (e.g., GitHub, S3) - pass through as-is.
            processed_env["working_dir"] = working_dir
            logger.info(f"Using remote working_dir: {working_dir}")
    elif files and "working_dir.zip" not in files:
        # No working_dir but a single extracted file - mount at MOUNT_PATH.
        processed_env["working_dir"] = MOUNT_PATH
        logger.info(f"Single file will be mounted at: {MOUNT_PATH}")

    # Serialize only if anything was collected.
    if processed_env:
        return yaml.dump(processed_env, default_flow_style=False)

    return None
222+
223+
224+
def process_pip_dependencies(job: RayJob, pip_spec) -> Optional[List[str]]:
    """
    Normalize pip dependencies from runtime_env into a list of packages.

    Args:
        job: RayJob instance (not consumed by this function's body).
        pip_spec: A list of packages, a string path to a requirements.txt
            file, or a dict such as {"packages": [...], "pip_check": False}.

    Returns:
        List of pip dependencies, or None for unsupported formats.
    """
    if isinstance(pip_spec, list):
        # Already in the desired form.
        logger.info(f"Using provided pip dependencies: {len(pip_spec)} packages")
        return pip_spec

    if isinstance(pip_spec, str):
        # A string is treated as a path to requirements.txt.
        return parse_requirements_file(pip_spec)

    if isinstance(pip_spec, dict) and "packages" in pip_spec:
        # Dict format, e.g. {"packages": [...], "pip_check": False}.
        logger.info(
            f"Using pip dependencies from dict: {len(pip_spec['packages'])} packages"
        )
        return pip_spec["packages"]

    logger.warning(f"Unsupported pip specification format: {type(pip_spec)}")
    return None
251+
252+
253+
def parse_requirements_file(requirements_path: str) -> Optional[List[str]]:
    """
    Parse a requirements.txt file into a list of dependency strings.

    Blank lines and comment lines (starting with '#') are skipped.

    Args:
        requirements_path: Path to the requirements.txt file.

    Returns:
        List of pip dependencies, or None if the file is missing or unreadable.
    """
    if not os.path.isfile(requirements_path):
        logger.warning(f"Requirements file not found: {requirements_path}")
        return None

    try:
        with open(requirements_path, "r") as f:
            requirements = [
                stripped
                for stripped in (raw.strip() for raw in f)
                if stripped and not stripped.startswith("#")
            ]

        logger.info(f"Parsed {len(requirements)} dependencies from {requirements_path}")
        return requirements

    except (IOError, OSError) as e:
        logger.warning(f"Could not read requirements file {requirements_path}: {e}")
        return None
284+
285+
286+
def create_configmap_from_spec(
    job: RayJob,
    configmap_spec: Dict[str, Any],
    rayjob_result: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Create (or update) a ConfigMap from a specification via the Kubernetes API.

    Args:
        job: RayJob the ConfigMap belongs to (provides name and namespace).
        configmap_spec: ConfigMap specification dictionary with "metadata" and
            "data" keys.
        rayjob_result: The result from RayJob creation; when it contains a
            metadata UID, an owner reference is added so the ConfigMap is
            garbage-collected together with the RayJob.

    Returns:
        str: Name of the created (or updated) ConfigMap.

    Raises:
        RuntimeError: If creation fails for any reason other than the
            ConfigMap already existing (HTTP 409, which triggers a replace).
    """

    configmap_name = configmap_spec["metadata"]["name"]

    metadata = client.V1ObjectMeta(**configmap_spec["metadata"])

    # Tie the ConfigMap's lifecycle to the RayJob via an owner reference, if
    # the RayJob creation result carries a UID.
    if (
        rayjob_result
        and isinstance(rayjob_result, dict)
        and rayjob_result.get("metadata", {}).get("uid")
    ):
        logger.info(
            f"Adding owner reference to ConfigMap '{configmap_name}' with RayJob UID: {rayjob_result['metadata']['uid']}"
        )
        metadata.owner_references = [
            client.V1OwnerReference(
                api_version="ray.io/v1",
                kind="RayJob",
                name=job.name,
                uid=rayjob_result["metadata"]["uid"],
                controller=True,
                block_owner_deletion=True,
            )
        ]
    else:
        logger.warning(
            f"No valid RayJob result with UID found, ConfigMap '{configmap_name}' will not have owner reference. Result: {rayjob_result}"
        )

    # Convert dict spec to V1ConfigMap
    configmap = client.V1ConfigMap(
        metadata=metadata,
        data=configmap_spec["data"],
    )

    # Create ConfigMap via Kubernetes API
    k8s_api = client.CoreV1Api(get_api_client())
    try:
        k8s_api.create_namespaced_config_map(namespace=job.namespace, body=configmap)
        logger.info(
            f"Created ConfigMap '{configmap_name}' with {len(configmap_spec['data'])} files"
        )
    except client.ApiException as e:
        if e.status == 409:  # Already exists
            logger.info(f"ConfigMap '{configmap_name}' already exists, updating...")
            k8s_api.replace_namespaced_config_map(
                name=configmap_name, namespace=job.namespace, body=configmap
            )
        else:
            # Chain the ApiException so the Kubernetes error detail (status,
            # body) survives in the traceback instead of being discarded.
            raise RuntimeError(
                f"Failed to create ConfigMap '{configmap_name}': {e}"
            ) from e

    return configmap_name
351+
352+
353+
def create_file_configmap(
    job: RayJob, files: Dict[str, str], rayjob_result: Dict[str, Any]
):
    """
    Create a ConfigMap (with an owner reference) holding the job's local files.
    """
    # A plain config builder supplies the ConfigMap spec helpers.
    builder = ManagedClusterConfig()

    # Drop metadata-only keys (like __entrypoint_path__) - they are not files.
    payload = {key: value for key, value in files.items() if not key.startswith("__")}

    # Validate size limits, then build the spec from the job's identity.
    builder.validate_configmap_size(payload)
    spec = builder.build_file_configmap_spec(
        job_name=job.name, namespace=job.namespace, files=payload
    )

    # Create ConfigMap with owner reference
    # TODO Error handling
    create_configmap_from_spec(job, spec, rayjob_result)

0 commit comments

Comments
 (0)