@@ -609,298 +609,6 @@ def _component_resources_down(
609609 yamls = yaml .safe_load_all (self .resource_yaml )
610610 _delete_resources (yamls , namespace , api_instance , cluster_name )
611611
612- @staticmethod
613- def run_job_with_managed_cluster (
614- cluster_config : ClusterConfiguration ,
615- job_config : RayJobSpec ,
616- job_cr_name : Optional [str ] = None ,
617- submission_mode : str = "K8sJobMode" ,
618- shutdown_after_job_finishes : bool = True ,
619- ttl_seconds_after_finished : Optional [int ] = None ,
620- suspend_rayjob_creation : bool = False ,
621- wait_for_completion : bool = True ,
622- job_timeout_seconds : Optional [int ] = 3600 ,
623- job_polling_interval_seconds : int = 10 ,
624- ):
625- """
626- Manages the lifecycle of a Ray cluster and a job by creating a RayJob custom resource.
627- KubeRay operator will then create/delete the RayCluster based on the RayJob definition.
628-
629- This method will:
630- 1. Generate a RayCluster specification from the provided 'cluster_config'.
631- 2. Construct a RayJob custom resource definition using 'job_config' and embedding the RayCluster spec.
632- 3. Create the RayJob resource in Kubernetes.
633- 4. Optionally, wait for the RayJob to complete or timeout, monitoring its status.
634- 5. The RayCluster lifecycle (creation and deletion) is managed by KubeRay
635- based on the RayJob's 'shutdownAfterJobFinishes' field.
636-
637- Args:
638- cluster_config: Configuration for the Ray cluster to be created by RayJob.
639- job_config: RayJobSpec object containing job-specific details like entrypoint, runtime_env, etc.
640- job_cr_name: Name for the RayJob Custom Resource. If None, a unique name is generated.
641- submission_mode: How the job is submitted ("K8sJobMode" or "RayClientMode").
642- shutdown_after_job_finishes: If True, RayCluster is deleted after job finishes.
643- ttl_seconds_after_finished: TTL for RayJob after it's finished.
644- suspend_rayjob_creation: If True, creates the RayJob in a suspended state.
645- wait_for_completion: If True, waits for the job to finish.
646- job_timeout_seconds: Timeout for waiting for job completion.
647- job_polling_interval_seconds: Interval for polling job status.
648-
649- Returns:
650- A dictionary containing details like RayJob CR name, reported job submission ID,
651- final job status, dashboard URL, and the RayCluster name.
652-
653- Raises:
654- TimeoutError: If the job doesn't complete within the specified timeout.
655- ApiException: For Kubernetes API errors.
656- ValueError: For configuration issues.
657- """
658- config_check ()
659- k8s_co_api = k8s_client .CustomObjectsApi (get_api_client ())
660- namespace = cluster_config .namespace
661-
662- if not job_config .entrypoint :
663- raise ValueError ("job_config.entrypoint must be specified." )
664-
665- # Warn if Pydantic V1/V2 specific fields in RayJobSpec are set, as they are not used for RayJob CR.
666- if (
667- job_config .entrypoint_num_cpus is not None
668- or job_config .entrypoint_num_gpus is not None
669- or job_config .entrypoint_memory is not None
670- ):
671- warnings .warn (
672- "RayJobSpec fields 'entrypoint_num_cpus', 'entrypoint_num_gpus', 'entrypoint_memory' "
673- "are not directly used when creating a RayJob CR. They are primarily for the Ray Job Submission Client. "
674- "Resource requests for the job driver pod should be configured in the RayCluster head node spec via ClusterConfiguration." ,
675- UserWarning ,
676- )
677-
678- # Generate rayClusterSpec from ClusterConfiguration
679- temp_config_for_spec = copy .deepcopy (cluster_config )
680- temp_config_for_spec .appwrapper = False
681-
682- with warnings .catch_warnings ():
683- warnings .simplefilter ("ignore" , UserWarning )
684- dummy_cluster_for_spec = Cluster (temp_config_for_spec )
685-
686- ray_cluster_cr_dict = dummy_cluster_for_spec .resource_yaml
687- if (
688- not isinstance (ray_cluster_cr_dict , dict )
689- or "spec" not in ray_cluster_cr_dict
690- ):
691- raise ValueError (
692- "Failed to generate RayCluster CR dictionary from ClusterConfiguration. "
693- f"Got: { type (ray_cluster_cr_dict )} "
694- )
695- ray_cluster_spec = ray_cluster_cr_dict ["spec" ]
696-
697- # Prepare RayJob CR
698- actual_job_cr_name = job_cr_name or f"rayjob-{ uuid .uuid4 ().hex [:10 ]} "
699-
700- runtime_env_yaml_str = ""
701- if job_config .runtime_env :
702- try :
703- runtime_env_yaml_str = yaml .dump (job_config .runtime_env )
704- except yaml .YAMLError as e :
705- raise ValueError (
706- f"Invalid job_config.runtime_env, failed to dump to YAML: { e } "
707- )
708-
709- ray_job_cr_spec = {
710- "entrypoint" : job_config .entrypoint ,
711- "shutdownAfterJobFinishes" : shutdown_after_job_finishes ,
712- "rayClusterSpec" : ray_cluster_spec ,
713- "submissionMode" : submission_mode ,
714- }
715-
716- if runtime_env_yaml_str :
717- ray_job_cr_spec ["runtimeEnvYAML" ] = runtime_env_yaml_str
718- if job_config .submission_id :
719- ray_job_cr_spec ["jobId" ] = job_config .submission_id
720- if job_config .metadata :
721- ray_job_cr_spec ["metadata" ] = job_config .metadata
722- if ttl_seconds_after_finished is not None :
723- ray_job_cr_spec ["ttlSecondsAfterFinished" ] = ttl_seconds_after_finished
724- if suspend_rayjob_creation :
725- ray_job_cr_spec ["suspend" ] = True
726- if job_config .entrypoint_resources :
727- ray_job_cr_spec ["entrypointResources" ] = job_config .entrypoint_resources
728-
729- ray_job_cr = {
730- "apiVersion" : "ray.io/v1" ,
731- "kind" : "RayJob" ,
732- "metadata" : {
733- "name" : actual_job_cr_name ,
734- "namespace" : namespace ,
735- },
736- "spec" : ray_job_cr_spec ,
737- }
738-
739- returned_job_submission_id = None
740- final_job_status = "UNKNOWN"
741- dashboard_url = None
742- ray_cluster_name_actual = None
743-
744- try :
745- print (
746- f"Submitting RayJob '{ actual_job_cr_name } ' to namespace '{ namespace } '..."
747- )
748- k8s_co_api .create_namespaced_custom_object (
749- group = "ray.io" ,
750- version = "v1" ,
751- namespace = namespace ,
752- plural = "rayjobs" ,
753- body = ray_job_cr ,
754- )
755- print (f"RayJob '{ actual_job_cr_name } ' created successfully." )
756-
757- if wait_for_completion :
758- print (f"Waiting for RayJob '{ actual_job_cr_name } ' to complete..." )
759- start_time = time .time ()
760- while True :
761- try :
762- ray_job_status_cr = (
763- k8s_co_api .get_namespaced_custom_object_status (
764- group = "ray.io" ,
765- version = "v1" ,
766- namespace = namespace ,
767- plural = "rayjobs" ,
768- name = actual_job_cr_name ,
769- )
770- )
771- except ApiException as e :
772- if e .status == 404 :
773- print (
774- f"RayJob '{ actual_job_cr_name } ' status not found yet, retrying..."
775- )
776- time .sleep (job_polling_interval_seconds )
777- continue
778- raise
779-
780- status_field = ray_job_status_cr .get ("status" , {})
781- job_deployment_status = status_field .get (
782- "jobDeploymentStatus" , "UNKNOWN"
783- )
784- current_job_status = status_field .get ("jobStatus" , "PENDING" )
785-
786- dashboard_url = status_field .get ("dashboardURL" , dashboard_url )
787- ray_cluster_name_actual = status_field .get (
788- "rayClusterName" , ray_cluster_name_actual
789- )
790- returned_job_submission_id = status_field .get (
791- "jobId" , job_config .submission_id
792- )
793-
794- final_job_status = current_job_status
795- print (
796- f"RayJob '{ actual_job_cr_name } ' status: JobDeployment='{ job_deployment_status } ', Job='{ current_job_status } '"
797- )
798-
799- if current_job_status in ["SUCCEEDED" , "FAILED" , "STOPPED" ]:
800- break
801-
802- if (
803- job_timeout_seconds
804- and (time .time () - start_time ) > job_timeout_seconds
805- ):
806- try :
807- ray_job_status_cr_final = (
808- k8s_co_api .get_namespaced_custom_object_status (
809- group = "ray.io" ,
810- version = "v1" ,
811- namespace = namespace ,
812- plural = "rayjobs" ,
813- name = actual_job_cr_name ,
814- )
815- )
816- status_field_final = ray_job_status_cr_final .get (
817- "status" , {}
818- )
819- final_job_status = status_field_final .get (
820- "jobStatus" , final_job_status
821- )
822- returned_job_submission_id = status_field_final .get (
823- "jobId" , returned_job_submission_id
824- )
825- dashboard_url = status_field_final .get (
826- "dashboardURL" , dashboard_url
827- )
828- ray_cluster_name_actual = status_field_final .get (
829- "rayClusterName" , ray_cluster_name_actual
830- )
831- except Exception :
832- pass
833- raise TimeoutError (
834- f"RayJob '{ actual_job_cr_name } ' timed out after { job_timeout_seconds } seconds. Last status: { final_job_status } "
835- )
836-
837- time .sleep (job_polling_interval_seconds )
838-
839- print (
840- f"RayJob '{ actual_job_cr_name } ' finished with status: { final_job_status } "
841- )
842- else :
843- try :
844- ray_job_status_cr = k8s_co_api .get_namespaced_custom_object_status (
845- group = "ray.io" ,
846- version = "v1" ,
847- namespace = namespace ,
848- plural = "rayjobs" ,
849- name = actual_job_cr_name ,
850- )
851- status_field = ray_job_status_cr .get ("status" , {})
852- final_job_status = status_field .get ("jobStatus" , "SUBMITTED" )
853- returned_job_submission_id = status_field .get (
854- "jobId" , job_config .submission_id
855- )
856- dashboard_url = status_field .get ("dashboardURL" , dashboard_url )
857- ray_cluster_name_actual = status_field .get (
858- "rayClusterName" , ray_cluster_name_actual
859- )
860- except ApiException as e :
861- if e .status == 404 :
862- final_job_status = "SUBMITTED_NOT_FOUND"
863- else :
864- print (
865- f"Warning: Could not fetch initial status for RayJob '{ actual_job_cr_name } ': { e } "
866- )
867- final_job_status = "UNKNOWN_API_ERROR"
868-
869- return {
870- "job_cr_name" : actual_job_cr_name ,
871- "job_submission_id" : returned_job_submission_id ,
872- "job_status" : final_job_status ,
873- "dashboard_url" : dashboard_url ,
874- "ray_cluster_name" : ray_cluster_name_actual ,
875- }
876-
877- except ApiException as e :
878- print (
879- f"Kubernetes API error during RayJob '{ actual_job_cr_name } ' management: { e .reason } (status: { e .status } )"
880- )
881- final_status_on_error = "ERROR_BEFORE_SUBMISSION"
882- if actual_job_cr_name :
883- try :
884- ray_job_status_cr = k8s_co_api .get_namespaced_custom_object_status (
885- group = "ray.io" ,
886- version = "v1" ,
887- namespace = namespace ,
888- plural = "rayjobs" ,
889- name = actual_job_cr_name ,
890- )
891- status_field = ray_job_status_cr .get ("status" , {})
892- final_status_on_error = status_field .get (
893- "jobStatus" , "UNKNOWN_AFTER_K8S_ERROR"
894- )
895- except Exception :
896- final_status_on_error = "UNKNOWN_FINAL_STATUS_FETCH_FAILED"
897- raise
898- except Exception as e :
899- print (
900- f"An unexpected error occurred during managed RayJob execution for '{ actual_job_cr_name } ': { e } "
901- )
902- raise
903-
904612
905613def list_all_clusters (namespace : str , print_to_console : bool = True ):
906614 """
@@ -1057,21 +765,15 @@ def get_cluster(
1057765 head_extended_resource_requests = head_extended_resources ,
1058766 worker_extended_resource_requests = worker_extended_resources ,
1059767 )
1060- # 1. Prepare RayClusterSpec from ClusterConfiguration
1061- # Create a temporary config with appwrapper=False to ensure build_ray_cluster returns RayCluster YAML
1062- temp_cluster_config_dict = cluster_config .dict (
1063- exclude_none = True
1064- ) # Assuming Pydantic V1 or similar .dict() method
1065- temp_cluster_config_dict ["appwrapper" ] = False
1066- temp_cluster_config_for_spec = ClusterConfiguration (** temp_cluster_config_dict )
768+
1067769 # Ignore the warning here for the lack of a ClusterConfiguration
1068770 with warnings .catch_warnings ():
1069771 warnings .filterwarnings (
1070772 "ignore" ,
1071773 message = "Please provide a ClusterConfiguration to initialise the Cluster object" ,
1072774 )
1073775 cluster = Cluster (None )
1074- cluster .config = temp_cluster_config_for_spec
776+ cluster .config = cluster_config
1075777
1076778 # Remove auto-generated fields like creationTimestamp, uid and etc.
1077779 remove_autogenerated_fields (resource )
0 commit comments