@@ -214,7 +214,6 @@ async def _do_ensure_connected[HandshakeMetadata](
214214
215215 last_error : Exception | None = None
216216 i = 0
217- await self ._connection_condition .acquire ()
218217 while rate_limiter .has_budget_or_throw (client_id , ERROR_HANDSHAKE , last_error ):
219218 if i > 0 :
220219 logger .info (f"Retrying build handshake number { i } times" )
@@ -342,7 +341,10 @@ async def websocket_closed_callback() -> None:
342341 rate_limiter .start_restoring_budget (client_id )
343342 self ._state = SessionState .ACTIVE
344343 self ._ws_unwrapped = ws
345- self ._connection_condition .notify_all ()
344+
345+ # We're connected, wake everybody up
346+ async with self ._connection_condition :
347+ self ._connection_condition .notify_all ()
346348 break
347349 except Exception as e :
348350 if ws :
@@ -370,10 +372,6 @@ async def websocket_closed_callback() -> None:
370372 ):
371373 self ._connecting_task = None
372374
373- # Release the lock we took earlier so we can use it again in the next
374- # connection attempt
375- self ._connection_condition .release ()
376-
377375 if last_error is not None :
378376 raise RiverException (
379377 ERROR_HANDSHAKE ,
@@ -493,10 +491,8 @@ async def close(self) -> None:
493491 self ._state = SessionState .CLOSING
494492
495493 # We need to wake up all tasks waiting for connection to be established
496- if not self ._connection_condition .locked ():
497- await self ._connection_condition .acquire ()
498- self ._connection_condition .notify_all ()
499- self ._connection_condition .release ()
494+ async with self ._connection_condition :
495+ self ._connection_condition .notify_all ()
500496
501497 await self ._task_manager .cancel_all_tasks ()
502498
0 commit comments