 from typing import Dict, Any, Optional, Tuple
 from odh_kuberay_client.kuberay_job_api import RayjobApi
 
+import yaml  # used below to render runtime_env as proper YAML
+from ..cluster.cluster import Cluster
+from ..cluster.config import ClusterConfiguration
+from ..cluster.build_ray_cluster import build_ray_cluster
+
 from .status import (
     RayJobDeploymentStatus,
     CodeflareRayJobStatus,
     RayJobInfo,
 )
 from . import pretty_print
 
-# Set up logging
+
 logger = logging.getLogger(__name__)
 
 
@@ -22,80 +26,113 @@ class RayJob:
     A client for managing Ray jobs using the KubeRay operator.
 
     This class provides a simplified interface for submitting and managing
-    Ray jobs via the Codeflare SDK (using the KubeRay RayJob python client).
+    Ray jobs via the CodeFlare SDK. It can work with existing clusters or
+    create new clusters automatically using the native RayJob CRD capabilities.
     """
 
     def __init__(
         self,
         job_name: str,
-        cluster_name: str,
+        cluster_name: Optional[str] = None,
+        cluster_config: Optional[ClusterConfiguration] = None,
         namespace: str = "default",
-        entrypoint: str = "None",
+        entrypoint: Optional[str] = None,
         runtime_env: Optional[Dict[str, Any]] = None,
+        shutdown_after_job_finishes: bool = True,
+        ttl_seconds_after_finished: int = 0,
+        active_deadline_seconds: Optional[int] = None,
     ):
3645 """
3746 Initialize a RayJob instance.
3847
3948 Args:
40- name: The name for the Ray job
41- namespace: The Kubernetes namespace to submit the job to (default: "default")
42- cluster_name: The name of the Ray cluster to submit the job to
43- **kwargs: Additional configuration options
49+ job_name: The name for the Ray job
50+ cluster_name: The name of an existing Ray cluster (optional if cluster_config provided)
51+ cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
52+ namespace: The Kubernetes namespace (default: "default")
53+ entrypoint: The Python script or command to run (required for submission)
54+ runtime_env: Ray runtime environment configuration (optional)
55+ shutdown_after_job_finishes: Whether to automatically cleanup the cluster after job completion (default: True)
56+ ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
57+ active_deadline_seconds: Maximum time the job can run before being terminated (optional)
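+
+        Example (illustrative sketch; assumes an authenticated Kubernetes
+        context and that ClusterConfiguration accepts these keyword arguments):
+            >>> job = RayJob(
+            ...     job_name="demo",
+            ...     cluster_config=ClusterConfiguration(num_workers=2),
+            ...     entrypoint="python train.py",
+            ... )
+            >>> job.submit()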
         """
+        # Validate input parameters
+        if cluster_name is None and cluster_config is None:
+            raise ValueError("Either cluster_name or cluster_config must be provided")
+
+        if cluster_name is not None and cluster_config is not None:
+            raise ValueError("Cannot specify both cluster_name and cluster_config")
+
         self.name = job_name
         self.namespace = namespace
-        self.cluster_name = cluster_name
         self.entrypoint = entrypoint
         self.runtime_env = runtime_env
+        self.shutdown_after_job_finishes = shutdown_after_job_finishes
+        self.ttl_seconds_after_finished = ttl_seconds_after_finished
+        self.active_deadline_seconds = active_deadline_seconds
+
+        # Cluster configuration
+        self._cluster_name = cluster_name
+        self._cluster_config = cluster_config
+
+        # Determine cluster name for the job
+        if cluster_config is not None:
+            # Ensure cluster config has the same namespace as the job
+            if cluster_config.namespace is None:
+                cluster_config.namespace = namespace
+            elif cluster_config.namespace != namespace:
+                logger.warning(f"Cluster config namespace ({cluster_config.namespace}) differs from job namespace ({namespace})")
+
+            # Derive a name when none was given, e.g. job "train" -> cluster "train-cluster"
+            self.cluster_name = cluster_config.name or f"{job_name}-cluster"
+            # Update the cluster config name if it wasn't set
+            if not cluster_config.name:
+                cluster_config.name = self.cluster_name
+        else:
+            self.cluster_name = cluster_name
 
         # Initialize the KubeRay job API client
         self._api = RayjobApi()
 
         logger.info(f"Initialized RayJob: {self.name} in namespace: {self.namespace}")
 
-    def submit(
-        self,
-    ) -> str:
+    def submit(self) -> str:
         """
         Submit the Ray job to the Kubernetes cluster.
-
-        Args:
-            entrypoint: The Python script or command to run
-            runtime_env: Ray runtime environment configuration (optional)
+
+        The RayJob CRD will automatically:
+        - Create a new cluster if cluster_config was provided
+        - Use the existing cluster if cluster_name was provided
+        - Clean up resources based on the shutdown_after_job_finishes setting
 
         Returns:
             The job ID/name if submission was successful
 
         Raises:
-            RuntimeError: If the job has already been submitted or submission fails
+            ValueError: If entrypoint is not provided
+            RuntimeError: If job submission fails
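+
+        Example (illustrative; assumes an existing RayCluster named "my-cluster"):
+            >>> job = RayJob(job_name="demo", cluster_name="my-cluster",
+            ...              entrypoint="python train.py")
+            >>> job.submit()
+            'demo'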
         """
+        # Validate required parameters
+        if not self.entrypoint:
+            raise ValueError("entrypoint must be provided to submit a RayJob")
+
         # Build the RayJob custom resource
-        rayjob_cr = self._build_rayjob_cr(
-            entrypoint=self.entrypoint,
-            runtime_env=self.runtime_env,
-        )
+        rayjob_cr = self._build_rayjob_cr()
 
-        # Submit the job
-        logger.info(
-            f"Submitting RayJob {self.name} to RayCluster {self.cluster_name} in namespace {self.namespace}"
-        )
+        # Submit the job - the KubeRay operator handles everything else
+        logger.info(f"Submitting RayJob {self.name} to KubeRay operator")
         result = self._api.submit_job(k8s_namespace=self.namespace, job=rayjob_cr)
 
         if result:
             logger.info(f"Successfully submitted RayJob {self.name}")
+            if self.shutdown_after_job_finishes:
+                logger.info(f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion")
             return self.name
         else:
             raise RuntimeError(f"Failed to submit RayJob {self.name}")
 
-    def _build_rayjob_cr(
-        self,
-        entrypoint: str,
-        runtime_env: Optional[Dict[str, Any]] = None,
-    ) -> Dict[str, Any]:
+    def _build_rayjob_cr(self) -> Dict[str, Any]:
         """
-        Build the RayJob custom resource specification.
-
-        This creates a minimal RayJob CR that can be extended later.
+        Build the RayJob custom resource specification using native RayJob capabilities.
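+
+        For reference, the generated CR has roughly this shape (illustrative,
+        existing-cluster case; field names follow the KubeRay RayJob CRD):
+
+            apiVersion: ray.io/v1
+            kind: RayJob
+            metadata: {name: <job_name>, namespace: <namespace>}
+            spec:
+              entrypoint: <entrypoint>
+              shutdownAfterJobFinishes: true
+              ttlSecondsAfterFinished: 0
+              clusterSelector: {ray.io/cluster: <cluster_name>}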
         """
         # Basic RayJob custom resource structure
         rayjob_cr = {
@@ -106,17 +143,71 @@ def _build_rayjob_cr(
                 "namespace": self.namespace,
             },
             "spec": {
-                "entrypoint": entrypoint,
-                "clusterSelector": {"ray.io/cluster": self.cluster_name},
+                "entrypoint": self.entrypoint,
+                "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
+                "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
             },
         }
 
+        # Add active deadline if specified
+        if self.active_deadline_seconds:
+            rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds
+
         # Add runtime environment if specified
-        if runtime_env:
-            rayjob_cr["spec"]["runtimeEnvYAML"] = str(runtime_env)
+        if self.runtime_env:
+            # yaml.dump() produces valid YAML; str() on a dict would emit a Python repr
+            rayjob_cr["spec"]["runtimeEnvYAML"] = yaml.dump(self.runtime_env)
+
+        # Configure cluster: either use existing or create new
+        if self._cluster_config is not None:
+            # Use rayClusterSpec to create a new cluster - leverage existing build logic
+            ray_cluster_spec = self._build_ray_cluster_spec()
+            rayjob_cr["spec"]["rayClusterSpec"] = ray_cluster_spec
+            logger.info(f"RayJob will create new cluster: {self.cluster_name}")
+        else:
+            # Use clusterSelector to reference an existing cluster
+            rayjob_cr["spec"]["clusterSelector"] = {"ray.io/cluster": self.cluster_name}
+            logger.info(f"RayJob will use existing cluster: {self.cluster_name}")
 
         return rayjob_cr
-
+
+    def _build_ray_cluster_spec(self) -> Dict[str, Any]:
+        """
+        Build the RayCluster spec from ClusterConfiguration using existing build_ray_cluster logic.
+
+        Returns:
+            Dict containing the RayCluster spec for embedding in RayJob
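+
+            The returned dict is the ``spec`` stanza of a RayCluster CR, e.g.
+            {"headGroupSpec": {...}, "workerGroupSpecs": [...]} (illustrative
+            shape), ready to embed under ``rayClusterSpec`` in the RayJob CR.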
+        """
+        if not self._cluster_config:
+            raise RuntimeError("No cluster configuration provided")
+
+        # Create a shallow copy of the cluster config to avoid modifying the original
+        import copy
+        temp_config = copy.copy(self._cluster_config)
+
+        # Ensure we get a RayCluster (not AppWrapper) and don't write to file
+        temp_config.appwrapper = False
+        temp_config.write_to_file = False
+
+        # Create a minimal Cluster object for the build process
+        # (Cluster is imported at module level; __init__ is skipped to avoid side effects)
+        temp_cluster = Cluster.__new__(Cluster)  # create without calling __init__
+        temp_cluster.config = temp_config
+
+        # NOTE: For now, RayJob with a new/auto-created cluster will not work with Kueue,
+        # because the Kueue label is not propagated to the RayCluster.
+
+        # Use the existing build_ray_cluster function to generate the RayCluster
+        ray_cluster_dict = build_ray_cluster(temp_cluster)
+
+        # Extract just the RayCluster spec - the RayJob CRD doesn't support metadata in rayClusterSpec
+        # Note: the CodeFlare Operator should still create dashboard routes for the RayCluster
+        ray_cluster_spec = ray_cluster_dict["spec"]
+
+        logger.info(f"Built RayCluster spec using existing build logic for cluster: {self.cluster_name}")
+        return ray_cluster_spec
+
     def status(self, print_to_console: bool = True) -> Tuple[CodeflareRayJobStatus, bool]:
         """
         Get the status of the Ray job.
@@ -152,8 +243,7 @@ def status(self, print_to_console: bool = True) -> Tuple[CodeflareRayJobStatus,
                 start_time=status_data.get('startTime'),
                 end_time=status_data.get('endTime'),
                 failed_attempts=status_data.get('failed', 0),
-                succeeded_attempts=status_data.get('succeeded', 0),
-                dashboard_url=status_data.get('dashboardURL')
+                succeeded_attempts=status_data.get('succeeded', 0)
             )
 
         # Map to CodeFlare status and determine readiness
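
A minimal end-to-end usage sketch of the API added in this diff (the import path,
names, and runtime_env contents are illustrative; assumes an authenticated
Kubernetes context with the KubeRay operator installed):

    from codeflare_sdk import RayJob, ClusterConfiguration

    job = RayJob(
        job_name="training-job",
        cluster_config=ClusterConfiguration(num_workers=2),
        namespace="demo",
        entrypoint="python train.py",
        runtime_env={"pip": ["torch"]},
        ttl_seconds_after_finished=60,
    )
    job.submit()                  # creates the cluster and runs the job
    status, ready = job.status()  # maps RayJob status to CodeflareRayJobStatus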