Skip to content

Commit 0934ebd

Browse files
Clarify enqueue vs send semantics
1 parent 5c1d278 commit 0934ebd

File tree

1 file changed

+13
-13
lines changed

1 file changed

+13
-13
lines changed

src/replit_river/v2/session.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ def _reset_session_close_countdown(self) -> None:
339339
self._heartbeat_misses = 0
340340
self._close_session_after_time_secs = None
341341

342-
async def _send_message(
342+
async def _enqueue_message(
343343
self,
344344
stream_id: str,
345345
payload: dict[Any, Any] | str,
@@ -350,7 +350,7 @@ async def _send_message(
350350
) -> None:
351351
"""Send serialized messages to the websockets."""
352352
logger.debug(
353-
"_send_message(stream_id=%r, payload=%r, control_flags=%r, "
353+
"_enqueue_message(stream_id=%r, payload=%r, control_flags=%r, "
354354
"service_name=%r, procedure_name=%r)",
355355
stream_id,
356356
payload,
@@ -568,7 +568,7 @@ async def block_until_connected() -> None:
568568
close_session=self.close,
569569
assert_incoming_seq_bookkeeping=assert_incoming_seq_bookkeeping,
570570
get_stream=lambda stream_id: self._streams.get(stream_id),
571-
send_message=self._send_message,
571+
enqueue_message=self._enqueue_message,
572572
)
573573
)
574574

@@ -612,7 +612,7 @@ async def send_rpc[R, A](
612612
Expects the input and output be messages that will be msgpacked.
613613
"""
614614
stream_id = nanoid.generate()
615-
await self._send_message(
615+
await self._enqueue_message(
616616
stream_id=stream_id,
617617
control_flags=STREAM_OPEN_BIT | STREAM_CLOSED_BIT,
618618
payload=request_serializer(request),
@@ -673,7 +673,7 @@ async def send_upload[I, R, A](
673673
Expects the input and output be messages that will be msgpacked.
674674
"""
675675
stream_id = nanoid.generate()
676-
await self._send_message(
676+
await self._enqueue_message(
677677
stream_id=stream_id,
678678
control_flags=STREAM_OPEN_BIT,
679679
service_name=service_name,
@@ -692,7 +692,7 @@ async def send_upload[I, R, A](
692692
if output.closed():
693693
logger.debug("Stream is closed, avoid sending the rest")
694694
break
695-
await self._send_message(
695+
await self._enqueue_message(
696696
stream_id=stream_id,
697697
service_name=service_name,
698698
procedure_name=procedure_name,
@@ -757,7 +757,7 @@ async def send_subscription[R, E, A](
757757
Expects the input and output be messages that will be msgpacked.
758758
"""
759759
stream_id = nanoid.generate()
760-
await self._send_message(
760+
await self._enqueue_message(
761761
service_name=service_name,
762762
procedure_name=procedure_name,
763763
stream_id=stream_id,
@@ -812,7 +812,7 @@ async def send_stream[I, R, E, A](
812812
"""
813813

814814
stream_id = nanoid.generate()
815-
await self._send_message(
815+
await self._enqueue_message(
816816
service_name=service_name,
817817
procedure_name=procedure_name,
818818
stream_id=stream_id,
@@ -843,7 +843,7 @@ async def _encode_stream() -> None:
843843
if output.closed():
844844
logger.debug("Stream is closed, avoid sending the rest")
845845
break
846-
await self._send_message(
846+
await self._enqueue_message(
847847
stream_id=stream_id,
848848
control_flags=0,
849849
payload=request_serializer(item),
@@ -888,7 +888,7 @@ async def _send_cancel_stream(
888888
extra_control_flags: int,
889889
span: Span,
890890
) -> None:
891-
await self._send_message(
891+
await self._enqueue_message(
892892
stream_id=stream_id,
893893
control_flags=STREAM_CANCEL_BIT | extra_control_flags,
894894
payload={"type": "CANCEL"},
@@ -900,7 +900,7 @@ async def _send_close_stream(
900900
stream_id: str,
901901
span: Span,
902902
) -> None:
903-
await self._send_message(
903+
await self._enqueue_message(
904904
stream_id=stream_id,
905905
control_flags=STREAM_CLOSED_BIT,
906906
payload={"type": "CLOSE"},
@@ -1080,7 +1080,7 @@ async def _recv_from_ws(
10801080
[str, int, int], Literal[True] | _IgnoreMessage
10811081
],
10821082
get_stream: Callable[[str], tuple[asyncio.Event, Channel[Any]] | None],
1083-
send_message: SendMessage[None],
1083+
enqueue_message: SendMessage[None],
10841084
) -> None:
10851085
"""Serve messages from the websocket.
10861086
@@ -1161,7 +1161,7 @@ async def _recv_from_ws(
11611161

11621162
# Shortcut to avoid processing ack packets
11631163
if msg.controlFlags & ACK_BIT != 0:
1164-
await send_message(
1164+
await enqueue_message(
11651165
stream_id="heartbeat",
11661166
# TODO: make this a message class
11671167
# https://github.com/replit/river/blob/741b1ea6d7600937ad53564e9cf8cd27a92ec36a/transport/message.ts#L42

0 commit comments

Comments
 (0)