-
Notifications
You must be signed in to change notification settings - Fork 424
Fix sliding sync performance slow down for long lived connections. #19206
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 25 commits
fc6000c
087f6eb
49fa7eb
8cba313
6303bb1
5c48983
4984858
7a0a8a2
8a3ec20
fc01740
ae3f569
027b422
abee4db
99855ba
0b1ecf1
2090d14
113f6ce
ec45e00
f8f6dc9
5604d3a
815b852
2e844aa
deaf995
cdeebc8
65aebf4
4d4c1b8
e6939e7
69fc61d
56ead16
2d2047d
da08203
2546ca6
ba59391
0a68e12
45d1bfa
4070326
e2b4fe8
b75b3cb
6caacd1
b1bc509
7ff3d2f
008cb58
d3f3f98
0ffb32a
17bf341
85c6754
1adcdaa
aa2c426
1d7b649
6d8950e
bea19c4
91770fc
31c913e
855b448
6fc746c
b63c8ad
c1887b8
bfe05de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Fix sliding sync performance slow down for long lived connections. |
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
|
|
||
|
|
||
| import logging | ||
| from typing import TYPE_CHECKING, Mapping, cast | ||
| from typing import TYPE_CHECKING, AbstractSet, Mapping, cast | ||
|
|
||
| import attr | ||
|
|
||
|
|
@@ -26,13 +26,16 @@ | |
| DatabasePool, | ||
| LoggingDatabaseConnection, | ||
| LoggingTransaction, | ||
| make_in_list_sql_clause, | ||
| ) | ||
| from synapse.storage.engines import PostgresEngine | ||
| from synapse.types import MultiWriterStreamToken, RoomStreamToken | ||
| from synapse.types.handlers.sliding_sync import ( | ||
| HaveSentRoom, | ||
| HaveSentRoomFlag, | ||
| MutablePerConnectionState, | ||
| PerConnectionState, | ||
| RoomLazyMembershipChanges, | ||
| RoomStatusMap, | ||
| RoomSyncConfig, | ||
| ) | ||
|
|
@@ -52,6 +55,10 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| # How often to update the last seen timestamp for lazy members. We don't want to | ||
| # update it too often as that causes DB writes. | ||
| LAZY_MEMBERS_UPDATE_INTERVAL_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # How often to update the `last_used_ts` column on | ||
| # `sliding_sync_connection_positions` when the client uses a connection | ||
| # position. We don't want to update it on every use to avoid excessive | ||
|
|
@@ -378,6 +385,13 @@ def persist_per_connection_state_txn( | |
| value_values=values, | ||
| ) | ||
|
|
||
| self._persist_sliding_sync_connection_lazy_members_txn( | ||
| txn, | ||
| connection_key, | ||
| connection_position, | ||
| per_connection_state.room_lazy_membership, | ||
| ) | ||
|
|
||
| return connection_position | ||
|
|
||
| @cached(iterable=True, max_entries=100000) | ||
|
|
@@ -448,6 +462,19 @@ def _get_and_clear_connection_positions_txn( | |
| """ | ||
| txn.execute(sql, (connection_key, connection_position)) | ||
|
|
||
| # Move any lazy membership entries for this connection position to have | ||
| # `NULL` connection position, indicating that it applies to all future | ||
| # positions on this connecetion. | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.db_pool.simple_update_txn( | ||
| txn, | ||
| table="sliding_sync_connection_lazy_members", | ||
| keyvalues={ | ||
| "connection_key": connection_key, | ||
| "connection_position": connection_position, | ||
| }, | ||
| updatevalues={"connection_position": None}, | ||
| ) | ||
|
|
||
| # Fetch and create a mapping from required state ID to the actual | ||
| # required state for the connection. | ||
| rows = self.db_pool.simple_select_list_txn( | ||
|
|
@@ -527,8 +554,146 @@ def _get_and_clear_connection_positions_txn( | |
| receipts=RoomStatusMap(receipts), | ||
| account_data=RoomStatusMap(account_data), | ||
| room_configs=room_configs, | ||
| room_lazy_membership={}, | ||
| ) | ||
|
|
||
| async def get_sliding_sync_connection_lazy_members( | ||
| self, | ||
| connection_position: int, | ||
| room_id: str, | ||
| user_ids: AbstractSet[str], | ||
| ) -> Mapping[str, int]: | ||
| """Get which user IDs in the room we have previously sent lazy | ||
| membership for. | ||
|
|
||
| Args: | ||
| connection_position: The sliding sync connection position. | ||
| room_id: The room ID to get lazy members for. | ||
| user_ids: The user IDs to check for lazy membership. | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| Returns: | ||
| The mapping of user IDs to the last seen timestamp for those user | ||
| IDs. | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
|
|
||
| def get_sliding_sync_connection_lazy_members_txn( | ||
| txn: LoggingTransaction, | ||
| ) -> Mapping[str, int]: | ||
| user_clause, user_args = make_in_list_sql_clause( | ||
| txn.database_engine, "user_id", user_ids | ||
| ) | ||
|
|
||
| sql = f""" | ||
| SELECT user_id, connection_position, last_seen_ts | ||
| FROM sliding_sync_connection_lazy_members AS pos | ||
| WHERE room_id = ? AND {user_clause} | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
|
|
||
| txn.execute(sql, (room_id, *user_args)) | ||
|
|
||
| # Filter out any cache entries that only apply to forked connection | ||
| # positions. Entries with `NULL` connection position apply to all | ||
| # positions on the connection. | ||
| return { | ||
| user_id: last_seen_ts | ||
| for user_id, db_connection_position, last_seen_ts in txn | ||
| if db_connection_position == connection_position | ||
| or db_connection_position is None | ||
|
Comment on lines
+612
to
+613
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not do this in the query itself?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think mainly to avoid having confusion over different positions in the query. The vast majority of the time the query won't return any extra rows (as that only happens when there has been a forked position, which is rare). If/when we just pass in the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ⏩ |
||
| } | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| return await self.db_pool.runInteraction( | ||
| "sliding_sync_connection_lazy_members", | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| get_sliding_sync_connection_lazy_members_txn, | ||
| db_autocommit=True, # Avoid transaction for single read | ||
| ) | ||
|
|
||
| def _persist_sliding_sync_connection_lazy_members_txn( | ||
| self, | ||
| txn: LoggingTransaction, | ||
| connection_key: int, | ||
| new_connection_position: int, | ||
| all_changes: dict[str, RoomLazyMembershipChanges], | ||
| ) -> None: | ||
| """Persist that we have sent lazy membership for the given user IDs.""" | ||
|
|
||
| now = self.clock.time_msec() | ||
|
|
||
| # Figure out which cache entries to add or update. | ||
| # | ||
| # These are either a) new entries we've never sent before (i.e. with a | ||
| # None last_seen_ts), or b) where the `last_seen_ts` is old enough that | ||
| # we want to update it. | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # | ||
| # We don't update the timestamp every time to avoid hammering the DB | ||
| # with writes, and we don't need the timestamp to be precise. It is used | ||
| # to evict old entries that haven't been used in a while. | ||
| to_update: list[tuple[str, str]] = [] | ||
| for room_id, room_changes in all_changes.items(): | ||
| for ( | ||
| user_id, | ||
| last_seen_ts, | ||
| ) in room_changes.returned_user_id_to_last_seen_ts_map.items(): | ||
| if last_seen_ts is None: | ||
| # We've never sent this user before, so we need to record that | ||
| # we've sent it at the new connection position. | ||
| to_update.append((room_id, user_id)) | ||
| elif last_seen_ts + LAZY_MEMBERS_UPDATE_INTERVAL_MS < now: | ||
| # We last saw this user over | ||
| # `LAZY_MEMBERS_UPDATE_INTERVAL_MS` ago, so we update the | ||
| # timestamp (c.f. comment above). | ||
| to_update.append((room_id, user_id)) | ||
|
|
||
| if to_update: | ||
| # Upsert the new/updated entries. | ||
| # | ||
| # Ignore conflicts where the existing entry has a different | ||
| # connection position (i.e. from a forked connection position). This | ||
| # may mean that we lose some updates, but that's acceptable as this | ||
| # is a cache and its fine for it to *not* include rows. (Downstream | ||
| # this will cause us to maybe send a few extra lazy members down | ||
| # sync, but we're allowed to send extra members). | ||
| sql = """ | ||
| INSERT INTO sliding_sync_connection_lazy_members | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| (connection_key, connection_position, room_id, user_id, last_seen_ts) | ||
| VALUES {value_placeholder} | ||
| ON CONFLICT (connection_key, room_id, user_id) | ||
| DO UPDATE SET last_seen_ts = EXCLUDED.last_seen_ts | ||
| WHERE sliding_sync_connection_lazy_members.connection_position IS NULL | ||
| OR sliding_sync_connection_lazy_members.connection_position = EXCLUDED.connection_position | ||
| """ | ||
|
|
||
| args = [ | ||
| (connection_key, new_connection_position, room_id, user_id, now) | ||
| for room_id, user_id in to_update | ||
| ] | ||
|
|
||
| if isinstance(self.database_engine, PostgresEngine): | ||
| sql = sql.format(value_placeholder="?") | ||
| txn.execute_values(sql, args, fetch=False) | ||
| else: | ||
| sql = sql.format(value_placeholder="(?, ?, ?, ?, ?)") | ||
| txn.execute_batch(sql, args) | ||
MadLittleMods marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Remove any invalidated entries. | ||
| to_remove: list[tuple[str, str]] = [] | ||
| for room_id, room_changes in all_changes.items(): | ||
| for user_id in room_changes.invalidated_user_ids: | ||
| to_remove.append((room_id, user_id)) | ||
|
|
||
| if to_remove: | ||
| # We don't try and match on connection position here: it's fine to | ||
| # remove it from all forks. This is a cache so it's fine to expire | ||
| # arbitrary entries, the worst that happens is we send a few extra | ||
| # lazy members down sync. | ||
| self.db_pool.simple_delete_many_batch_txn( | ||
| txn, | ||
| table="sliding_sync_connection_lazy_members", | ||
| keys=("connection_key", "room_id", "user_id"), | ||
| values=[ | ||
| (connection_key, room_id, user_id) for room_id, user_id in to_remove | ||
| ], | ||
| ) | ||
|
|
||
| @wrap_as_background_process("delete_old_sliding_sync_connections") | ||
| async def delete_old_sliding_sync_connections(self) -> None: | ||
| """Delete sliding sync connections that have not been used for a long time.""" | ||
|
|
@@ -556,6 +721,8 @@ class PerConnectionStateDB: | |
| serialized to strings. | ||
|
|
||
| When persisting this *only* contains updates to the state. | ||
|
|
||
| The `room_lazy_membership` field is only used when persisting. | ||
|
||
| """ | ||
|
|
||
| last_used_ts: int | None | ||
|
|
@@ -566,6 +733,8 @@ class PerConnectionStateDB: | |
|
|
||
| room_configs: Mapping[str, "RoomSyncConfig"] | ||
|
|
||
| room_lazy_membership: dict[str, RoomLazyMembershipChanges] | ||
|
|
||
| @staticmethod | ||
| async def from_state( | ||
| per_connection_state: "MutablePerConnectionState", store: "DataStore" | ||
|
|
@@ -620,6 +789,7 @@ async def from_state( | |
| receipts=RoomStatusMap(receipts), | ||
| account_data=RoomStatusMap(account_data), | ||
| room_configs=per_connection_state.room_configs.maps[0], | ||
| room_lazy_membership=per_connection_state.room_lazy_membership, | ||
| ) | ||
|
|
||
| async def to_state(self, store: "DataStore") -> "PerConnectionState": | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| -- | ||
| -- This file is licensed under the Affero General Public License (AGPL) version 3. | ||
| -- | ||
| -- Copyright (C) 2025 Element Creations Ltd | ||
| -- | ||
| -- This program is free software: you can redistribute it and/or modify | ||
| -- it under the terms of the GNU Affero General Public License as | ||
| -- published by the Free Software Foundation, either version 3 of the | ||
| -- License, or (at your option) any later version. | ||
| -- | ||
| -- See the GNU Affero General Public License for more details: | ||
| -- <https://www.gnu.org/licenses/agpl-3.0.html>. | ||
|
|
||
|
|
||
| -- Tracks which member states have been sent to the client for lazy-loaded | ||
| -- members in sliding sync. This is a *cache* as it doesn't matter if we send | ||
| -- down members we've previously sent down, i.e. it's safe to delete any rows. | ||
| -- | ||
| -- We track a *rough* `last_seen_ts` for each user in each room which indicates | ||
| -- when we last would've sent their member state to the client. This is used so | ||
| -- that we can remove members which haven't been seen for a while to save space. | ||
| -- | ||
| -- Care must be taken when handling "forked" positions, i.e. we have responded | ||
| -- to a request with a position and then get another different request using the | ||
| -- previous position as a base. We track this by including a | ||
| -- `connection_position` for newly inserted rows. When we advance the position | ||
| -- we set this to NULL for all rows which were present at that position, and | ||
| -- delete all other rows. When reading rows we can then filter out any rows | ||
| -- which have a non-NULL `connection_position` which is not the current | ||
| -- position. | ||
| -- | ||
| -- I.e. `connection_position` is NULL for rows which are valid for *all* | ||
| -- positions on the connection, and is non-NULL for rows which are only valid | ||
| -- for a specific position. | ||
| -- | ||
| -- When invalidating rows, we can just delete them. Technically this could | ||
| -- invalidate for a forked position, but this is acceptable as equivalent to a | ||
| -- cache eviction. | ||
| CREATE TABLE sliding_sync_connection_lazy_members ( | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| connection_key BIGINT NOT NULL REFERENCES sliding_sync_connections(connection_key) ON DELETE CASCADE, | ||
| connection_position BIGINT REFERENCES sliding_sync_connection_positions(connection_position) ON DELETE CASCADE, | ||
| room_id TEXT NOT NULL, | ||
| user_id TEXT NOT NULL, | ||
| last_seen_ts BIGINT NOT NULL | ||
| ); | ||
|
|
||
| CREATE UNIQUE INDEX sliding_sync_connection_lazy_members_idx ON sliding_sync_connection_lazy_members (connection_key, room_id, user_id); | ||
| CREATE INDEX sliding_sync_connection_lazy_members_pos_idx ON sliding_sync_connection_lazy_members (connection_key, connection_position) WHERE connection_position IS NOT NULL; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -891,6 +891,43 @@ def __len__(self) -> int: | |
| return len(self.rooms) + len(self.receipts) + len(self.room_configs) | ||
|
|
||
|
|
||
| @attr.s(auto_attribs=True) | ||
| class RoomLazyMembershipChanges: | ||
| """Changes to lazily-loaded room memberships for a given room. | ||
|
|
||
| Attributes: | ||
| returned: Map from user ID to timestamp for users whose membership we | ||
| have lazily loaded. The timestamp indicates the time we previously | ||
| saw the membership if we have sent it down previously, or None if | ||
| we sent it down for the first time. | ||
|
|
||
| Note: this will include users whose membership we would have sent | ||
| down but didn't due to us having previously sent them. | ||
| invalidated: Set of user IDs whose latest membership we have *not* sent | ||
| down | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
|
|
||
| # A map from user ID -> timestamp. Indicates that those memberships have | ||
| # been lazily loaded. I.e. that either a) we sent those memberships down, or | ||
| # b) we did so previously. The timestamp indicates the time we previously | ||
| # saw the membership. | ||
erikjohnston marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # | ||
| # We track a *rough* `last_seen_ts` for each user in each room which | ||
| # indicates when we last would've sent their member state to the client. | ||
| # This is used so that we can remove members which haven't been seen for a | ||
| # while to save space. | ||
| returned_user_id_to_last_seen_ts_map: Mapping[str, int | None] = attr.Factory(dict) | ||
|
|
||
| # A set of user IDs whose membership change we have *not* sent | ||
| # down | ||
| invalidated_user_ids: AbstractSet[str] = attr.Factory(set) | ||
|
|
||
| def __bool__(self) -> bool: | ||
| return bool( | ||
| self.returned_user_id_to_last_seen_ts_map or self.invalidated_user_ids | ||
| ) | ||
|
|
||
|
|
||
| @attr.s(auto_attribs=True) | ||
| class MutablePerConnectionState(PerConnectionState): | ||
| """A mutable version of `PerConnectionState`""" | ||
|
|
@@ -903,12 +940,19 @@ class MutablePerConnectionState(PerConnectionState): | |
|
|
||
| room_configs: typing.ChainMap[str, RoomSyncConfig] | ||
|
|
||
| # A map from room ID -> user ID -> timestamp. Indicates that those | ||
| # memberships have been lazily loaded. I.e. that either a) we sent those | ||
| # memberships down, or b) we did so previously. The timestamp indicates the | ||
|
||
| # time we previously saw the membership. | ||
| room_lazy_membership: dict[str, RoomLazyMembershipChanges] = attr.Factory(dict) | ||
|
|
||
| def has_updates(self) -> bool: | ||
| return ( | ||
| bool(self.rooms.get_updates()) | ||
| or bool(self.receipts.get_updates()) | ||
| or bool(self.account_data.get_updates()) | ||
| or bool(self.get_room_config_updates()) | ||
| or bool(self.room_lazy_membership) | ||
| ) | ||
|
|
||
| def get_room_config_updates(self) -> Mapping[str, RoomSyncConfig]: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.