@@ -751,6 +751,13 @@ async def send_rpc[R, A](
751751 # Block for backpressure and emission errors from the ws
752752 await backpressured_waiter ()
753753 result = await anext (output )
754+ except asyncio .CancelledError :
755+ await self ._send_cancel_stream (
756+ stream_id = stream_id ,
757+ message = "RPC cancelled" ,
758+ span = span ,
759+ )
760+ raise
754761 except asyncio .TimeoutError as e :
755762 await self ._send_cancel_stream (
756763 stream_id = stream_id ,
@@ -835,6 +842,13 @@ async def send_upload[I, R, A](
835842 payload = payload ,
836843 span = span ,
837844 )
845+ except asyncio .CancelledError :
846+ await self ._send_cancel_stream (
847+ stream_id = stream_id ,
848+ message = "Upload cancelled" ,
849+ span = span ,
850+ )
851+ raise
838852 except Exception as e :
839853 # If we get any exception other than WebsocketClosedException,
840854 # cancel the stream.
@@ -916,6 +930,13 @@ async def send_subscription[I, E, A](
916930 continue
917931 yield response_deserializer (item ["payload" ])
918932 await self ._send_close_stream (stream_id , span )
933+ except asyncio .CancelledError :
934+ await self ._send_cancel_stream (
935+ stream_id = stream_id ,
936+ message = "Subscription cancelled" ,
937+ span = span ,
938+ )
939+ raise
919940 except Exception as e :
920941 await self ._send_cancel_stream (
921942 stream_id = stream_id ,
@@ -1002,6 +1023,13 @@ async def _encode_stream() -> None:
10021023 # ... block the outer function until the emitter is finished emitting,
10031024 # possibly raising a terminal exception.
10041025 await emitter_task
1026+ except asyncio .CancelledError :
1027+ await self ._send_cancel_stream (
1028+ stream_id = stream_id ,
1029+ message = "Stream cancelled" ,
1030+ span = span ,
1031+ )
1032+ raise
10051033 except Exception as e :
10061034 await self ._send_cancel_stream (
10071035 stream_id = stream_id ,
0 commit comments