Skip to content

Commit 9325d45

Browse files
committed
add _wait_for_stream_event helper function
1 parent 544e26d commit 9325d45

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

libp2p/stream_muxer/yamux/yamux.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,14 +501,31 @@ async def read_stream(self, stream_id: int, n: int = -1) -> bytes:
501501
# Wait for data if stream is still open
502502
logger.debug(f"Waiting for data on stream {self.peer_id}:{stream_id}")
503503
try:
504-
await self.stream_events[stream_id].wait()
505-
self.stream_events[stream_id] = trio.Event()
504+
await self._wait_for_stream_event(stream_id)
506505
except KeyError:
507506
raise MuxedStreamEOF("Stream was removed")
508507

509508
# This line should never be reached, but satisfies the type checker
510509
raise MuxedStreamEOF("Unexpected end of read_stream")
511510

511+
async def _wait_for_stream_event(self, stream_id: int) -> None:
512+
async with self.streams_lock:
513+
if stream_id not in self.stream_events or self.event_shutting_down.is_set():
514+
return
515+
event = self.stream_events[stream_id]
516+
517+
try:
518+
await event.wait()
519+
except trio.Cancelled:
520+
raise
521+
522+
async with self.streams_lock:
523+
if (
524+
stream_id in self.stream_events
525+
and not self.event_shutting_down.is_set()
526+
):
527+
self.stream_events[stream_id] = trio.Event()
528+
512529
async def handle_incoming(self) -> None:
513530
while not self.event_shutting_down.is_set():
514531
try:

0 commit comments

Comments
 (0)