@@ -159,23 +159,24 @@ def submit(self) -> str:
159159
160160 self ._validate_ray_version_compatibility ()
161161
162- # Automatically handle script files for new clusters
163- if self ._cluster_config is not None :
164- scripts = self ._extract_script_files_from_entrypoint ()
165- if scripts :
166- self ._handle_script_volumes_for_new_cluster (scripts )
167- elif self ._cluster_name :
168- scripts = self ._extract_script_files_from_entrypoint ()
169- if scripts :
170- self ._handle_script_volumes_for_existing_cluster (scripts )
171-
172162 rayjob_cr = self ._build_rayjob_cr ()
173163
174164 logger .info (f"Submitting RayJob { self .name } to Kuberay operator" )
175165 result = self ._api .submit_job (k8s_namespace = self .namespace , job = rayjob_cr )
176166
177167 if result :
178168 logger .info (f"Successfully submitted RayJob { self .name } " )
169+
170+ # Handle script files after RayJob creation so we can set owner reference
171+ if self ._cluster_config is not None :
172+ scripts = self ._extract_script_files_from_entrypoint ()
173+ if scripts :
174+ self ._handle_script_volumes_for_new_cluster (scripts , result )
175+ elif self ._cluster_name :
176+ scripts = self ._extract_script_files_from_entrypoint ()
177+ if scripts :
178+ self ._handle_script_volumes_for_existing_cluster (scripts , result )
179+
179180 if self .shutdown_after_job_finishes :
180181 logger .info (
181182 f"Cluster will be automatically cleaned up { self .ttl_seconds_after_finished } s after job completion"
@@ -205,6 +206,17 @@ def resubmit(self):
205206 else :
206207 raise RuntimeError (f"Failed to resubmit the RayJob { self .name } " )
207208
209+ def delete (self ):
210+ """
211+ Delete the Ray job.
212+ """
213+ deleted = self ._api .delete_job (name = self .name , k8s_namespace = self .namespace )
214+ if deleted :
215+ logger .info (f"Successfully deleted the RayJob { self .name } " )
216+ return True
217+ else :
218+ raise RuntimeError (f"Failed to delete the RayJob { self .name } " )
219+
208220 def _build_rayjob_cr (self ) -> Dict [str , Any ]:
209221 """
210222 Build the RayJob custom resource specification using native RayJob capabilities.
@@ -464,7 +476,9 @@ def _find_local_imports(
464476 except (SyntaxError , ValueError ) as e :
465477 logger .debug (f"Could not parse imports from { script_path } : { e } " )
466478
467- def _handle_script_volumes_for_new_cluster (self , scripts : Dict [str , str ]):
479+ def _handle_script_volumes_for_new_cluster (
480+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
481+ ):
468482 """Handle script volumes for new clusters (uses ManagedClusterConfig)."""
469483 # Validate ConfigMap size before creation
470484 self ._cluster_config .validate_configmap_size (scripts )
@@ -474,15 +488,17 @@ def _handle_script_volumes_for_new_cluster(self, scripts: Dict[str, str]):
474488 job_name = self .name , namespace = self .namespace , scripts = scripts
475489 )
476490
477- # Create ConfigMap via Kubernetes API
478- configmap_name = self ._create_configmap_from_spec (configmap_spec )
491+ # Create ConfigMap via Kubernetes API with owner reference
492+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
479493
480494 # Add volumes to cluster config (config.py handles spec building)
481495 self ._cluster_config .add_script_volumes (
482496 configmap_name = configmap_name , mount_path = MOUNT_PATH
483497 )
484498
485- def _handle_script_volumes_for_existing_cluster (self , scripts : Dict [str , str ]):
499+ def _handle_script_volumes_for_existing_cluster (
500+ self , scripts : Dict [str , str ], rayjob_result : Dict [str , Any ] = None
501+ ):
486502 """Handle script volumes for existing clusters (updates RayCluster CR)."""
487503 # Create config builder for utility methods
488504 config_builder = ManagedClusterConfig ()
@@ -495,28 +511,57 @@ def _handle_script_volumes_for_existing_cluster(self, scripts: Dict[str, str]):
495511 job_name = self .name , namespace = self .namespace , scripts = scripts
496512 )
497513
498- # Create ConfigMap via Kubernetes API
499- configmap_name = self ._create_configmap_from_spec (configmap_spec )
514+ # Create ConfigMap via Kubernetes API with owner reference
515+ configmap_name = self ._create_configmap_from_spec (configmap_spec , rayjob_result )
500516
501517 # Update existing RayCluster
502518 self ._update_existing_cluster_for_scripts (configmap_name , config_builder )
503519
504- def _create_configmap_from_spec (self , configmap_spec : Dict [str , Any ]) -> str :
520+ def _create_configmap_from_spec (
521+ self , configmap_spec : Dict [str , Any ], rayjob_result : Dict [str , Any ] = None
522+ ) -> str :
505523 """
506524 Create ConfigMap from specification via Kubernetes API.
507525
508526 Args:
509527 configmap_spec: ConfigMap specification dictionary
528+ rayjob_result: The result from RayJob creation containing UID
510529
511530 Returns:
512531 str: Name of the created ConfigMap
513532 """
514533
515534 configmap_name = configmap_spec ["metadata" ]["name" ]
516535
536+ metadata = client .V1ObjectMeta (** configmap_spec ["metadata" ])
537+
538+ # Add owner reference if we have the RayJob result
539+ if (
540+ rayjob_result
541+ and isinstance (rayjob_result , dict )
542+ and rayjob_result .get ("metadata" , {}).get ("uid" )
543+ ):
544+ logger .info (
545+ f"Adding owner reference to ConfigMap '{ configmap_name } ' with RayJob UID: { rayjob_result ['metadata' ]['uid' ]} "
546+ )
547+ metadata .owner_references = [
548+ client .V1OwnerReference (
549+ api_version = "ray.io/v1" ,
550+ kind = "RayJob" ,
551+ name = self .name ,
552+ uid = rayjob_result ["metadata" ]["uid" ],
553+ controller = True ,
554+ block_owner_deletion = True ,
555+ )
556+ ]
557+ else :
558+ logger .warning (
559+ f"No valid RayJob result with UID found, ConfigMap '{ configmap_name } ' will not have owner reference. Result: { rayjob_result } "
560+ )
561+
517562 # Convert dict spec to V1ConfigMap
518563 configmap = client .V1ConfigMap (
519- metadata = client . V1ObjectMeta ( ** configmap_spec [ " metadata" ]) ,
564+ metadata = metadata ,
520565 data = configmap_spec ["data" ],
521566 )
522567
0 commit comments