Skip to content

Commit 501f62d

Browse files
authored
Faster remote room joins: stream the un-partial-stating of rooms over replication. [rei:frrj/streams/unpsr] (#14473)
1 parent e1779bc commit 501f62d

File tree

8 files changed

+280
-67
lines changed

8 files changed

+280
-67
lines changed

changelog.d/14473.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Faster remote room joins: stream the un-partial-stating of rooms over replication.

synapse/handlers/device.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -996,7 +996,7 @@ async def incoming_device_list_update(
996996
# Check if we are partially joining any rooms. If so we need to store
997997
# all device list updates so that we can handle them correctly once we
998998
# know who is in the room.
999-
# TODO(faster joins): this fetches and processes a bunch of data that we don't
999+
# TODO(faster_joins): this fetches and processes a bunch of data that we don't
10001000
# use. Could be replaced by a tighter query e.g.
10011001
# SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
10021002
partial_rooms = await self.store.get_partial_state_room_resync_info()

synapse/handlers/federation.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ def __init__(self, hs: "HomeServer"):
152152
self._federation_event_handler = hs.get_federation_event_handler()
153153
self._device_handler = hs.get_device_handler()
154154
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
155+
self._notifier = hs.get_notifier()
155156

156157
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
157158
hs
@@ -1692,6 +1693,9 @@ async def _sync_partial_state_room(
16921693
self._storage_controllers.state.notify_room_un_partial_stated(
16931694
room_id
16941695
)
1696+
# Poke the notifier so that other workers see the write to
1697+
# the un-partial-stated rooms stream.
1698+
self._notifier.notify_replication()
16951699

16961700
# TODO(faster_joins) update room stats and user directory?
16971701
# https://github.com/matrix-org/synapse/issues/12814

synapse/replication/tcp/streams/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
)
4343
from synapse.replication.tcp.streams.events import EventsStream
4444
from synapse.replication.tcp.streams.federation import FederationStream
45+
from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
4546

4647
STREAMS_MAP = {
4748
stream.NAME: stream
@@ -61,6 +62,7 @@
6162
TagAccountDataStream,
6263
AccountDataStream,
6364
UserSignatureStream,
65+
UnPartialStatedRoomStream,
6466
)
6567
}
6668

@@ -80,4 +82,5 @@
8082
"TagAccountDataStream",
8183
"AccountDataStream",
8284
"UserSignatureStream",
85+
"UnPartialStatedRoomStream",
8386
]
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from typing import TYPE_CHECKING
15+
16+
import attr
17+
18+
from synapse.replication.tcp.streams import Stream
19+
from synapse.replication.tcp.streams._base import current_token_without_instance
20+
21+
if TYPE_CHECKING:
22+
from synapse.server import HomeServer
23+
24+
25+
@attr.s(slots=True, frozen=True, auto_attribs=True)
26+
class UnPartialStatedRoomStreamRow:
27+
# ID of the room that has been un-partial-stated.
28+
room_id: str
29+
30+
31+
class UnPartialStatedRoomStream(Stream):
32+
"""
33+
Stream to notify about rooms becoming un-partial-stated;
34+
that is, when the background sync finishes such that we now have full state for
35+
the room.
36+
"""
37+
38+
NAME = "un_partial_stated_room"
39+
ROW_TYPE = UnPartialStatedRoomStreamRow
40+
41+
def __init__(self, hs: "HomeServer"):
42+
store = hs.get_datastores().main
43+
super().__init__(
44+
hs.get_instance_name(),
45+
# TODO(faster_joins, multiple writers): we need to account for instance names
46+
current_token_without_instance(store.get_un_partial_stated_rooms_token),
47+
store.get_un_partial_stated_rooms_from_stream,
48+
)

synapse/storage/databases/main/room.py

