Skip to content

Commit ccef083

Browse files
session_id -> stream_id, as well as close error_channel
1 parent 1bd479d commit ccef083

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

src/replit_river/v2/session.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ async def block_until_connected() -> None:
549549
@asynccontextmanager
550550
async def _with_stream(
551551
self,
552-
session_id: str,
552+
stream_id: str,
553553
maxsize: int,
554554
) -> AsyncIterator[tuple[Channel[Exception | None], Channel[ResultType]]]:
555555
"""
@@ -564,11 +564,20 @@ async def _with_stream(
564564
"""
565565
output: Channel[Any] = Channel(maxsize=maxsize)
566566
error_channel: Channel[Exception | None] = Channel(maxsize=1)
567-
self._streams[session_id] = (error_channel, output)
567+
self._streams[stream_id] = (error_channel, output)
568568
try:
569569
yield (error_channel, output)
570570
finally:
571-
del self._streams[session_id]
571+
stream_meta = self._streams.get(stream_id)
572+
if not stream_meta:
573+
logger.warning("_with_stream had an entry deleted out from under it", extra={
574+
"session_id": self.session_id,
575+
"stream_id": stream_id,
576+
})
577+
return
578+
# We need to signal back to all emitters or waiters that we're gone
579+
stream_meta[0].close()
580+
del self._streams[stream_id]
572581

573582
async def send_rpc[R, A](
574583
self,

0 commit comments

Comments
 (0)