Skip to content

Commit 0e8cc3a

Browse files
Moving more fields from session to the specialized classes
1 parent 5bd3ad1 commit 0e8cc3a

File tree

3 files changed

+23
-27
lines changed

3 files changed

+23
-27
lines changed

src/replit_river/client_session.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -88,24 +88,21 @@ async def serve(self) -> None:
8888
"""Serve messages from the websocket."""
8989
self._reset_session_close_countdown()
9090
try:
91-
async with asyncio.TaskGroup() as tg:
92-
try:
93-
await self._handle_messages_from_ws(tg)
94-
except ConnectionClosed:
95-
if self._retry_connection_callback:
96-
self._task_manager.create_task(
97-
self._retry_connection_callback()
98-
)
99-
100-
await self._begin_close_session_countdown()
101-
logger.debug("ConnectionClosed while serving", exc_info=True)
102-
except FailedSendingMessageException:
103-
# Expected error if the connection is closed.
104-
logger.debug(
105-
"FailedSendingMessageException while serving", exc_info=True
106-
)
107-
except Exception:
108-
logger.exception("caught exception at message iterator")
91+
try:
92+
await self._handle_messages_from_ws()
93+
except ConnectionClosed:
94+
if self._retry_connection_callback:
95+
self._task_manager.create_task(self._retry_connection_callback())
96+
97+
await self._begin_close_session_countdown()
98+
logger.debug("ConnectionClosed while serving", exc_info=True)
99+
except FailedSendingMessageException:
100+
# Expected error if the connection is closed.
101+
logger.debug(
102+
"FailedSendingMessageException while serving", exc_info=True
103+
)
104+
except Exception:
105+
logger.exception("caught exception at message iterator")
109106
except ExceptionGroup as eg:
110107
_, unhandled = eg.split(lambda e: isinstance(e, ConnectionClosed))
111108
if unhandled:
@@ -118,9 +115,10 @@ async def _update_book_keeping(self, msg: TransportMessage) -> None:
118115
await self._remove_acked_messages_in_buffer()
119116
self._reset_session_close_countdown()
120117

121-
async def _handle_messages_from_ws(
122-
self, tg: asyncio.TaskGroup | None = None
123-
) -> None:
118+
async def _remove_acked_messages_in_buffer(self) -> None:
119+
await self._buffer.remove_old_messages(self._seq_manager.receiver_ack)
120+
121+
async def _handle_messages_from_ws(self) -> None:
124122
logger.debug(
125123
"%s start handling messages from ws %s",
126124
"client",

src/replit_river/server_session.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ async def _update_book_keeping(self, msg: TransportMessage) -> None:
116116
await self._remove_acked_messages_in_buffer()
117117
self._reset_session_close_countdown()
118118

119-
async def _handle_messages_from_ws(
120-
self, tg: asyncio.TaskGroup | None = None
121-
) -> None:
119+
async def _remove_acked_messages_in_buffer(self) -> None:
120+
await self._buffer.remove_old_messages(self._seq_manager.receiver_ack)
121+
122+
async def _handle_messages_from_ws(self, tg: asyncio.TaskGroup) -> None:
122123
logger.debug(
123124
"%s start handling messages from ws %s",
124125
"server",

src/replit_river/session.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,6 @@ async def _add_msg_to_stream(
331331
except RuntimeError as e:
332332
raise InvalidMessageException(e) from e
333333

334-
async def _remove_acked_messages_in_buffer(self) -> None:
335-
await self._buffer.remove_old_messages(self._seq_manager.receiver_ack)
336-
337334
async def close(self) -> None:
338335
"""Close the session and all associated streams."""
339336
logger.info(

0 commit comments

Comments
 (0)