Lines changed: 171 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Copyright 2014-2016 OpenMarket Ltd
2-
# Copyright 2019 The Matrix.org Foundation C.I.C.
2+
# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@
5050
LoggingTransaction,
5151
)
5252
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
53+
from synapse.storage.engines import PostgresEngine
5354
from synapse.storage.types import Cursor
54-
from synapse.storage.util.id_generators import IdGenerator
55+
from synapse.storage.util.id_generators import (
56+
AbstractStreamIdGenerator,
57+
IdGenerator,
58+
MultiWriterIdGenerator,
59+
StreamIdGenerator,
60+
)
5561
from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
5662
from synapse.util import json_encoder
5763
from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ def __init__(
114120

115121
self.config: HomeServerConfig = hs.config
116122

123+
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
124+
125+
if isinstance(database.engine, PostgresEngine):
126+
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
127+
db_conn=db_conn,
128+
db=database,
129+
stream_name="un_partial_stated_room_stream",
130+
instance_name=self._instance_name,
131+
tables=[
132+
("un_partial_stated_room_stream", "instance_name", "stream_id")
133+
],
134+
sequence_name="un_partial_stated_room_stream_sequence",
135+
# TODO(faster_joins, multiple writers) Support multiple writers.
136+
writers=["master"],
137+
)
138+
else:
139+
self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
140+
db_conn, "un_partial_stated_room_stream", "stream_id"
141+
)
142+
117143
async def store_room(
118144
self,
119145
room_id: str,
@@ -1216,70 +1242,6 @@ async def get_partial_state_room_resync_info(
12161242

12171243
return room_servers
12181244

1219-
async def clear_partial_state_room(self, room_id: str) -> bool:
1220-
"""Clears the partial state flag for a room.
1221-
1222-
Args:
1223-
room_id: The room whose partial state flag is to be cleared.
1224-
1225-
Returns:
1226-
`True` if the partial state flag has been cleared successfully.
1227-
1228-
`False` if the partial state flag could not be cleared because the room
1229-
still contains events with partial state.
1230-
"""
1231-
try:
1232-
await self.db_pool.runInteraction(
1233-
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
1234-
)
1235-
return True
1236-
except self.db_pool.engine.module.IntegrityError as e:
1237-
# Assume that any `IntegrityError`s are due to partial state events.
1238-
logger.info(
1239-
"Exception while clearing lazy partial-state-room %s, retrying: %s",
1240-
room_id,
1241-
e,
1242-
)
1243-
return False
1244-
1245-
def _clear_partial_state_room_txn(
1246-
self, txn: LoggingTransaction, room_id: str
1247-
) -> None:
1248-
DatabasePool.simple_delete_txn(
1249-
txn,
1250-
table="partial_state_rooms_servers",
1251-
keyvalues={"room_id": room_id},
1252-
)
1253-
DatabasePool.simple_delete_one_txn(
1254-
txn,
1255-
table="partial_state_rooms",
1256-
keyvalues={"room_id": room_id},
1257-
)
1258-
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
1259-
self._invalidate_cache_and_stream(
1260-
txn, self.get_partial_state_servers_at_join, (room_id,)
1261-
)
1262-
1263-
# We now delete anything from `device_lists_remote_pending` with a
1264-
# stream ID less than the minimum
1265-
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
1266-
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
1267-
txn,
1268-
table="partial_state_rooms",
1269-
keyvalues={},
1270-
retcol="MIN(device_lists_stream_id)",
1271-
allow_none=True,
1272-
)
1273-
if device_lists_stream_id is None:
1274-
# There are no rooms being currently partially joined, so we delete everything.
1275-
txn.execute("DELETE FROM device_lists_remote_pending")
1276-
else:
1277-
sql = """
1278-
DELETE FROM device_lists_remote_pending
1279-
WHERE stream_id <= ?
1280-
"""
1281-
txn.execute(sql, (device_lists_stream_id,))
1282-
12831245
@cached()
12841246
async def is_partial_state_room(self, room_id: str) -> bool:
12851247
"""Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
13151277
)
13161278
return result["join_event_id"], result["device_lists_stream_id"]
13171279

1280+
def get_un_partial_stated_rooms_token(self) -> int:
1281+
# TODO(faster_joins, multiple writers): This is inappropriate if there
1282+
# are multiple writers because workers that don't write often will
1283+
# hold all readers up.
1284+
# (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
1285+
# explanation.)
1286+
return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
1287+
1288+
async def get_un_partial_stated_rooms_from_stream(
1289+
self, instance_name: str, last_id: int, current_id: int, limit: int
1290+
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
1291+
"""Get updates for caches replication stream.
1292+
1293+
Args:
1294+
instance_name: The writer we want to fetch updates from. Unused
1295+
here since there is only ever one writer.
1296+
last_id: The token to fetch updates from. Exclusive.
1297+
current_id: The token to fetch updates up to. Inclusive.
1298+
limit: The requested limit for the number of rows to return. The
1299+
function may return more or fewer rows.
1300+
1301+
Returns:
1302+
A tuple consisting of: the updates, a token to use to fetch
1303+
subsequent updates, and whether we returned fewer rows than exists
1304+
between the requested tokens due to the limit.
1305+
1306+
The token returned can be used in a subsequent call to this
1307+
function to get further updatees.
1308+
1309+
The updates are a list of 2-tuples of stream ID and the row data
1310+
"""
1311+
1312+
if last_id == current_id:
1313+
return [], current_id, False
1314+
1315+
def get_un_partial_stated_rooms_from_stream_txn(
1316+
txn: LoggingTransaction,
1317+
) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
1318+
sql = """
1319+
SELECT stream_id, room_id
1320+
FROM un_partial_stated_room_stream
1321+
WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
1322+
ORDER BY stream_id ASC
1323+
LIMIT ?
1324+
"""
1325+
txn.execute(sql, (last_id, current_id, instance_name, limit))
1326+
updates = [(row[0], (row[1],)) for row in txn]
1327+
limited = False
1328+
upto_token = current_id
1329+
if len(updates) >= limit:
1330+
upto_token = updates[-1][0]
1331+
limited = True
1332+
1333+
return updates, upto_token, limited
1334+
1335+
return await self.db_pool.runInteraction(
1336+
"get_un_partial_stated_rooms_from_stream",
1337+
get_un_partial_stated_rooms_from_stream_txn,
1338+
)
1339+
13181340

13191341
class _BackgroundUpdates:
13201342
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
@@ -1806,6 +1828,8 @@ def __init__(
18061828

18071829
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
18081830

1831+
self._instance_name = hs.get_instance_name()
1832+
18091833
async def upsert_room_on_join(
18101834
self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
18111835
) -> None:
@@ -2270,3 +2294,84 @@ async def unblock_room(self, room_id: str) -> None:
22702294
self.is_room_blocked,
22712295
(room_id,),
22722296
)
2297+
2298+
async def clear_partial_state_room(self, room_id: str) -> bool:
2299+
"""Clears the partial state flag for a room.
2300+
2301+
Args:
2302+
room_id: The room whose partial state flag is to be cleared.
2303+
2304+
Returns:
2305+
`True` if the partial state flag has been cleared successfully.
2306+
2307+
`False` if the partial state flag could not be cleared because the room
2308+
still contains events with partial state.
2309+
"""
2310+
try:
2311+
async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
2312+
await self.db_pool.runInteraction(
2313+
"clear_partial_state_room",
2314+
self._clear_partial_state_room_txn,
2315+
room_id,
2316+
un_partial_state_room_stream_id,
2317+
)
2318+
return True
2319+
except self.db_pool.engine.module.IntegrityError as e:
2320+
# Assume that any `IntegrityError`s are due to partial state events.
2321+
logger.info(
2322+
"Exception while clearing lazy partial-state-room %s, retrying: %s",
2323+
room_id,
2324+
e,
2325+
)
2326+
return False
2327+
2328+
def _clear_partial_state_room_txn(
2329+
self,
2330+
txn: LoggingTransaction,
2331+
room_id: str,
2332+
un_partial_state_room_stream_id: int,
2333+
) -> None:
2334+
DatabasePool.simple_delete_txn(
2335+
txn,
2336+
table="partial_state_rooms_servers",
2337+
keyvalues={"room_id": room_id},
2338+
)
2339+
DatabasePool.simple_delete_one_txn(
2340+
txn,
2341+
table="partial_state_rooms",
2342+
keyvalues={"room_id": room_id},
2343+
)
2344+
self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
2345+
self._invalidate_cache_and_stream(
2346+
txn, self.get_partial_state_servers_at_join, (room_id,)
2347+
)
2348+
2349+
DatabasePool.simple_insert_txn(
2350+
txn,
2351+
"un_partial_stated_room_stream",
2352+
{
2353+
"stream_id": un_partial_state_room_stream_id,
2354+
"instance_name": self._instance_name,
2355+
"room_id": room_id,
2356+
},
2357+
)
2358+
2359+
# We now delete anything from `device_lists_remote_pending` with a
2360+
# stream ID less than the minimum
2361+
# `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
2362+
device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
2363+
txn,
2364+
table="partial_state_rooms",
2365+
keyvalues={},
2366+
retcol="MIN(device_lists_stream_id)",
2367+
allow_none=True,
2368+
)
2369+
if device_lists_stream_id is None:
2370+
# There are no rooms being currently partially joined, so we delete everything.
2371+
txn.execute("DELETE FROM device_lists_remote_pending")
2372+
else:
2373+
sql = """
2374+
DELETE FROM device_lists_remote_pending
2375+
WHERE stream_id <= ?
2376+
"""
2377+
txn.execute(sql, (device_lists_stream_id,))

0 commit comments

Comments
 (0)