File tree Expand file tree Collapse file tree 1 file changed +26
-0
lines changed
google/cloud/dataproc_spark_connect Expand file tree Collapse file tree 1 file changed +26
-0
lines changed Original file line number Diff line number Diff line change @@ -472,6 +472,28 @@ def create_session_pbar():
472472 session_response , dataproc_config .name
473473 )
474474
475+ def _wait_for_spark_connect_endpoint (
476+ self , session_name : str , timeout : int = 300
477+ ) -> Session :
478+ """Waits for the Spark Connect endpoint to be available in the session."""
479+ start_time = time .time ()
480+ while time .time () - start_time < timeout :
481+ try :
482+ session = self .session_controller_client .get_session (
483+ name = session_name
484+ )
485+ if "Spark Connect Server" in session .runtime_info .endpoints :
486+ return session
487+ time .sleep (5 )
488+ except Exception as e :
489+ logger .warning (
490+ f"Error while polling for Spark Connect endpoint: { e } "
491+ )
492+ time .sleep (5 )
493+ raise RuntimeError (
494+ f"Spark Connect endpoint not available for session { session_name } after { timeout } seconds."
495+ )
496+
475497 def _display_session_link_on_creation (self , session_id ):
476498 session_url = f"https://console.cloud.google.com/dataproc/interactive/{ self ._region } /{ session_id } ?project={ self ._project_id } "
477499 plain_message = f"Creating Dataproc Session: { session_url } "
@@ -537,6 +559,10 @@ def _get_exiting_active_session(
537559 )
538560 self ._display_view_session_details_button (s8s_session_id )
539561 if session is None :
562+ # Wait for the Spark Connect endpoint to be available
563+ session_response = self ._wait_for_spark_connect_endpoint (
564+ session_name
565+ )
540566 session = self .__create_spark_connect_session_from_s8s (
541567 session_response , session_name
542568 )
You can’t perform that action at this time.
0 commit comments