@@ -114,3 +114,54 @@ def to_dict(self) -> Dict[str, Any]:
114114 "driver_info" : self .spec .driver_info ,
115115 } if self .status is None else self .status
116116 }
117+
118+ def apply (self , force = False ):
119+ """
120+ Applies the RayJob using server-side apply.
121+ If 'force' is set to True, conflicts will be forced.
122+
123+ Args:
124+ force (bool): If True, force conflicts during server-side apply.
125+ """
126+ from kubernetes import client
127+ from kubernetes .dynamic import DynamicClient
128+ from ...common .kubernetes_cluster .auth import get_api_client , config_check
129+ from ...common import _kube_api_error_handling
130+
131+ CF_SDK_FIELD_MANAGER = "codeflare-sdk"
132+
133+ try :
134+ # Check Kubernetes configuration
135+ config_check ()
136+
137+ # Get the dynamic client
138+ crds = DynamicClient (get_api_client ()).resources
139+
140+ # Get the RayJob API instance
141+ api_version = "ray.io/v1"
142+ api_instance = crds .get (api_version = api_version , kind = "RayJob" )
143+
144+ # Get namespace from metadata
145+ namespace = self .metadata .get ("namespace" , "default" )
146+ name = self .metadata .get ("name" )
147+
148+ # Convert job to dictionary
149+ body = self .to_dict ()
150+
151+ # Apply the job using server-side apply
152+ api_instance .server_side_apply (
153+ field_manager = CF_SDK_FIELD_MANAGER ,
154+ group = "ray.io" ,
155+ version = "v1" ,
156+ namespace = namespace ,
157+ plural = "rayjobs" ,
158+ body = body ,
159+ force_conflicts = force ,
160+ )
161+
162+ print (f"RayJob: '{ name } ' has successfully been applied" )
163+
164+ except AttributeError as e :
165+ raise RuntimeError (f"Failed to initialize DynamicClient: { e } " )
166+ except Exception as e :
167+ return _kube_api_error_handling (e )
0 commit comments