1111 Coroutine ,
1212 Literal ,
1313 TypeAlias ,
14- cast ,
1514)
1615
1716import nanoid # type: ignore
@@ -217,7 +216,6 @@ def get_next_pending() -> TransportMessage | None:
217216
218217 # TODO: Just return _ws_unwrapped once we are no longer using the legacy client
219218 def get_ws () -> WebSocketCommonProtocol | ClientConnection | None :
220- logger .debug ("get_ws: %r %r" , self .is_connected (), self ._ws_unwrapped )
221219 if self .is_connected ():
222220 return self ._ws_unwrapped
223221 return None
@@ -250,7 +248,7 @@ async def ensure_connected[HandshakeMetadata](
250248 logic that actually establishes the connection.
251249 """
252250
253- logger .debug ("ensure_connected: %r" , self .is_connected ())
251+ logger .debug ("ensure_connected: is_connected= %r" , self .is_connected ())
254252 if self .is_connected ():
255253 return
256254
@@ -264,9 +262,7 @@ async def ensure_connected[HandshakeMetadata](
264262 )
265263 )
266264
267- logger .debug ("BEFORE await _do_ensure_connected" )
268265 await self ._connecting_task
269- logger .debug ("AFTER await _do_ensure_connected" )
270266
271267 async def _do_ensure_connected [HandshakeMetadata ](
272268 self ,
@@ -391,9 +387,7 @@ async def websocket_closed_callback() -> None:
391387 rate_limiter .start_restoring_budget (client_id )
392388 self ._state = SessionState .ACTIVE
393389 self ._ws_unwrapped = ws
394- logger .debug ("Before notify_all: %r %r %r" , self ._state , self ._ws_unwrapped , self ._connection_condition )
395390 self ._connection_condition .notify_all ()
396- self ._connection_condition .release ()
397391 break
398392 except RiverException as e :
399393 await ws .close ()
@@ -422,13 +416,16 @@ async def websocket_closed_callback() -> None:
422416 ):
423417 self ._connecting_task = None
424418
419+ # Release the lock we took earlier so we can use it again in the next
420+ # connection attempt
421+ self ._connection_condition .release ()
422+
425423 if last_error is not None :
426424 raise RiverException (
427425 ERROR_HANDSHAKE ,
428426 f"Failed to create ws after retrying { max_retry } number of times" ,
429427 ) from last_error
430428
431- logger .debug ("EXITING _do_ensure_connected" )
432429 return True
433430
434431 def is_closed (self ) -> bool :
@@ -495,7 +492,6 @@ async def send_message(
495492 serviceName = service_name ,
496493 procedureName = procedure_name ,
497494 )
498- logger .debug ("SENDING MESSAGE: %r" , msg )
499495
500496 if span :
501497 with use_span (span ):
@@ -514,9 +510,7 @@ async def send_message(
514510 self ._queue_full_lock .locked ()
515511 or len (self ._send_buffer ) >= self ._transport_options .buffer_size
516512 ):
517- logger .warning ("LOCK ACQUIRED %r" , repr (payload ))
518513 await self ._queue_full_lock .acquire ()
519- logger .warning ("LOCK RELEASED %r" , repr (payload ))
520514 self ._send_buffer .append (msg )
521515 # Wake up buffered_message_sender
522516 self ._message_enqueued .release ()
0 commit comments