Skip to content

Commit f9a33f3

Browse files
Inline update_bookkeeping
1 parent a426353 commit f9a33f3

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

src/replit_river/client_session.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
InitType,
4141
RequestType,
4242
ResponseType,
43-
TransportMessage,
4443
)
4544

4645
logger = logging.getLogger(__name__)
@@ -111,11 +110,6 @@ async def serve(self) -> None:
111110
"Unhandled exceptions on River server", unhandled.exceptions
112111
)
113112

114-
async def _update_book_keeping(self, msg: TransportMessage) -> None:
115-
await self._seq_manager.check_seq_and_update(msg)
116-
await self._buffer.remove_old_messages(self._seq_manager.receiver_ack)
117-
self._reset_session_close_countdown()
118-
119113
async def _handle_messages_from_ws(self) -> None:
120114
logger.debug(
121115
"%s start handling messages from ws %s",
@@ -133,7 +127,13 @@ async def _handle_messages_from_ws(self) -> None:
133127

134128
logger.debug(f"{self._transport_id} got a message %r", msg)
135129

136-
await self._update_book_keeping(msg)
130+
# Update bookkeeping
131+
await self._seq_manager.check_seq_and_update(msg)
132+
await self._buffer.remove_old_messages(
133+
self._seq_manager.receiver_ack,
134+
)
135+
self._reset_session_close_countdown()
136+
137137
if msg.controlFlags & ACK_BIT != 0:
138138
continue
139139
async with self._stream_lock:

src/replit_river/server_session.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,6 @@ async def serve(self) -> None:
109109
"Unhandled exceptions on River server", unhandled.exceptions
110110
)
111111

112-
async def _update_book_keeping(self, msg: TransportMessage) -> None:
113-
await self._seq_manager.check_seq_and_update(msg)
114-
await self._buffer.remove_old_messages(self._seq_manager.receiver_ack)
115-
self._reset_session_close_countdown()
116-
117112
async def _handle_messages_from_ws(self, tg: asyncio.TaskGroup) -> None:
118113
logger.debug(
119114
"%s start handling messages from ws %s",
@@ -131,7 +126,13 @@ async def _handle_messages_from_ws(self, tg: asyncio.TaskGroup) -> None:
131126

132127
logger.debug(f"{self._transport_id} got a message %r", msg)
133128

134-
await self._update_book_keeping(msg)
129+
# Update bookkeeping
130+
await self._seq_manager.check_seq_and_update(msg)
131+
await self._buffer.remove_old_messages(
132+
self._seq_manager.receiver_ack,
133+
)
134+
self._reset_session_close_countdown()
135+
135136
if msg.controlFlags & ACK_BIT != 0:
136137
continue
137138
async with self._stream_lock:

0 commit comments

Comments
 (0)