@@ -160,23 +160,24 @@ def submit(self) -> str:
160160
161161 self ._validate_ray_version_compatibility ()
162162
163- # Automatically handle script files for new clusters
164- if self ._cluster_config is not None :
165- scripts = self ._extract_script_files_from_entrypoint ()
166- if scripts :
167- self ._handle_script_volumes_for_new_cluster (scripts )
168- elif self ._cluster_name :
169- scripts = self ._extract_script_files_from_entrypoint ()
170- if scripts :
171- self ._handle_script_volumes_for_existing_cluster (scripts )
172-
173163 rayjob_cr = self ._build_rayjob_cr ()
174164
175165 logger .info (f"Submitting RayJob { self .name } to Kuberay operator" )
176166 result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
177167
178168 if result :
179169 logger .info (f"Successfully submitted RayJob { self .name } " )
170+
171+ # Handle script files after RayJob creation so we can set owner reference
172+ if self ._cluster_config is not None :
173+ scripts = self ._extract_script_files_from_entrypoint ()
174+ if scripts :
175+ self ._handle_script_volumes_for_new_cluster (scripts , result )
176+ elif self ._cluster_name :
177+ scripts = self ._extract_script_files_from_entrypoint ()
178+ if scripts :
179+ self ._handle_script_volumes_for_existing_cluster (scripts , result )
180+
180181 if self .shutdown_after_job_finishes :
181182 logger .info (
182183 f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
@@ -206,6 +207,17 @@ def resubmit(self):
206207 else :
207208 raise RuntimeError (f"Failed to resubmit the RayJob { self .name } " )
208209
def delete_job(self):
    """
    Delete this RayJob via the job API client.

    The job is identified by this object's ``name`` and ``namespace``.

    Returns:
        bool: True when the API client reports a successful deletion.

    Raises:
        RuntimeError: If the API client returns a falsy result.
    """
    was_deleted = self._api.delete_job(
        name=self.name, k8s_namespace=self.namespace
    )

    # Guard clause: report a failed deletion before the success path.
    if not was_deleted:
        raise RuntimeError(f"Failed to delete the RayJob {self.name}")

    logger.info(f"Successfully deleted the RayJob {self.name}")
    return True
220+
209221 def _build_rayjob_cr (self ) -> Dict [str , Any ]:
210222 """
211223 Build the RayJob custom resource specification using native RayJob capabilities.
@@ -466,7 +478,9 @@ def _find_local_imports(
466478 except (SyntaxError , ValueError ) as e :
467479 logger .debug (f"Could not parse imports from { script_path } : { e } " )
468480
469- def _handle_script_volumes_for_new_cluster (self , scripts : Dict [str , str ]):
481+ def _handle_script_volumes_for_new_cluster (
482+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
483+ ):
470484 """Handle script volumes for new clusters (uses ManagedClusterConfig)."""
471485 # Validate ConfigMap size before creation
472486 self ._cluster_config .validate_configmap_size (scripts )
@@ -476,15 +490,17 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]):
476490 job_name = self .name , namespace = self .namespace , scripts = scripts
477491 )
478492
479- # Create ConfigMap via Kubernetes API
480- configmap_name = self ._create_configmap_from_spec (configmap_spec )
493+ # Create ConfigMap via Kubernetes API with owner reference
494+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
481495
482496 # Add volumes to cluster config (config.py handles spec building)
483497 self ._cluster_config .add_script_volumes (
484498 configmap_name = configmap_name , mount_path = "/home/ray/scripts"
485499 )
486500
487- def _handle_script_volumes_for_existing_cluster (self , scripts : Dict [str , str ]):
501+ def _handle_script_volumes_for_existing_cluster (
502+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
503+ ):
488504 """Handle script volumes for existing clusters (updates RayCluster CR)."""
489505 # Create config builder for utility methods
490506 config_builder = ManagedClusterConfig ()
@@ -497,28 +513,57 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]):
497513 job_name = self .name , namespace = self .namespace , scripts = scripts
498514 )
499515
500- # Create ConfigMap via Kubernetes API
501- configmap_name = self ._create_configmap_from_spec (configmap_spec )
516+ # Create ConfigMap via Kubernetes API with owner reference
517+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
502518
503519 # Update existing RayCluster
504520 self ._update_existing_cluster_for_scripts (configmap_name , config_builder )
505521
506- def _create_configmap_from_spec (self , configmap_spec : Dict [str , Any ]) -> str :
522+ def _create_configmap_from_spec (
523+ self , configmap_spec : Dict [str , Any ], rayjob_result : Dict [str , Any ] = None
524+ ) -> str :
507525 """
508526 Create ConfigMap from specification via Kubernetes API.
509527
510528 Args:
511529 configmap_spec: ConfigMap specification dictionary
530+ rayjob_result: The result from RayJob creation containing UID
512531
513532 Returns:
514533 str: Name of the created ConfigMap
515534 """
516535
517536 configmap_name = configmap_spec ["metadata" ]["name" ]
518537
538+ metadata = client .V1ObjectMeta (** configmap_spec ["metadata" ])
539+
540+ # Add owner reference if we have the RayJob result
541+ if (
542+ rayjob_result
543+ and isinstance (rayjob_result , dict )
544+ and rayjob_result .get ("metadata" , {}).get ("uid" )
545+ ):
546+ logger .info (
547+ f"Adding owner reference to ConfigMap '{ configmap_name } ' with RayJob UID: { rayjob_result ['metadata' ]['uid' ]} "
548+ )
549+ metadata .owner_references = [
550+ client .V1OwnerReference (
551+ api_version = "ray.io/v1" ,
552+ kind = "RayJob" ,
553+ name = self .name ,
554+ uid = rayjob_result ["metadata" ]["uid" ],
555+ controller = True ,
556+ block_owner_deletion = True ,
557+ )
558+ ]
559+ else :
560+ logger .warning (
561+ f"No valid RayJob result with UID found, ConfigMap '{ configmap_name } ' will not have owner reference. Result: { rayjob_result } "
562+ )
563+
519564 # Convert dict spec to V1ConfigMap
520565 configmap = client .V1ConfigMap (
521- metadata = client . V1ObjectMeta ( ** configmap_spec [ " metadata" ]) ,
566+ metadata = metadata ,
522567 data = configmap_spec ["data" ],
523568 )
524569
0 commit comments