Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 44de53b

Browse files
authored
Reduce state pulled from DB due to sending typing and receipts over federation (#12964)
Reducing the amount of state we pull from the DB is useful as fetching state is expensive in terms of DB, CPU and memory.
1 parent 148fe58 commit 44de53b

File tree

9 files changed

+68
-16
lines changed

9 files changed

+68
-16
lines changed

changelog.d/12964.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce the amount of state we pull from the DB.

synapse/federation/sender/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ def __init__(self, hs: "HomeServer"):
245245
self.store = hs.get_datastores().main
246246
self.state = hs.get_state_handler()
247247

248+
self._storage_controllers = hs.get_storage_controllers()
249+
248250
self.clock = hs.get_clock()
249251
self.is_mine_id = hs.is_mine_id
250252

@@ -602,7 +604,9 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
602604
room_id = receipt.room_id
603605

604606
# Work out which remote servers should be poked and poke them.
605-
domains_set = await self.state.get_current_hosts_in_room(room_id)
607+
domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
608+
room_id
609+
)
606610
domains = [
607611
d
608612
for d in domains_set

synapse/handlers/typing.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class FollowerTypingHandler:
5959

6060
def __init__(self, hs: "HomeServer"):
6161
self.store = hs.get_datastores().main
62+
self._storage_controllers = hs.get_storage_controllers()
6263
self.server_name = hs.config.server.server_name
6364
self.clock = hs.get_clock()
6465
self.is_mine_id = hs.is_mine_id
@@ -131,15 +132,17 @@ async def _push_remote(self, member: RoomMember, typing: bool) -> None:
131132
return
132133

133134
try:
134-
users = await self.store.get_users_in_room(member.room_id)
135135
self._member_last_federation_poke[member] = self.clock.time_msec()
136136

137137
now = self.clock.time_msec()
138138
self.wheel_timer.insert(
139139
now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
140140
)
141141

142-
for domain in {get_domain_from_id(u) for u in users}:
142+
hosts = await self._storage_controllers.state.get_current_hosts_in_room(
143+
member.room_id
144+
)
145+
for domain in hosts:
143146
if domain != self.server_name:
144147
logger.debug("sending typing update to %s", domain)
145148
self.federation.build_and_send_edu(

synapse/state/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,6 @@ async def get_current_users_in_room(
172172
entry = await self.resolve_state_groups_for_events(room_id, latest_event_ids)
173173
return await self.store.get_joined_users_from_state(room_id, entry)
174174

175-
async def get_current_hosts_in_room(self, room_id: str) -> FrozenSet[str]:
176-
event_ids = await self.store.get_latest_event_ids_in_room(room_id)
177-
return await self.get_hosts_in_room_at_events(room_id, event_ids)
178-
179175
async def get_hosts_in_room_at_events(
180176
self, room_id: str, event_ids: Collection[str]
181177
) -> FrozenSet[str]:

synapse/storage/_base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ def _invalidate_state_caches(
7171
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
7272
if members_changed:
7373
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
74+
self._attempt_to_invalidate_cache("get_current_hosts_in_room", (room_id,))
7475
self._attempt_to_invalidate_cache(
7576
"get_users_in_room_with_profiles", (room_id,)
7677
)

synapse/storage/controllers/state.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
List,
2424
Mapping,
2525
Optional,
26+
Set,
2627
Tuple,
2728
)
2829

@@ -482,3 +483,10 @@ async def get_current_state_event(
482483
room_id, StateFilter.from_types((key,))
483484
)
484485
return state_map.get(key)
486+
487+
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
488+
"""Get current hosts in room based on current state."""
489+
490+
await self._partial_state_room_tracker.await_full_state(room_id)
491+
492+
return await self.stores.main.get_current_hosts_in_room(room_id)

synapse/storage/databases/main/roommember.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,43 @@ async def _check_host_room_membership(
893893

894894
return True
895895

896+
@cached(iterable=True, max_entries=10000)
897+
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
898+
"""Get current hosts in room based on current state."""
899+
900+
# First we check if we already have `get_users_in_room` in the cache, as
901+
# we can just calculate result from that
902+
users = self.get_users_in_room.cache.get_immediate(
903+
(room_id,), None, update_metrics=False
904+
)
905+
if users is not None:
906+
return {get_domain_from_id(u) for u in users}
907+
908+
if isinstance(self.database_engine, Sqlite3Engine):
909+
# If we're using SQLite then let's just always use
910+
# `get_users_in_room` rather than funky SQL.
911+
users = await self.get_users_in_room(room_id)
912+
return {get_domain_from_id(u) for u in users}
913+
914+
# For PostgreSQL we can use a regex to pull out the domains from the
915+
# joined users in `current_state_events` via regex.
916+
917+
def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]:
918+
sql = """
919+
SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$')
920+
FROM current_state_events
921+
WHERE
922+
type = 'm.room.member'
923+
AND membership = 'join'
924+
AND room_id = ?
925+
"""
926+
txn.execute(sql, (room_id,))
927+
return {d for d, in txn}
928+
929+
return await self.db_pool.runInteraction(
930+
"get_current_hosts_in_room", get_current_hosts_in_room_txn
931+
)
932+
896933
async def get_joined_hosts(
897934
self, room_id: str, state_entry: "_StateCacheEntry"
898935
) -> FrozenSet[str]:

tests/federation/test_federation_sender.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,16 @@
3030

3131
class FederationSenderReceiptsTestCases(HomeserverTestCase):
3232
def make_homeserver(self, reactor, clock):
33-
mock_state_handler = Mock(spec=["get_current_hosts_in_room"])
34-
# Ensure a new Awaitable is created for each call.
35-
mock_state_handler.get_current_hosts_in_room.return_value = make_awaitable(
36-
["test", "host2"]
37-
)
38-
return self.setup_test_homeserver(
39-
state_handler=mock_state_handler,
33+
hs = self.setup_test_homeserver(
4034
federation_transport_client=Mock(spec=["send_transaction"]),
4135
)
4236

37+
hs.get_storage_controllers().state.get_current_hosts_in_room = Mock(
38+
return_value=make_awaitable({"test", "host2"})
39+
)
40+
41+
return hs
42+
4343
@override_config({"send_federation": True})
4444
def test_send_receipts(self):
4545
mock_send_transaction = (

tests/handlers/test_typing.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,12 @@ async def check_host_in_room(room_id: str, server_name: str) -> bool:
129129

130130
hs.get_event_auth_handler().check_host_in_room = check_host_in_room
131131

132-
def get_joined_hosts_for_room(room_id: str):
132+
async def get_current_hosts_in_room(room_id: str):
133133
return {member.domain for member in self.room_members}
134134

135-
self.datastore.get_joined_hosts_for_room = get_joined_hosts_for_room
135+
hs.get_storage_controllers().state.get_current_hosts_in_room = (
136+
get_current_hosts_in_room
137+
)
136138

137139
async def get_users_in_room(room_id: str):
138140
return {str(u) for u in self.room_members}

0 commit comments

Comments
 (0)