|
16 | 16 | from replit_river.message_buffer import MessageBuffer, MessageBufferClosedError |
17 | 17 | from replit_river.messages import ( |
18 | 18 | FailedSendingMessageException, |
19 | | - WebsocketClosedException, |
20 | | - send_transport_message, |
21 | 19 | ) |
22 | 20 | from replit_river.seq_manager import ( |
23 | 21 | SeqManager, |
@@ -107,6 +105,9 @@ def __init__( |
107 | 105 | self._buffer = MessageBuffer(self._transport_options.buffer_size) |
108 | 106 | self._task_manager = BackgroundTaskManager() |
109 | 107 |
|
| 108 | + # Start the buffered message sender task |
| 109 | + self._start_buffered_message_sender() |
| 110 | + |
110 | 111 | def _setup_heartbeats_task( |
111 | 112 | self, |
112 | 113 | do_close_websocket: Callable[[], Awaitable[None]], |
@@ -142,6 +143,38 @@ def increment_and_get_heartbeat_misses() -> int: |
142 | 143 | ) |
143 | 144 | ) |
144 | 145 |
|
| 146 | + def _start_buffered_message_sender(self) -> None: |
| 147 | + """Start the background task that sends messages from the buffer.""" |
| 148 | + from replit_river.common_session import buffered_message_sender |
| 149 | + |
| 150 | + async def commit(msg: TransportMessage) -> None: |
| 151 | + # Remove messages that have been acknowledged |
| 152 | + await self._buffer.remove_old_messages(msg.seq + 1) |
| 153 | + |
| 154 | + def get_next_pending() -> TransportMessage | None: |
| 155 | + return self._buffer.peek() |
| 156 | + |
| 157 | + def get_ws() -> websockets.WebSocketCommonProtocol | None: |
| 158 | + if self._ws_wrapper.is_open(): |
| 159 | + return self._ws_wrapper.ws |
| 160 | + return None |
| 161 | + |
| 162 | + async def block_until_connected() -> None: |
| 163 | + while self._state in [SessionState.NO_CONNECTION, SessionState.CONNECTING]: |
| 164 | + await asyncio.sleep(0.1) |
| 165 | + |
| 166 | + self._task_manager.create_task( |
| 167 | + buffered_message_sender( |
| 168 | + block_until_connected=block_until_connected, |
| 169 | + block_until_message_available=self._buffer.block_until_message_available, |
| 170 | + get_ws=get_ws, |
| 171 | + websocket_closed_callback=self._begin_close_session_countdown, |
| 172 | + get_next_pending=get_next_pending, |
| 173 | + commit=commit, |
| 174 | + get_state=lambda: self._state, |
| 175 | + ) |
| 176 | + ) |
| 177 | + |
145 | 178 | async def is_session_open(self) -> bool: |
146 | 179 | async with self._state_lock: |
147 | 180 | return self._state == SessionState.ACTIVE |
@@ -181,24 +214,6 @@ async def replace_with_new_websocket( |
181 | 214 | await old_wrapper.close() |
182 | 215 | self._ws_wrapper = WebsocketWrapper(new_ws) |
183 | 216 |
|
184 | | - # Send buffered messages to the new ws |
185 | | - buffered_messages = list(self._buffer.buffer) |
186 | | - for msg in buffered_messages: |
187 | | - try: |
188 | | - await send_transport_message( |
189 | | - msg, |
190 | | - new_ws, |
191 | | - self._begin_close_session_countdown, |
192 | | - ) |
193 | | - except WebsocketClosedException: |
194 | | - logger.info( |
195 | | - "Connection closed while sending buffered messages", exc_info=True |
196 | | - ) |
197 | | - break |
198 | | - except FailedSendingMessageException: |
199 | | - logger.exception("Error while sending buffered messages") |
200 | | - break |
201 | | - |
202 | 217 | async def _get_current_time(self) -> float: |
203 | 218 | return asyncio.get_event_loop().time() |
204 | 219 |
|
@@ -249,30 +264,10 @@ async def send_message( |
249 | 264 | with use_span(span): |
250 | 265 | trace_propagator.inject(msg, None, trace_setter) |
251 | 266 | try: |
252 | | - try: |
253 | | - self._buffer.put(msg) |
254 | | - except MessageBufferClosedError: |
255 | | - # The session is closed and is no longer accepting new messages. |
256 | | - return |
257 | | - async with self._ws_lock: |
258 | | - if not self._ws_wrapper.is_open(): |
259 | | - # If the websocket is closed, we should not send the message |
260 | | - # and wait for the retry from the buffer. |
261 | | - return |
262 | | - await send_transport_message( |
263 | | - msg, self._ws_wrapper.ws, self._begin_close_session_countdown |
264 | | - ) |
265 | | - except WebsocketClosedException as e: |
266 | | - logger.debug( |
267 | | - "Connection closed while sending message %r, waiting for " |
268 | | - "retry from buffer", |
269 | | - type(e), |
270 | | - exc_info=e, |
271 | | - ) |
272 | | - except FailedSendingMessageException: |
273 | | - logger.error( |
274 | | - "Failed sending message, waiting for retry from buffer", exc_info=True |
275 | | - ) |
| 267 | + self._buffer.put(msg) |
| 268 | + except MessageBufferClosedError: |
| 269 | + # The session is closed and is no longer accepting new messages. |
| 270 | + return |
276 | 271 |
|
277 | 272 | async def close_websocket( |
278 | 273 | self, ws_wrapper: WebsocketWrapper, should_retry: bool |
|
0 commit comments