Skip to content

Commit 0b11aa7

Browse files
None of these were async either
1 parent 54ae65d commit 0b11aa7

File tree

6 files changed

+22
-30
lines changed

6 files changed

+22
-30
lines changed

src/replit_river/client_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ async def _handle_messages_from_ws(self) -> None:
140140
case other:
141141
assert_never(other)
142142

143-
await self._buffer.remove_old_messages(
143+
self._buffer.remove_old_messages(
144144
self._seq_manager.receiver_ack,
145145
)
146146
self._reset_session_close_countdown()

src/replit_river/message_buffer.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ def __init__(self, max_num_messages: int = MAX_MESSAGE_BUFFER_SIZE):
2020
self._space_available_cond = asyncio.Condition()
2121
self._closed = False
2222

23-
async def empty(self) -> bool:
24-
"""Check if the buffer is empty"""
25-
return len(self.buffer) == 0
26-
2723
async def put(self, message: TransportMessage) -> None:
2824
"""Add a message to the buffer. Blocks until there is space in the buffer.
2925
@@ -38,19 +34,19 @@ async def put(self, message: TransportMessage) -> None:
3834
raise MessageBufferClosedError("message buffer is closed")
3935
self.buffer.append(message)
4036

41-
async def peek(self) -> TransportMessage | None:
37+
def peek(self) -> TransportMessage | None:
4238
"""Peek the first message in the buffer, returns None if the buffer is empty."""
4339
if len(self.buffer) == 0:
4440
return None
4541
return self.buffer[0]
4642

47-
async def remove_old_messages(self, min_seq: int) -> None:
43+
def remove_old_messages(self, min_seq: int) -> None:
4844
"""Remove messages in the buffer with a seq number less than min_seq."""
4945
self.buffer = [msg for msg in self.buffer if msg.seq >= min_seq]
5046
async with self._space_available_cond:
5147
self._space_available_cond.notify_all()
5248

53-
async def close(self) -> None:
49+
def close(self) -> None:
5450
"""
5551
Closes the message buffer and rejects any pending put operations.
5652
"""

src/replit_river/server_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ async def _handle_messages_from_ws(self, tg: asyncio.TaskGroup) -> None:
136136
pass
137137
case other:
138138
assert_never(other)
139-
await self._buffer.remove_old_messages(
139+
self._buffer.remove_old_messages(
140140
self._seq_manager.receiver_ack,
141141
)
142142
self._reset_session_close_countdown()

src/replit_river/session.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ async def get_next_expected_seq(self) -> int:
229229

230230
async def get_next_sent_seq(self) -> int:
231231
"""Get the next sequence number that the client will send."""
232-
nextMessage = await self._buffer.peek()
232+
nextMessage = self._buffer.peek()
233233
if nextMessage:
234234
return nextMessage.seq
235235
return self._seq_manager.get_seq()
@@ -325,7 +325,7 @@ async def close(self) -> None:
325325

326326
await self.close_websocket(self._ws_wrapper, should_retry=False)
327327

328-
await self._buffer.close()
328+
self._buffer.close()
329329

330330
# Clear the session in transports
331331
await self._close_session_callback(self)

tests/test_message_buffer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ async def put_messages() -> None:
4444
# Wait for the put call to return.
4545
await sync_events.get()
4646
assert len(buffer.buffer) == 1
47-
await buffer.remove_old_messages(i)
47+
buffer.remove_old_messages(i)
4848

4949
await background_puts
5050

@@ -57,7 +57,7 @@ async def test_message_buffer_close() -> None:
5757
buffer = MessageBuffer(max_num_messages=1)
5858
await buffer.put(mock_transport_message(seq=1))
5959
background_put = asyncio.create_task(buffer.put(mock_transport_message(seq=1)))
60-
await buffer.close()
60+
buffer.close()
6161
with pytest.raises(MessageBufferClosedError):
6262
await background_put
6363
with pytest.raises(MessageBufferClosedError):

tests/test_seq_manager.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
import asyncio
2-
31
import pytest
42

53
from replit_river.seq_manager import (
6-
IgnoreMessageException,
4+
IgnoreMessage,
75
OutOfOrderMessageException,
86
SeqManager,
97
)
@@ -14,17 +12,17 @@
1412
@pytest.mark.asyncio
1513
async def test_initial_sequence_and_ack_numbers(no_logging_error: NoErrors) -> None:
1614
manager = SeqManager()
17-
assert await manager.get_seq() == 0, "Initial sequence number should be 0"
18-
assert await manager.get_ack() == 0, "Initial acknowledgment number should be 0"
15+
assert manager.get_seq() == 0, "Initial sequence number should be 0"
16+
assert manager.get_ack() == 0, "Initial acknowledgment number should be 0"
1917
no_logging_error()
2018

2119

2220
@pytest.mark.asyncio
2321
async def test_sequence_number_increment(no_logging_error: NoErrors) -> None:
2422
manager = SeqManager()
25-
initial_seq = await manager.get_seq_and_increment()
23+
initial_seq = manager.get_seq_and_increment()
2624
assert initial_seq == 0, "Sequence number should start at 0"
27-
new_seq = await manager.get_seq()
25+
new_seq = manager.get_seq()
2826
assert new_seq == 1, "Sequence number should increment to 1"
2927
no_logging_error()
3028

@@ -33,42 +31,40 @@ async def test_sequence_number_increment(no_logging_error: NoErrors) -> None:
3331
async def test_message_reception(no_logging_error: NoErrors) -> None:
3432
manager = SeqManager()
3533
msg = transport_message(seq=0, ack=0, from_="client")
36-
await manager.check_seq_and_update(
34+
manager.check_seq_and_update(
3735
msg
3836
) # No error should be raised for the correct sequence
39-
assert await manager.get_ack() == 1, "Acknowledgment should be set to 1"
37+
assert manager.get_ack() == 1, "Acknowledgment should be set to 1"
4038

4139
# We assert no errors before we send out-of-order messages
4240
no_logging_error()
4341

4442
# Test duplicate message
45-
with pytest.raises(IgnoreMessageException):
46-
await manager.check_seq_and_update(msg)
43+
assert isinstance(manager.check_seq_and_update(msg), IgnoreMessage)
4744

4845
# Test out of order message
4946
msg.seq = 2
5047
with pytest.raises(OutOfOrderMessageException):
51-
await manager.check_seq_and_update(msg)
48+
manager.check_seq_and_update(msg)
5249

5350

5451
@pytest.mark.asyncio
5552
async def test_acknowledgment_setting(no_logging_error: NoErrors) -> None:
5653
manager = SeqManager()
5754
msg = transport_message(seq=0, ack=0, from_="client")
58-
await manager.check_seq_and_update(msg)
59-
assert await manager.get_ack() == 1, "Acknowledgment number should be updated"
55+
manager.check_seq_and_update(msg)
56+
assert manager.get_ack() == 1, "Acknowledgment number should be updated"
6057
no_logging_error()
6158

6259

6360
@pytest.mark.asyncio
6461
async def test_concurrent_access_to_sequence(no_logging_error: NoErrors) -> None:
6562
manager = SeqManager()
66-
tasks = [manager.get_seq_and_increment() for _ in range(10)]
67-
results = await asyncio.gather(*tasks)
63+
results = [manager.get_seq_and_increment() for _ in range(10)]
6864
assert len(set(results)) == 10, (
6965
"Each increment call should return a unique sequence number"
7066
)
71-
assert await manager.get_seq() == 10, (
67+
assert manager.get_seq() == 10, (
7268
"Final sequence number should be 10 after 10 increments"
7369
)
7470
no_logging_error()

0 commit comments

Comments
 (0)