Skip to content

Commit f0de172

Browse files
This is what Semaphores are for
1 parent 49fcc56 commit f0de172

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

src/replit_river/common_session.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,17 @@ async def check_to_close_session(
127127

128128

129129
async def buffered_message_sender(
130+
message_enqueued: asyncio.Semaphore,
130131
get_ws: Callable[[], WebSocketCommonProtocol | ClientConnection | None],
131132
websocket_closed_callback: Callable[[], Coroutine[Any, Any, None]],
132133
get_next_pending: Callable[[], TransportMessage | None],
133134
commit: Callable[[TransportMessage], None],
134135
) -> None:
135136
logger.debug("Entering buffered_message_sender")
136137
while True:
137-
logger.debug("buffered_message_sender: LOOP")
138-
sent = False
139-
while msg := get_next_pending():
138+
await message_enqueued.acquire()
139+
logger.debug("buffered_message_sender: acquired")
140+
if msg := get_next_pending():
140141
ws = get_ws()
141142
logger.debug(
142143
"buffered_message_sender: Dequeued %r to send over %r",
@@ -150,7 +151,6 @@ async def buffered_message_sender(
150151
await send_transport_message(msg, ws, websocket_closed_callback)
151152
logger.debug("buffered_message_sender: committing!")
152153
commit(msg)
153-
sent = True
154154
except WebsocketClosedException as e:
155155
logger.debug(
156156
"Connection closed while sending message %r, waiting for "
@@ -168,8 +168,6 @@ async def buffered_message_sender(
168168
except Exception:
169169
logger.exception("Error attempting to send buffered messages")
170170
break
171-
if not sent:
172-
await asyncio.sleep(0.25)
173171

174172

175173
async def add_msg_to_stream(

src/replit_river/v2/session.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ def __init__(
129129
self._retry_connection_callback = retry_connection_callback
130130

131131
# message state
132+
self._message_enqueued = asyncio.Semaphore()
132133
self._space_available_cond = asyncio.Condition()
133134
self._queue_full_lock = asyncio.Lock()
134135

@@ -174,6 +175,7 @@ def get_next_pending() -> TransportMessage | None:
174175

175176
self._task_manager.create_task(
176177
buffered_message_sender(
178+
self._message_enqueued,
177179
get_ws=lambda: (
178180
cast(WebSocketCommonProtocol | ClientConnection, self._ws_unwrapped)
179181
if self.is_websocket_open()
@@ -309,6 +311,8 @@ async def send_message(
309311
await self._queue_full_lock.acquire()
310312
logger.warning("LOCK RELEASED %r", repr(payload))
311313
self._send_buffer.append(msg)
314+
# Wake up buffered_message_sender
315+
self._message_enqueued.release()
312316
self.seq += 1
313317

314318
async def close(self) -> None:

0 commit comments

Comments
 (0)