Skip to content

Commit 8833f67

Browse files
committed
deadlock patch
1 parent b536b64 commit 8833f67

File tree

2 files changed

+38
-37
lines changed

2 files changed

+38
-37
lines changed

src/replit_river/server_transport.py

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -122,28 +122,12 @@ async def _get_or_create_session(
122122
websocket: WebSocketCommonProtocol,
123123
) -> ServerSession:
124124
new_session: ServerSession | None = None
125-
old_session: ServerSession | None = await self._get_existing_session(to_id)
126-
if not old_session:
127-
logger.info(
128-
'Creating new session with "%s" using ws: %s', to_id, websocket.id
129-
)
130-
new_session = ServerSession(
131-
transport_id,
132-
to_id,
133-
session_id,
134-
websocket,
135-
self._transport_options,
136-
self._handlers,
137-
close_session_callback=self._delete_session,
138-
)
139-
else:
140-
if old_session.session_id != session_id:
125+
old_session: ServerSession | None = None
126+
async with self._session_lock:
127+
old_session = self._sessions.get(to_id)
128+
if not old_session:
141129
logger.info(
142-
'Create new session with "%s" for session id %s'
143-
" and close old session %s",
144-
to_id,
145-
session_id,
146-
old_session.session_id,
130+
'Creating new session with "%s" using ws: %s', to_id, websocket.id
147131
)
148132
new_session = ServerSession(
149133
transport_id,
@@ -155,26 +139,43 @@ async def _get_or_create_session(
155139
close_session_callback=self._delete_session,
156140
)
157141
else:
158-
# If the instance id is the same, we reuse the session and assign
159-
# a new websocket to it.
160-
logger.debug(
161-
'Reuse old session with "%s" using new ws: %s',
162-
to_id,
163-
websocket.id,
164-
)
165-
try:
166-
await old_session.replace_with_new_websocket(websocket)
167-
new_session = old_session
168-
except FailedSendingMessageException as e:
169-
raise e
142+
if old_session.session_id != session_id:
143+
logger.info(
144+
'Create new session with "%s" for session id %s'
145+
" and close old session %s",
146+
to_id,
147+
session_id,
148+
old_session.session_id,
149+
)
150+
new_session = ServerSession(
151+
transport_id,
152+
to_id,
153+
session_id,
154+
websocket,
155+
self._transport_options,
156+
self._handlers,
157+
close_session_callback=self._delete_session,
158+
)
159+
else:
160+
# If the instance id is the same, we reuse the session and assign
161+
# a new websocket to it.
162+
logger.debug(
163+
'Reuse old session with "%s" using new ws: %s',
164+
to_id,
165+
websocket.id,
166+
)
167+
try:
168+
await old_session.replace_with_new_websocket(websocket)
169+
new_session = old_session
170+
except FailedSendingMessageException as e:
171+
raise e
172+
173+
self._sessions[new_session._to_id] = new_session
170174

171175
if old_session and new_session != old_session:
172176
logger.info("Closing stale session %s", old_session.session_id)
173177
await old_session.close()
174178

175-
async with self._session_lock:
176-
self._sessions[new_session._to_id] = new_session
177-
178179
return new_session
179180

180181
async def _send_handshake_response(

tests/test_communication.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ async def test_rpc_method_reconnect(client: Client) -> None:
284284
)
285285
assert response == "Hello, Alice!"
286286

287-
await client._transport._close_all_sessions(client._transport._get_all_sessions)
287+
await client._transport._close_all_sessions()
288288

289289
response = await client.send_rpc(
290290
"test_service",

0 commit comments

Comments
 (0)