Skip to content

Commit df0e0c4

Browse files
Inline add_msg_to_stream
1 parent a5dd3e0 commit df0e0c4

File tree

4 files changed

+23
-23
lines changed

4 files changed

+23
-23
lines changed

src/replit_river/client_session.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from opentelemetry.trace import Span
1212
from websockets.exceptions import ConnectionClosed
1313

14-
from replit_river.common_session import add_msg_to_stream
1514
from replit_river.error_schema import (
1615
ERROR_CODE_CANCEL,
1716
ERROR_CODE_STREAM_CLOSED,
@@ -154,7 +153,14 @@ async def _handle_messages_from_ws(self) -> None:
154153
# close message is not sent to the stream
155154
pass
156155
else:
157-
await add_msg_to_stream(msg, stream)
156+
try:
157+
await stream.put(msg.payload)
158+
except ChannelClosed:
159+
# The client is no longer interested in this stream,
160+
# just drop the message.
161+
pass
162+
except RuntimeError as e:
163+
raise InvalidMessageException(e) from e
158164
else:
159165
raise InvalidMessageException(
160166
"Client should not receive stream open bit"

src/replit_river/codegen/typing.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,20 @@ def extract_inner_type(value: TypeExpression) -> TypeName:
213213
raise ValueError(f"Unexpected literal type: {repr(value)}")
214214
case UnionTypeExpr(_):
215215
raise ValueError(
216-
f"Attempting to extract from a union, currently not possible: {repr(value)}"
216+
"Attempting to extract from a union, "
217+
f"currently not possible: {repr(value)}"
217218
)
218219
case OpenUnionTypeExpr(_):
219220
raise ValueError(
220-
f"Attempting to extract from a union, currently not possible: {repr(value)}"
221+
"Attempting to extract from a union, "
222+
f"currently not possible: {repr(value)}"
221223
)
222224
case TypeName(name):
223225
return TypeName(name)
224226
case NoneTypeExpr():
225-
raise ValueError(f"Attempting to extract from a literal 'None': {repr(value)}")
227+
raise ValueError(
228+
f"Attempting to extract from a literal 'None': {repr(value)}",
229+
)
226230
case other:
227231
assert_never(other)
228232

src/replit_river/common_session.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44
from typing import Any, Awaitable, Callable, Coroutine, Protocol
55

6-
from aiochannel import Channel, ChannelClosed
76
from opentelemetry.trace import Span
87
from websockets import WebSocketCommonProtocol
98
from websockets.asyncio.client import ClientConnection
@@ -14,7 +13,6 @@
1413
send_transport_message,
1514
)
1615
from replit_river.rpc import ACK_BIT, TransportMessage
17-
from replit_river.seq_manager import InvalidMessageException
1816

1917
logger = logging.getLogger(__name__)
2018

@@ -181,17 +179,3 @@ async def buffered_message_sender(
181179
logger.exception("Error attempting to send buffered messages")
182180
message_enqueued.release()
183181
break
184-
185-
186-
async def add_msg_to_stream(
187-
msg: TransportMessage,
188-
stream: Channel[Any],
189-
) -> None:
190-
try:
191-
await stream.put(msg.payload)
192-
except ChannelClosed:
193-
# The client is no longer interested in this stream,
194-
# just drop the message.
195-
pass
196-
except RuntimeError as e:
197-
raise InvalidMessageException(e) from e

src/replit_river/server_session.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
88
from websockets.exceptions import ConnectionClosed
99

10-
from replit_river.common_session import add_msg_to_stream
1110
from replit_river.messages import (
1211
FailedSendingMessageException,
1312
parse_transport_msg,
@@ -151,7 +150,14 @@ async def _handle_messages_from_ws(self, tg: asyncio.TaskGroup) -> None:
151150
# close message is not sent to the stream
152151
pass
153152
else:
154-
await add_msg_to_stream(msg, stream)
153+
try:
154+
await stream.put(msg.payload)
155+
except ChannelClosed:
156+
# The client is no longer interested in this stream,
157+
# just drop the message.
158+
pass
159+
except RuntimeError as e:
160+
raise InvalidMessageException(e) from e
155161
else:
156162
_stream = await self._open_stream_and_call_handler(msg, tg)
157163
if not stream:

0 commit comments

Comments
 (0)