Skip to content

Commit 102cf66

Browse files
Send CANCEL correctly
1 parent e759093 commit 102cf66

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

src/replit_river/v2/session.py

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ async def send_rpc[R, A](
642642
except asyncio.TimeoutError as e:
643643
await self._send_cancel_stream(
644644
stream_id=stream_id,
645+
message="Timeout, abandoning request",
645646
span=span,
646647
)
647648
raise RiverException(ERROR_CODE_CANCEL, str(e)) from e
@@ -705,6 +706,7 @@ async def send_upload[I, R, A](
705706
except Exception as e:
706707
await self._send_cancel_stream(
707708
stream_id=stream_id,
709+
message="Request serialization error",
708710
span=span,
709711
)
710712
raise RiverServiceException(
@@ -726,6 +728,7 @@ async def send_upload[I, R, A](
726728
# cancel the stream.
727729
await self._send_cancel_stream(
728730
stream_id=stream_id,
731+
message="Unspecified error",
729732
span=span,
730733
)
731734
raise RiverServiceException(
@@ -746,7 +749,11 @@ async def send_upload[I, R, A](
746749
procedure_name,
747750
) from e
748751
except Exception as e:
749-
await self._send_cancel_stream(stream_id, span)
752+
await self._send_cancel_stream(
753+
stream_id=stream_id,
754+
message="Unspecified error",
755+
span=span,
756+
)
750757
raise RiverException(ERROR_CODE_STREAM_CLOSED, str(e)) from e
751758

752759
if "ok" not in result or not result["ok"]:
@@ -793,7 +800,11 @@ async def send_subscription[I, E, A](
793800
yield error_deserializer(item["payload"])
794801
yield response_deserializer(item["payload"])
795802
except Exception as e:
796-
await self._send_cancel_stream(stream_id, span)
803+
await self._send_cancel_stream(
804+
stream_id=stream_id,
805+
message="Unspecified error",
806+
span=span,
807+
)
797808
raise RiverServiceException(
798809
ERROR_CODE_STREAM_CLOSED,
799810
"Stream closed before response",
@@ -877,7 +888,11 @@ async def _encode_stream() -> None:
877888
# ... block the outer function until the emitter is finished emitting.
878889
await emitter_task
879890
except Exception as e:
880-
await self._send_cancel_stream(stream_id, span)
891+
await self._send_cancel_stream(
892+
stream_id=stream_id,
893+
message="Unspecified error",
894+
span=span,
895+
)
881896
raise RiverServiceException(
882897
ERROR_CODE_STREAM_CLOSED,
883898
"Stream closed before response",
@@ -888,12 +903,19 @@ async def _encode_stream() -> None:
888903
async def _send_cancel_stream(
889904
self,
890905
stream_id: str,
906+
message: str,
891907
span: Span,
892908
) -> None:
893909
await self._enqueue_message(
894910
stream_id=stream_id,
895911
control_flags=STREAM_CANCEL_BIT,
896-
payload={"type": "CANCEL"},
912+
payload={
913+
"ok": False,
914+
"payload": {
915+
"code": "CANCEL",
916+
"message": message,
917+
},
918+
},
897919
span=span,
898920
)
899921

0 commit comments

Comments
 (0)