88from typing import Dict , Any , Optional , List , TYPE_CHECKING
99from codeflare_sdk .common .utils .constants import MOUNT_PATH
1010from kubernetes import client
11+ from ray .runtime_env import RuntimeEnv
1112
1213from codeflare_sdk .ray .rayjobs .config import ManagedClusterConfig
1314from ...common .kubernetes_cluster .auth import get_api_client
2223PYTHON_FILE_PATTERN = r"(?:python\s+)?([./\w/]+\.py)"
2324
2425
26+ def _normalize_runtime_env (
27+ runtime_env : Optional [RuntimeEnv ],
28+ ) -> Optional [Dict [str , Any ]]:
29+ if runtime_env is None :
30+ return None
31+ return runtime_env .to_dict ()
32+
33+
2534def extract_all_local_files (job : RayJob ) -> Optional [Dict [str , str ]]:
2635 """
2736 Extract all local files from both entrypoint and runtime_env working_dir.
@@ -32,14 +41,17 @@ def extract_all_local_files(job: RayJob) -> Optional[Dict[str, str]]:
3241 Returns:
3342 Dict of {file_name: file_content} if local files found, None otherwise
3443 """
44+ # Convert RuntimeEnv to dict for processing
45+ runtime_env_dict = _normalize_runtime_env (job .runtime_env )
46+
3547 # If there's a remote working_dir, don't extract local files to avoid conflicts
3648 if (
37- job . runtime_env
38- and "working_dir" in job . runtime_env
39- and not os .path .isdir (job . runtime_env ["working_dir" ])
49+ runtime_env_dict
50+ and "working_dir" in runtime_env_dict
51+ and not os .path .isdir (runtime_env_dict ["working_dir" ])
4052 ):
4153 logger .info (
42- f"Remote working_dir detected: { job . runtime_env ['working_dir' ]} . "
54+ f"Remote working_dir detected: { runtime_env_dict ['working_dir' ]} . "
4355 "Skipping local file extraction - all files should come from remote source."
4456 )
4557 return None
@@ -55,18 +67,18 @@ def extract_all_local_files(job: RayJob) -> Optional[Dict[str, str]]:
5567
5668 # Extract files from runtime_env working_dir if it's a local directory
5769 if (
58- job . runtime_env
59- and "working_dir" in job . runtime_env
60- and os .path .isdir (job . runtime_env ["working_dir" ])
70+ runtime_env_dict
71+ and "working_dir" in runtime_env_dict
72+ and os .path .isdir (runtime_env_dict ["working_dir" ])
6173 ):
6274 working_dir_files = extract_working_dir_files (
63- job . runtime_env ["working_dir" ], processed_files
75+ runtime_env_dict ["working_dir" ], processed_files
6476 )
6577 if working_dir_files :
6678 files .update (working_dir_files )
6779
6880 # If no working_dir specified in runtime_env, try to infer and extract files from inferred directory
69- elif not job . runtime_env or "working_dir" not in job . runtime_env :
81+ elif not runtime_env_dict or "working_dir" not in runtime_env_dict :
7082 inferred_working_dir = infer_working_dir_from_entrypoint (job )
7183 if inferred_working_dir :
7284 working_dir_files = extract_working_dir_files (
@@ -221,24 +233,27 @@ def process_runtime_env(
221233 Returns:
222234 Processed runtime environment as YAML string, or None if no processing needed
223235 """
236+ # Convert RuntimeEnv to dict for processing
237+ runtime_env_dict = _normalize_runtime_env (job .runtime_env )
238+
224239 processed_env = {}
225240
226241 # Handle env_vars
227- if job . runtime_env and "env_vars" in job . runtime_env :
228- processed_env ["env_vars" ] = job . runtime_env ["env_vars" ]
242+ if runtime_env_dict and "env_vars" in runtime_env_dict :
243+ processed_env ["env_vars" ] = runtime_env_dict ["env_vars" ]
229244 logger .info (
230- f"Added { len (job . runtime_env ['env_vars' ])} environment variables to runtime_env"
245+ f"Added { len (runtime_env_dict ['env_vars' ])} environment variables to runtime_env"
231246 )
232247
233248 # Handle pip dependencies
234- if job . runtime_env and "pip" in job . runtime_env :
235- pip_deps = process_pip_dependencies (job , job . runtime_env ["pip" ])
249+ if runtime_env_dict and "pip" in runtime_env_dict :
250+ pip_deps = process_pip_dependencies (job , runtime_env_dict ["pip" ])
236251 if pip_deps :
237252 processed_env ["pip" ] = pip_deps
238253
239254 # Handle working_dir - if it's a local path, set it to mount path
240- if job . runtime_env and "working_dir" in job . runtime_env :
241- working_dir = job . runtime_env ["working_dir" ]
255+ if runtime_env_dict and "working_dir" in runtime_env_dict :
256+ working_dir = runtime_env_dict ["working_dir" ]
242257 if os .path .isdir (working_dir ):
243258 # Local working directory - will be mounted at MOUNT_PATH
244259 processed_env ["working_dir" ] = MOUNT_PATH
@@ -252,7 +267,7 @@ def process_runtime_env(
252267 logger .info (f"Using remote working directory: { working_dir } " )
253268
254269 # If no working_dir specified but we have files, set working_dir to mount path
255- elif not job . runtime_env or "working_dir" not in job . runtime_env :
270+ elif not runtime_env_dict or "working_dir" not in runtime_env_dict :
256271 if files :
257272 # Local files found - will be mounted at MOUNT_PATH
258273 processed_env ["working_dir" ] = MOUNT_PATH
0 commit comments