Skip to content

Commit eea1804

Browse files
committed
Track lazy loaded members in SSS separately.
This ensures that the set of required state doesn't keep growing as we add and remove member state. We then only load the lazy-loaded members from the DB when they are needed, rather than loading all state for all rooms whenever we get a request.
1 parent 49fa7eb commit eea1804

File tree

6 files changed

+521
-47
lines changed

6 files changed

+521
-47
lines changed

synapse/handlers/sliding_sync/__init__.py

Lines changed: 186 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
HaveSentRoomFlag,
6464
MutablePerConnectionState,
6565
PerConnectionState,
66+
RoomLazyMembershipChanges,
6667
RoomSyncConfig,
6768
SlidingSyncConfig,
6869
SlidingSyncResult,
@@ -984,14 +985,15 @@ async def get_room_sync_data(
984985
#
985986
# Calculate the `StateFilter` based on the `required_state` for the room
986987
required_state_filter = StateFilter.none()
987-
# The requested `required_state_map` with the lazy membership expanded and
988-
# `$ME` replaced with the user's ID. This allows us to see what membership we've
989-
# sent down to the client in the next request.
990-
#
991-
# Make a copy so we can modify it. Still need to be careful to make a copy of
992-
# the state key sets if we want to add/remove from them. We could make a deep
993-
# copy but this saves us some work.
994-
expanded_required_state_map = dict(room_sync_config.required_state_map)
988+
989+
# Keep track of which users' state we may need to fetch. We split this
990+
# into explicit users and lazy loaded users.
991+
explicit_user_state = set()
992+
lazy_load_user_ids = set()
993+
994+
# Whether lazy-loading of room members is enabled.
995+
lazy_load_room_members = False
996+
995997
if room_membership_for_user_at_to_token.membership not in (
996998
Membership.INVITE,
997999
Membership.KNOCK,
@@ -1039,7 +1041,6 @@ async def get_room_sync_data(
10391041
else:
10401042
required_state_types: list[tuple[str, str | None]] = []
10411043
num_wild_state_keys = 0
1042-
lazy_load_room_members = False
10431044
num_others = 0
10441045
for (
10451046
state_type,
@@ -1071,6 +1072,10 @@ async def get_room_sync_data(
10711072
timeline_event.state_key
10721073
)
10731074

1075+
# The client needs to know the membership of everyone in
1076+
# the timeline we're returning.
1077+
lazy_load_user_ids.update(timeline_membership)
1078+
10741079
# Update the required state filter so we pick up the new
10751080
# membership
10761081
if limited or initial:
@@ -1095,36 +1100,30 @@ async def get_room_sync_data(
10951100
# to state resolution anyway).
10961101
required_state_types.append((EventTypes.Member, None))
10971102

1098-
# Add an explicit entry for each user in the timeline
1099-
#
1100-
# Make a new set or copy of the state key set so we can
1101-
# modify it without affecting the original
1102-
# `required_state_map`
1103-
expanded_required_state_map[EventTypes.Member] = (
1104-
expanded_required_state_map.get(
1105-
EventTypes.Member, set()
1103+
# Record the extra members we're returning.
1104+
lazy_load_user_ids.update(
1105+
state_key
1106+
for event_type, state_key in room_state_delta_id_map
1107+
if event_type == EventTypes.Member
11061108
)
1107-
| timeline_membership
1108-
)
1109-
elif state_key == StateValues.ME:
1109+
else:
11101110
num_others += 1
1111-
required_state_types.append((state_type, user.to_string()))
1111+
11121112
# Replace `$ME` with the user's ID so we can deduplicate
11131113
# when someone requests the same state with `$ME` or with
11141114
# their user ID.
1115-
#
1116-
# Make a new set or copy of the state key set so we can
1117-
# modify it without affecting the original
1118-
# `required_state_map`
1119-
expanded_required_state_map[EventTypes.Member] = (
1120-
expanded_required_state_map.get(
1121-
EventTypes.Member, set()
1122-
)
1123-
| {user.to_string()}
1115+
normalized_state_key = state_key
1116+
if state_key == StateValues.ME:
1117+
normalized_state_key = user.to_string()
1118+
1119+
if state_type == EventTypes.Member:
1120+
# Also track explicitly requested member state for
1121+
# lazy membership tracking.
1122+
explicit_user_state.add(normalized_state_key)
1123+
1124+
required_state_types.append(
1125+
(state_type, normalized_state_key)
11241126
)
1125-
else:
1126-
num_others += 1
1127-
required_state_types.append((state_type, state_key))
11281127

11291128
set_tag(
11301129
SynapseTags.FUNC_ARG_PREFIX
@@ -1142,6 +1141,10 @@ async def get_room_sync_data(
11421141

11431142
required_state_filter = StateFilter.from_types(required_state_types)
11441143

1144+
# Define `required_user_state` as all user state we want.
1145+
required_user_state = explicit_user_state | lazy_load_user_ids
1146+
lazy_load_user_ids -= explicit_user_state
1147+
11451148
# We need this base set of info for the response so let's just fetch it along
11461149
# with the `required_state` for the room
11471150
hero_room_state = [
@@ -1174,6 +1177,17 @@ async def get_room_sync_data(
11741177
# state as well).
11751178
hero_membership_state: StateMap[EventBase] = {}
11761179

1180+
# By default we mark all required user state as being added when lazy
1181+
# loaded members is enabled.
1182+
#
1183+
# We may later update this to account for previously sent members.
1184+
returned_users = {}
1185+
if lazy_load_room_members:
1186+
returned_users = dict.fromkeys(lazy_load_user_ids)
1187+
new_connection_state.room_lazy_membership[room_id] = RoomLazyMembershipChanges(
1188+
returned=returned_users
1189+
)
1190+
11771191
if initial:
11781192
room_state = await self.get_current_state_at(
11791193
room_id=room_id,
@@ -1186,19 +1200,38 @@ async def get_room_sync_data(
11861200
# We'll later filter this down so we don't need to do so here.
11871201
hero_membership_state = room_state
11881202
else:
1203+
assert from_token is not None
11891204
assert from_bound is not None
11901205

11911206
if prev_room_sync_config is not None:
1207+
# Fetch which of the needed lazy-loaded members we have already sent.
1208+
if required_user_state:
1209+
previously_returned_user_state = (
1210+
await self.store.get_sliding_sync_connection_lazy_members(
1211+
connection_position=from_token.connection_position,
1212+
room_id=room_id,
1213+
user_ids=required_user_state,
1214+
)
1215+
)
1216+
else:
1217+
previously_returned_user_state = {}
1218+
11921219
# Check if there are any changes to the required state config
11931220
# that we need to handle.
11941221
changes_return = _required_state_changes(
11951222
user.to_string(),
11961223
prev_required_state_map=prev_room_sync_config.required_state_map,
1197-
request_required_state_map=expanded_required_state_map,
1224+
request_required_state_map=room_sync_config.required_state_map,
1225+
previously_returned_user_state=previously_returned_user_state,
1226+
required_user_state=lazy_load_user_ids,
11981227
state_deltas=room_state_delta_id_map,
11991228
)
12001229
changed_required_state_map = changes_return.required_state_map_change
12011230

1231+
new_connection_state.room_lazy_membership[
1232+
room_id
1233+
].invalidated = changes_return.lazy_members_invalidated
1234+
12021235
if changes_return.added_state_filter:
12031236
# Some state entries got added, so we pull out the current
12041237
# state for them. If we don't do this we'd only send down new deltas.
@@ -1309,7 +1342,7 @@ async def get_room_sync_data(
13091342
bump_stamp = 0
13101343

13111344
room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = (
1312-
expanded_required_state_map
1345+
room_sync_config.required_state_map
13131346
)
13141347
if changed_required_state_map:
13151348
room_sync_required_state_map_to_persist = changed_required_state_map
@@ -1508,17 +1541,27 @@ class _RequiredStateChangesReturn:
15081541
the room config, or None if there is no change.
15091542
added_state_filter: The state filter to use to fetch any additional
15101543
current state that needs to be returned to the client.
1544+
lazy_members_previously_returned: The set of user IDs we should add to
1545+
the lazy members cache because we had previously returned them.
1546+
lazy_members_invalidated: The set of user IDs whose membership has
1547+
changed but we didn't send down, so we need to invalidate them from
1548+
the cache.
15111549
"""
15121550

15131551
required_state_map_change: Mapping[str, AbstractSet[str]] | None
15141552
added_state_filter: StateFilter
15151553

1554+
lazy_members_previously_returned: AbstractSet[str] = frozenset()
1555+
lazy_members_invalidated: AbstractSet[str] = frozenset()
1556+
15161557

15171558
def _required_state_changes(
15181559
user_id: str,
15191560
*,
15201561
prev_required_state_map: Mapping[str, AbstractSet[str]],
15211562
request_required_state_map: Mapping[str, AbstractSet[str]],
1563+
previously_returned_user_state: Mapping[str, int | None],
1564+
required_user_state: AbstractSet[str],
15221565
state_deltas: StateMap[str],
15231566
) -> _RequiredStateChangesReturn:
15241567
"""Calculates the changes between the required state room config from the
@@ -1533,13 +1576,60 @@ def _required_state_changes(
15331576
This function tries to handle the case where a state entry is
15341577
added, removed and then added again to the required state. In that case we
15351578
only want to re-send that entry down sync if it has changed.
1579+
1580+
Args:
1581+
user_id: The user ID of the user making the request.
1582+
prev_required_state_map: The required state map from the previous
1583+
request.
1584+
request_required_state_map: The required state map from the current
1585+
request.
1586+
previously_returned_user_state: Map keyed by the user IDs whose lazy-loaded
1587+
membership we have previously returned to the client.
1588+
required_user_state: The set of user IDs whose lazy-loaded membership
1589+
is required for this request.
1590+
state_deltas: The state deltas that have changed in the room since the
1591+
previous request.
15361592
"""
1593+
1594+
# First we find any lazy members that have been invalidated due to state
1595+
# changes that we are not sending down.
1596+
lazy_members_invalidated = set()
1597+
for event_type, state_key in state_deltas:
1598+
if event_type != EventTypes.Member:
1599+
continue
1600+
1601+
if state_key in required_user_state:
1602+
# We're returning this member change.
1603+
continue
1604+
1605+
if state_key not in previously_returned_user_state:
1606+
# We've not previously returned this member so nothing to
1607+
# invalidate.
1608+
continue
1609+
1610+
lazy_members_invalidated.add(state_key)
1611+
15371612
if prev_required_state_map == request_required_state_map:
1538-
# There has been no change. Return immediately.
1613+
# There has been no change in state, just need to check lazy members.
1614+
newly_returned_lazy_members = (
1615+
required_user_state - previously_returned_user_state.keys()
1616+
)
1617+
if newly_returned_lazy_members:
1618+
# There are some new lazy members we need to fetch.
1619+
added_types: list[tuple[str, str | None]] = []
1620+
for new_user_id in newly_returned_lazy_members:
1621+
added_types.append((EventTypes.Member, new_user_id))
1622+
1623+
added_state_filter = StateFilter.from_types(added_types)
1624+
else:
1625+
added_state_filter = StateFilter.none()
1626+
15391627
return _RequiredStateChangesReturn(
15401628
required_state_map_change=None,
1541-
added_state_filter=StateFilter.none(),
1629+
added_state_filter=added_state_filter,
1630+
lazy_members_invalidated=lazy_members_invalidated,
15421631
)
1632+
15431633
prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
15441634
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())
15451635

@@ -1551,6 +1641,7 @@ def _required_state_changes(
15511641
return _RequiredStateChangesReturn(
15521642
required_state_map_change=request_required_state_map,
15531643
added_state_filter=StateFilter.none(),
1644+
lazy_members_invalidated=lazy_members_invalidated,
15541645
)
15551646

15561647
# If an event type wildcard has been added or removed we don't try and do
@@ -1561,12 +1652,14 @@ def _required_state_changes(
15611652
return _RequiredStateChangesReturn(
15621653
required_state_map_change=request_required_state_map,
15631654
added_state_filter=StateFilter.all(),
1655+
lazy_members_invalidated=lazy_members_invalidated,
15641656
)
15651657
if prev_wildcard - request_wildcard:
15661658
# Keys were only removed, so we don't have to fetch everything.
15671659
return _RequiredStateChangesReturn(
15681660
required_state_map_change=request_required_state_map,
15691661
added_state_filter=StateFilter.none(),
1662+
lazy_members_invalidated=lazy_members_invalidated,
15701663
)
15711664

15721665
# Contains updates to the required state map compared with the previous room
@@ -1577,6 +1670,11 @@ def _required_state_changes(
15771670
# client. Passed to `StateFilter.from_types(...)`
15781671
added: list[tuple[str, str | None]] = []
15791672

1673+
# Record any members that were previously explicitly tracked and should now
1674+
# be tracked as lazy members. This handles the case of membership changing
1675+
# from e.g. `{@alice:example.com}` to `{LAZY}`.
1676+
lazy_members_previously_returned: set[str] = set()
1677+
15801678
# Convert the list of state deltas to map from type to state_keys that have
15811679
# changed.
15821680
changed_types_to_state_keys: dict[str, set[str]] = {}
@@ -1599,6 +1697,39 @@ def _required_state_changes(
15991697
# Nothing *added*, so we skip. Removals happen below.
16001698
continue
16011699

1700+
# Handle the special case of adding LAZY membership, where we want to
1701+
# remember what explicit members we've previously sent down.
1702+
if event_type == EventTypes.Member:
1703+
old_state_key_lazy = StateValues.LAZY in old_state_keys
1704+
request_state_key_lazy = StateValues.LAZY in request_state_keys
1705+
if not old_state_key_lazy and request_state_key_lazy:
1706+
# We're adding a LAZY flag. We therefore add any previously
1707+
# explicit members we've sent down to the lazy cache.
1708+
for state_key in old_state_keys:
1709+
if (
1710+
state_key == StateValues.WILDCARD
1711+
or state_key == StateValues.LAZY
1712+
):
1713+
# Ignore non-user IDs.
1714+
continue
1715+
1716+
if state_key == StateValues.ME:
1717+
# Normalize to proper user ID
1718+
state_key = user_id
1719+
1720+
# We remember the user if either a) they haven't been
1721+
# invalidated...
1722+
if (EventTypes.Member, state_key) not in state_deltas:
1723+
lazy_members_previously_returned.add(state_key)
1724+
1725+
# ...or b) if we are going to send the delta down in this
1726+
# sync.
1727+
if state_key in required_user_state:
1728+
lazy_members_previously_returned.add(state_key)
1729+
1730+
changes[event_type] = request_state_keys
1731+
continue
1732+
16021733
# We only remove state keys from the effective state if they've been
16031734
# removed from the request *and* the state has changed. This ensures
16041735
# that if a client removes and then re-adds a state key, we only send
@@ -1669,9 +1800,23 @@ def _required_state_changes(
16691800
# LAZY values should also be ignored for event types that are
16701801
# not membership.
16711802
pass
1803+
elif event_type == EventTypes.Member:
1804+
if state_key not in previously_returned_user_state:
1805+
# Only add *explicit* members we haven't previously sent
1806+
# down.
1807+
added.append((event_type, state_key))
16721808
else:
16731809
added.append((event_type, state_key))
16741810

1811+
# We also need to pull out any lazy members that are now required but
1812+
# haven't previously been returned.
1813+
for required_user_id in (
1814+
required_user_state
1815+
- previously_returned_user_state.keys()
1816+
- lazy_members_previously_returned
1817+
):
1818+
added.append((EventTypes.Member, required_user_id))
1819+
16751820
added_state_filter = StateFilter.from_types(added)
16761821

16771822
# Figure out what changes we need to apply to the effective required state
@@ -1746,9 +1891,13 @@ def _required_state_changes(
17461891
return _RequiredStateChangesReturn(
17471892
required_state_map_change=new_required_state_map,
17481893
added_state_filter=added_state_filter,
1894+
lazy_members_invalidated=lazy_members_invalidated,
1895+
lazy_members_previously_returned=lazy_members_previously_returned,
17491896
)
17501897
else:
17511898
return _RequiredStateChangesReturn(
17521899
required_state_map_change=None,
17531900
added_state_filter=added_state_filter,
1901+
lazy_members_invalidated=lazy_members_invalidated,
1902+
lazy_members_previously_returned=lazy_members_previously_returned,
17541903
)

0 commit comments

Comments
 (0)