Skip to content

Commit 54ae65d

Browse files
Reduce lock scope to what matters
1 parent 6335150 commit 54ae65d

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

src/replit_river/message_buffer.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ class MessageBuffer:
1717
def __init__(self, max_num_messages: int = MAX_MESSAGE_BUFFER_SIZE):
1818
self.max_size = max_num_messages
1919
self.buffer: list[TransportMessage] = []
20-
self._lock = asyncio.Lock()
21-
self._space_available_cond = asyncio.Condition(lock=self._lock)
20+
self._space_available_cond = asyncio.Condition()
2221
self._closed = False
2322

2423
async def empty(self) -> bool:
@@ -47,14 +46,14 @@ async def peek(self) -> TransportMessage | None:
4746

4847
async def remove_old_messages(self, min_seq: int) -> None:
4948
"""Remove messages in the buffer with a seq number less than min_seq."""
50-
async with self._lock:
51-
self.buffer = [msg for msg in self.buffer if msg.seq >= min_seq]
49+
self.buffer = [msg for msg in self.buffer if msg.seq >= min_seq]
50+
async with self._space_available_cond:
5251
self._space_available_cond.notify_all()
5352

5453
async def close(self) -> None:
5554
"""
5655
Closes the message buffer and rejects any pending put operations.
5756
"""
58-
async with self._lock:
59-
self._closed = True
57+
self._closed = True
58+
async with self._space_available_cond:
6059
self._space_available_cond.notify_all()

0 commit comments

Comments
 (0)