Skip to content

Commit ddc3e5b

Browse files
Adding explicit CancelledError handlers during async waiting loops
1 parent ca72568 commit ddc3e5b

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

src/replit_river/v2/session.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,13 @@ async def send_rpc[R, A](
751751
# Block for backpressure and emission errors from the ws
752752
await backpressured_waiter()
753753
result = await anext(output)
754+
except asyncio.CancelledError:
755+
await self._send_cancel_stream(
756+
stream_id=stream_id,
757+
message="RPC cancelled",
758+
span=span,
759+
)
760+
raise
754761
except asyncio.TimeoutError as e:
755762
await self._send_cancel_stream(
756763
stream_id=stream_id,
@@ -835,6 +842,13 @@ async def send_upload[I, R, A](
835842
payload=payload,
836843
span=span,
837844
)
845+
except asyncio.CancelledError:
846+
await self._send_cancel_stream(
847+
stream_id=stream_id,
848+
message="Upload cancelled",
849+
span=span,
850+
)
851+
raise
838852
except Exception as e:
839853
# If we get any exception other than WebsocketClosedException,
840854
# cancel the stream.
@@ -916,6 +930,13 @@ async def send_subscription[I, E, A](
916930
continue
917931
yield response_deserializer(item["payload"])
918932
await self._send_close_stream(stream_id, span)
933+
except asyncio.CancelledError:
934+
await self._send_cancel_stream(
935+
stream_id=stream_id,
936+
message="Subscription cancelled",
937+
span=span,
938+
)
939+
raise
919940
except Exception as e:
920941
await self._send_cancel_stream(
921942
stream_id=stream_id,
@@ -1002,6 +1023,13 @@ async def _encode_stream() -> None:
10021023
# ... block the outer function until the emitter is finished emitting,
10031024
# possibly raising a terminal exception.
10041025
await emitter_task
1026+
except asyncio.CancelledError:
1027+
await self._send_cancel_stream(
1028+
stream_id=stream_id,
1029+
message="Stream cancelled",
1030+
span=span,
1031+
)
1032+
raise
10051033
except Exception as e:
10061034
await self._send_cancel_stream(
10071035
stream_id=stream_id,
@@ -1084,7 +1112,10 @@ async def _do_ensure_connected[HandshakeMetadata](
10841112
ws: ClientConnection | None = None
10851113
try:
10861114
uri_and_metadata = await uri_and_metadata_factory()
1087-
ws = await websockets.asyncio.client.connect(uri_and_metadata["uri"], max_size=None)
1115+
ws = await websockets.asyncio.client.connect(
1116+
uri_and_metadata["uri"],
1117+
max_size=None,
1118+
)
10881119
transition_connecting(ws)
10891120

10901121
try:

0 commit comments

Comments
 (0)