Skip to content

Commit ac95b8e

Browse files
Reflowing message_buffer locks
1 parent d40ec18 commit ac95b8e

File tree

1 file changed

+9
-11
lines changed

1 file changed

+9
-11
lines changed

src/replit_river/message_buffer.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ class MessageBuffer:
1717
def __init__(self, max_num_messages: int = MAX_MESSAGE_BUFFER_SIZE):
1818
self.max_size = max_num_messages
1919
self.buffer: list[TransportMessage] = []
20-
self._lock = asyncio.Lock()
21-
self._space_available_cond = asyncio.Condition(lock=self._lock)
20+
self._space_available_cond = asyncio.Condition()
2221
self._closed = False
2322

2423
async def has_capacity(self) -> None:
@@ -42,23 +41,22 @@ def get_next_sent_seq(self) -> int | None:
4241
return self.buffer[0].seq
4342
return None
4443

45-
async def peek(self) -> TransportMessage | None:
44+
def peek(self) -> TransportMessage | None:
4645
"""Peek the first message in the buffer, returns None if the buffer is empty."""
47-
async with self._lock:
48-
if len(self.buffer) == 0:
49-
return None
50-
return self.buffer[0]
46+
if len(self.buffer) == 0:
47+
return None
48+
return self.buffer[0]
5149

5250
async def remove_old_messages(self, min_seq: int) -> None:
5351
"""Remove messages in the buffer with a seq number less than min_seq."""
54-
async with self._lock:
55-
self.buffer = [msg for msg in self.buffer if msg.seq >= min_seq]
52+
self.buffer = [msg for msg in self.buffer if msg.seq >= min_seq]
53+
async with self._space_available_cond:
5654
self._space_available_cond.notify_all()
5755

5856
async def close(self) -> None:
5957
"""
6058
Closes the message buffer and rejects any pending put operations.
6159
"""
62-
async with self._lock:
63-
self._closed = True
60+
self._closed = True
61+
async with self._space_available_cond:
6462
self._space_available_cond.notify_all()

0 commit comments

Comments
 (0)