Skip to content

Commit d346afe

Browse files
committed
address comments
1 parent 4762b1f commit d346afe

File tree

2 files changed

+21
-22
lines changed

2 files changed

+21
-22
lines changed

google/cloud/dataproc_spark_connect/session.py

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,7 @@ def _is_valid_session_id(session_id: str) -> bool:
9898
if not session_id:
9999
return False
100100

101-
# Check length (4-63 characters for GCP session IDs)
102-
if len(session_id) < 4 or len(session_id) > 63:
103-
return False
104-
105-
# Check if the session ID matches the pattern
101+
# The pattern is sufficient for validation and already enforces length constraints.
106102
pattern = r"^[a-z][a-z0-9-]{2,61}[a-z0-9]$"
107103
return bool(re.match(pattern, session_id))
108104

@@ -150,6 +146,18 @@ def __init__(self):
150146
f"{self._region}-dataproc.googleapis.com",
151147
)
152148
)
149+
self._session_controller_client: Optional[
150+
SessionControllerClient
151+
] = None
152+
153+
@property
154+
def session_controller_client(self) -> SessionControllerClient:
155+
"""Get or create a SessionControllerClient instance."""
156+
if self._session_controller_client is None:
157+
self._session_controller_client = SessionControllerClient(
158+
client_options=self._client_options
159+
)
160+
return self._session_controller_client
153161

154162
def projectId(self, project_id):
155163
self._project_id = project_id
@@ -702,24 +710,21 @@ def _delete_session(self, session_name: str):
702710
"""Delete a session to free up the session ID for reuse."""
703711
try:
704712
delete_request = DeleteSessionRequest(name=session_name)
705-
SessionControllerClient(
706-
client_options=self._client_options
707-
).delete_session(delete_request)
713+
self.session_controller_client.delete_session(delete_request)
708714
logger.debug(f"Deleted session: {session_name}")
709715
except NotFound:
710716
logger.debug(f"Session already deleted: {session_name}")
711717

712718
def _wait_for_termination(self, session_name: str, timeout: int = 180):
713719
"""Wait for a session to finish terminating."""
714720
start_time = time.time()
715-
session_client = SessionControllerClient(
716-
client_options=self._client_options
717-
)
718721

719722
while time.time() - start_time < timeout:
720723
try:
721724
get_request = GetSessionRequest(name=session_name)
722-
session = session_client.get_session(get_request)
725+
session = self.session_controller_client.get_session(
726+
get_request
727+
)
723728

724729
if session.state in [
725730
Session.State.TERMINATED,
@@ -754,11 +759,10 @@ def _get_or_cleanup_session_by_id(
754759
session_name = f"projects/{self._project_id}/locations/{self._region}/sessions/{session_id}"
755760

756761
try:
757-
session_client = SessionControllerClient(
758-
client_options=self._client_options
759-
)
760762
get_request = GetSessionRequest(name=session_name)
761-
session = session_client.get_session(get_request)
763+
session = self.session_controller_client.get_session(
764+
get_request
765+
)
762766

763767
logger.debug(
764768
f"Found existing session {session_id} in state: {session.state}"

tests/integration/test_session.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ def test_session_reuse_with_custom_id(
454454
existing_session = DataprocSparkSession.getActiveSession()
455455
if existing_session:
456456
existing_session.stop()
457-
except:
457+
except Exception:
458458
pass
459459

460460
# PHASE 1: Create initial session with custom ID
@@ -491,11 +491,6 @@ def test_session_reuse_with_custom_id(
491491
# PHASE 3: Terminate session explicitly
492492
spark2.stop()
493493

494-
# Wait a moment for termination to be registered
495-
import time
496-
497-
time.sleep(5)
498-
499494
# PHASE 4: Recreate with same ID - this tests the cleanup and recreation logic
500495
# Clear all session state to ensure fresh lookup
501496
DataprocSparkSession._default_session = None

0 commit comments

Comments
 (0)