Skip to content

Commit 31084f4

Browse files
Propagating stream_id into logs
1 parent b5971b6 commit 31084f4

File tree

3 files changed

+39
-5
lines changed

3 files changed

+39
-5
lines changed

src/replit_river/error_schema.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,10 @@ class SessionClosedRiverServiceException(RiverException):
8686
def __init__(
8787
self,
8888
message: str,
89+
streamId: str,
8990
) -> None:
9091
super().__init__(SYNTHETIC_ERROR_CODE_SESSION_CLOSED, message)
92+
self.streamId = streamId
9193

9294

9395
def exception_from_message(code: str) -> type[RiverServiceException]:

src/replit_river/task_manager.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
import logging
33
from typing import Coroutine, Set
44

5-
from replit_river.error_schema import ERROR_CODE_STREAM_CLOSED, RiverException
5+
from replit_river.error_schema import (
6+
ERROR_CODE_STREAM_CLOSED,
7+
RiverException,
8+
SessionClosedRiverServiceException,
9+
)
610

711
logger = logging.getLogger(__name__)
812

@@ -37,6 +41,13 @@ async def cancel_task(
3741
# If we cancel the task manager we will get called here as well,
3842
# if we want to handle the cancellation differently we can do it here.
3943
logger.debug("Task was cancelled %r", task_to_remove)
44+
except SessionClosedRiverServiceException as e:
45+
logger.warning(
46+
"Session was closed",
47+
extra={
48+
"stream_id": e.streamId,
49+
},
50+
)
4051
except RiverException as e:
4152
if e.code == ERROR_CODE_STREAM_CLOSED:
4253
# Task is cancelled
@@ -76,6 +87,14 @@ def _task_done_callback(
7687
):
7788
# Task is cancelled
7889
pass
90+
elif isinstance(exception, SessionClosedRiverServiceException):
91+
# Session is closed, don't bother logging
92+
logger.info(
93+
"Session closed",
94+
extra={
95+
"stream_id": exception.streamId,
96+
},
97+
)
7998
else:
8099
logger.error(
81100
"Exception on cancelling task",

src/replit_river/v2/session.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,7 @@ async def _enqueue_message(
353353
# session is closing / closed, raise
354354
raise SessionClosedRiverServiceException(
355355
"river session is closed, dropping message",
356+
stream_id,
356357
)
357358

358359
# Begin critical section: Avoid any await between here and _send_buffer.append
@@ -448,14 +449,15 @@ async def do_close() -> None:
448449

449450
await self._task_manager.cancel_all_tasks()
450451

451-
for stream_meta in self._streams.values():
452+
for stream_id, stream_meta in self._streams.items():
452453
stream_meta["output"].close()
453454
# Wake up backpressured writers
454455
try:
455456
stream_meta["error_channel"].put_nowait(
456457
reason
457458
or SessionClosedRiverServiceException(
458459
"river session is closed",
460+
stream_id,
459461
)
460462
)
461463
except ChannelFull:
@@ -1023,12 +1025,14 @@ async def _encode_stream() -> None:
10231025
# ... block the outer function until the emitter is finished emitting,
10241026
# possibly raising a terminal exception.
10251027
await emitter_task
1026-
except asyncio.CancelledError:
1028+
except asyncio.CancelledError as e:
10271029
await self._send_cancel_stream(
10281030
stream_id=stream_id,
10291031
message="Stream cancelled",
10301032
span=span,
10311033
)
1034+
if emitter_task.done() and (err := emitter_task.exception()):
1035+
raise e from err
10321036
raise
10331037
except Exception as e:
10341038
await self._send_cancel_stream(
@@ -1316,6 +1320,7 @@ async def _recv_from_ws(
13161320
# the outer loop.
13171321
await transition_no_connection()
13181322
break
1323+
msg: TransportMessage | str | None = None
13191324
try:
13201325
msg = parse_transport_msg(message)
13211326
logger.debug(
@@ -1395,19 +1400,27 @@ async def _recv_from_ws(
13951400
stream_meta["output"].close()
13961401
except OutOfOrderMessageException:
13971402
logger.exception("Out of order message, closing connection")
1403+
stream_id = "unknown"
1404+
if isinstance(msg, TransportMessage):
1405+
stream_id = msg.streamId
13981406
close_session(
13991407
SessionClosedRiverServiceException(
1400-
"Out of order message, closing connection"
1408+
"Out of order message, closing connection",
1409+
stream_id,
14011410
)
14021411
)
14031412
continue
14041413
except InvalidMessageException:
14051414
logger.exception(
14061415
"Got invalid transport message, closing session",
14071416
)
1417+
stream_id = "unknown"
1418+
if isinstance(msg, TransportMessage):
1419+
stream_id = msg.streamId
14081420
close_session(
14091421
SessionClosedRiverServiceException(
1410-
"Out of order message, closing connection"
1422+
"Out of order message, closing connection",
1423+
stream_id,
14111424
)
14121425
)
14131426
continue

0 commit comments

Comments
 (0)