1818
1919import logging
2020import warnings
21- from typing import Dict , Any , Optional , Tuple
21+ from typing import Dict , Any , Optional , Tuple , Union
2222
2323from ray .runtime_env import RuntimeEnv
2424from codeflare_sdk .common .kueue .kueue import get_default_kueue_name
@@ -63,7 +63,7 @@ def __init__(
6363 cluster_name : Optional [str ] = None ,
6464 cluster_config : Optional [ManagedClusterConfig ] = None ,
6565 namespace : Optional [str ] = None ,
66- runtime_env : Optional [RuntimeEnv ] = None ,
66+ runtime_env : Optional [Union [ RuntimeEnv , Dict [ str , Any ]] ] = None ,
6767 ttl_seconds_after_finished : int = 0 ,
6868 active_deadline_seconds : Optional [int ] = None ,
6969 local_queue : Optional [str ] = None ,
@@ -77,7 +77,10 @@ def __init__(
7777 cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
7878 cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
7979 namespace: The Kubernetes namespace (auto-detected if not specified)
80- runtime_env: Ray runtime environment configuration as RuntimeEnv object (optional)
80+ runtime_env: Ray runtime environment configuration. Can be:
81+ - RuntimeEnv object from ray.runtime_env
82+ - Dict with keys like 'working_dir', 'pip', 'env_vars', etc.
83+ Example: {"working_dir": "./my-scripts", "pip": ["requests"]}
8184 ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
8285 active_deadline_seconds: Maximum time the job can run before being terminated (optional)
8386 local_queue: The Kueue LocalQueue to submit the job to (optional)
@@ -109,7 +112,13 @@ def __init__(
109112
110113 self .name = job_name
111114 self .entrypoint = entrypoint
112- self .runtime_env = runtime_env
115+
116+ # Convert dict to RuntimeEnv if needed for user convenience
117+ if isinstance (runtime_env , dict ):
118+ self .runtime_env = RuntimeEnv (** runtime_env )
119+ else :
120+ self .runtime_env = runtime_env
121+
113122 self .ttl_seconds_after_finished = ttl_seconds_after_finished
114123 self .active_deadline_seconds = active_deadline_seconds
115124 self .local_queue = local_queue
@@ -232,6 +241,9 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
232241 },
233242 }
234243
244+ # Extract files once and use for both runtime_env and submitter pod
245+ files = extract_all_local_files (self )
246+
235247 labels = {}
236248 # If cluster_config is provided, use the local_queue from the cluster_config
237249 if self ._cluster_config is not None :
@@ -262,9 +274,6 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
262274 if self .active_deadline_seconds :
263275 rayjob_cr ["spec" ]["activeDeadlineSeconds" ] = self .active_deadline_seconds
264276
265- # Extract files once and use for both runtime_env and submitter pod
266- files = extract_all_local_files (self )
267-
268277 # Add runtime environment (can be inferred even if not explicitly specified)
269278 processed_runtime_env = process_runtime_env (self , files )
270279 if processed_runtime_env :
@@ -326,8 +335,19 @@ def _build_submitter_pod_template(
326335
327336 # Build ConfigMap items for each file
328337 config_map_items = []
338+ entrypoint_path = files .get (
339+ "__entrypoint_path__"
340+ ) # Metadata for single file case
341+
329342 for file_name in files .keys ():
330- config_map_items .append ({"key" : file_name , "path" : file_name })
343+ if file_name == "__entrypoint_path__" :
344+ continue # Skip metadata key
345+
346+ # For single file case, use the preserved path structure
347+ if entrypoint_path :
348+ config_map_items .append ({"key" : file_name , "path" : entrypoint_path })
349+ else :
350+ config_map_items .append ({"key" : file_name , "path" : file_name })
331351
332352 # Check if we need to unzip working_dir
333353 has_working_dir_zip = "working_dir.zip" in files
0 commit comments