Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
11fcf17
Remove noqa again, it was not necessary
blast-hardcheese Mar 18, 2025
6aa87be
Bubble state out of heartbeat
blast-hardcheese Mar 18, 2025
b0f989b
Break out heartbeat lifecycle
blast-hardcheese Mar 18, 2025
cfc25ac
Break out "ServerSession" type
blast-hardcheese Mar 18, 2025
bd36123
Flattening Transport into ClientTransport and ServerTransport
blast-hardcheese Mar 18, 2025
1c9e76d
Disambiguate between builtin type
blast-hardcheese Mar 19, 2025
034005f
Split serve() functionality between client and server
blast-hardcheese Mar 19, 2025
6e1b781
Remove handlers from Client*
blast-hardcheese Mar 19, 2025
42ead41
Strip is_server from Session __init__
blast-hardcheese Mar 19, 2025
c8ace77
Adding __init__ to ClientSession
blast-hardcheese Mar 19, 2025
7f0c323
Remove is_server
blast-hardcheese Mar 19, 2025
2db0827
Moving more fields from session to the specialized classes
blast-hardcheese Mar 19, 2025
a68b8ff
Resolving circular import
blast-hardcheese Mar 19, 2025
403a447
Bubble state out of check_to_close_session
blast-hardcheese Mar 19, 2025
b3952d2
Moving add_msg_to_stream out
blast-hardcheese Mar 19, 2025
8d9161e
Moving send_responses_from_output_stream to server_session
blast-hardcheese Mar 19, 2025
36aee31
Turns out _send_buffered_messages was only used in one place
blast-hardcheese Mar 19, 2025
628a8ae
Unused
blast-hardcheese Mar 19, 2025
a426353
Inline
blast-hardcheese Mar 19, 2025
f9a33f3
Inline update_bookkeeping
blast-hardcheese Mar 19, 2025
46afc33
Unnest ExpectedSessionState constructor
blast-hardcheese Mar 19, 2025
accd729
Remove sleep
blast-hardcheese Mar 19, 2025
3e5ad7f
Adding test for deadlock
blast-hardcheese Mar 19, 2025
d6e57a6
Apply Jacky's patch
blast-hardcheese Mar 19, 2025
b536b64
Inlining no-longer-invariant _sessions access
blast-hardcheese Mar 19, 2025
8833f67
deadlock patch
jackyzha0 Mar 20, 2025
a0011b2
Merge branch 'main' into jacky-deadlock-patch
jackyzha0 Mar 20, 2025
1fad3f1
resolve merge
jackyzha0 Mar 20, 2025
359e4d3
let get_or_create replace ws
jackyzha0 Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/replit_river/client_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ async def _establish_handshake(
# If the session status is mismatched, we should close the old session
# and let the retry logic to create a new session.
await old_session.close()
await self._delete_session(old_session)

raise RiverException(
ERROR_HANDSHAKE,
Expand Down
23 changes: 10 additions & 13 deletions src/replit_river/server_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ async def _get_or_create_session(
session_id: str,
websocket: WebSocketCommonProtocol,
) -> ServerSession:
new_session: ServerSession | None = None
old_session: ServerSession | None = None
async with self._session_lock:
session_to_close: Session | None = None
new_session: ServerSession | None = None
if to_id not in self._sessions:
old_session = self._sessions.get(to_id)
if not old_session:
logger.info(
'Creating new session with "%s" using ws: %s', to_id, websocket.id
)
Expand All @@ -134,7 +135,6 @@ async def _get_or_create_session(
close_session_callback=self._delete_session,
)
else:
old_session = self._sessions[to_id]
if old_session.session_id != session_id:
logger.info(
'Create new session with "%s" for session id %s'
Expand All @@ -143,7 +143,6 @@ async def _get_or_create_session(
session_id,
old_session.session_id,
)
session_to_close = old_session
new_session = ServerSession(
transport_id,
to_id,
Expand All @@ -167,10 +166,12 @@ async def _get_or_create_session(
except FailedSendingMessageException as e:
raise e

if session_to_close:
logger.info("Closing stale session %s", session_to_close.session_id)
await session_to_close.close()
self._sessions[new_session._to_id] = new_session

if old_session and new_session != old_session:
logger.info("Closing stale session %s", old_session.session_id)
await old_session.close()

return new_session

async def _send_handshake_response(
Expand Down Expand Up @@ -247,7 +248,7 @@ async def _establish_handshake(
raise InvalidMessageException("handshake request to wrong server")

async with self._session_lock:
old_session = self._sessions.get(request_message.from_, None)
old_session = self._sessions.get(request_message.from_)
client_next_expected_seq = (
handshake_request.expectedSessionState.nextExpectedSeq
)
Expand Down Expand Up @@ -285,10 +286,6 @@ async def _establish_handshake(
)
raise SessionStateMismatchException(message)
elif old_session:
# we have an old session but the session id is different
# just delete the old session
await old_session.close()
await self._delete_session(old_session)
old_session = None

if not old_session and (
Expand Down
2 changes: 0 additions & 2 deletions tests/river_fixtures/clientserver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import logging
from typing import AsyncGenerator, Literal

Expand Down Expand Up @@ -64,7 +63,6 @@ async def websocket_uri_factory() -> UriAndMetadata[None]:
await client.close()

finally:
await asyncio.sleep(1)
logging.debug("Start closing test server")
if binding:
binding.close()
Expand Down
29 changes: 29 additions & 0 deletions tests/test_communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,32 @@ async def test_ignore_flood_subscription(client: Client) -> None:
timedelta(seconds=20),
)
assert response == "Hello, Alice!"


@pytest.mark.asyncio
@pytest.mark.parametrize("handlers", [{**basic_rpc_method}])
async def test_rpc_method_reconnect(client: Client) -> None:
response = await client.send_rpc(
"test_service",
"rpc_method",
"Alice",
serialize_request,
deserialize_response,
deserialize_error,
timedelta(seconds=20),
)
assert response == "Hello, Alice!"

await client._transport._close_all_sessions()

response = await client.send_rpc(
"test_service",
"rpc_method",
"Bob",
serialize_request,
deserialize_response,
deserialize_error,
timedelta(seconds=20),
)

assert response == "Hello, Bob!"
Loading