Skip to content

Commit ae69cdc

Browse files
Clarifying termination semantics
1 parent 72a005d commit ae69cdc

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

src/replit_river/v2/session.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,8 @@ def _start_buffered_message_sender(
494494
Building on buffered_message_sender's documentation, we implement backpressure
495495
per-stream by way of self._streams'
496496
497-
error_channel: Channel[Exception | None]
497+
error_channel: Channel[Exception]
498+
backpressured_waiter: Callable[[], Awaitable[None]]
498499
499500
This is accomplished via the following strategy:
500501
- If buffered_message_sender encounters an error, we transition back to
@@ -506,8 +507,11 @@ def _start_buffered_message_sender(
506507
- Alternately, if buffered_message_sender successfully writes back to the
507508
508509
- Finally, if _recv_from_ws encounters an error (transport or deserialization),
509-
we emit an informative error to close_session which gets emitted to all
510-
backpressured client methods.
510+
it transitions to NO_CONNECTION and defers to the client_transport to
511+
reestablish a connection.
512+
513+
The in-flight messages are still valid, as if we can reconnect to the server
514+
in time, those responses can be marshalled to their respective callbacks.
511515
"""
512516

513517
async def commit(msg: TransportMessage) -> None:
@@ -789,7 +793,7 @@ async def send_upload[I, R, A](
789793
# If this request is not closed and the session is killed, we should
790794
# throw exception here
791795
async for item in request:
792-
# Block for backpressure and emission errors from the ws
796+
# Block for backpressure
793797
await backpressured_waiter()
794798
try:
795799
payload = request_serializer(item)
@@ -950,9 +954,9 @@ async def _encode_stream() -> None:
950954
assert request_serializer, "send_stream missing request_serializer"
951955

952956
async for item in request:
953-
# Block for backpressure (or errors)
957+
# Block for backpressure
954958
await backpressured_waiter()
955-
# If there are any errors so far, raise them
959+
956960
await self._enqueue_message(
957961
stream_id=stream_id,
958962
control_flags=0,

0 commit comments

Comments
 (0)