Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit ba4ea7d

Browse files
authored
Batch up replication requests to request the resyncing of remote users' devices. (matrix-org#14716)
1 parent 3479599 commit ba4ea7d

File tree

9 files changed

+306
-79
lines changed

9 files changed

+306
-79
lines changed

changelog.d/14716.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Batch up replication requests to request the resyncing of remote users' devices.

synapse/handlers/device.py

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
import logging
17+
from http import HTTPStatus
1718
from typing import (
1819
TYPE_CHECKING,
1920
Any,
@@ -33,6 +34,7 @@
3334
Codes,
3435
FederationDeniedError,
3536
HttpResponseException,
37+
InvalidAPICallError,
3638
RequestSendFailed,
3739
SynapseError,
3840
)
@@ -45,6 +47,7 @@
4547
JsonDict,
4648
StreamKeyType,
4749
StreamToken,
50+
UserID,
4851
get_domain_from_id,
4952
get_verify_key_from_cross_signing_key,
5053
)
@@ -893,12 +896,47 @@ class DeviceListWorkerUpdater:
893896

894897
def __init__(self, hs: "HomeServer"):
895898
from synapse.replication.http.devices import (
899+
ReplicationMultiUserDevicesResyncRestServlet,
896900
ReplicationUserDevicesResyncRestServlet,
897901
)
898902

899903
self._user_device_resync_client = (
900904
ReplicationUserDevicesResyncRestServlet.make_client(hs)
901905
)
906+
self._multi_user_device_resync_client = (
907+
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
908+
)
909+
910+
async def multi_user_device_resync(
911+
self, user_ids: List[str], mark_failed_as_stale: bool = True
912+
) -> Dict[str, Optional[JsonDict]]:
913+
"""
914+
Like `user_device_resync` but operates on multiple users **from the same origin**
915+
at once.
916+
917+
Returns:
918+
Dict from User ID to the same Dict as `user_device_resync`.
919+
"""
920+
# mark_failed_as_stale is not sent. Ensure this doesn't break expectations.
921+
assert mark_failed_as_stale
922+
923+
if not user_ids:
924+
# Shortcut empty requests
925+
return {}
926+
927+
try:
928+
return await self._multi_user_device_resync_client(user_ids=user_ids)
929+
except SynapseError as err:
930+
if not (
931+
err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
932+
):
933+
raise
934+
935+
# Fall back to single requests
936+
result: Dict[str, Optional[JsonDict]] = {}
937+
for user_id in user_ids:
938+
result[user_id] = await self._user_device_resync_client(user_id=user_id)
939+
return result
902940

903941
async def user_device_resync(
904942
self, user_id: str, mark_failed_as_stale: bool = True
@@ -913,8 +951,10 @@ async def user_device_resync(
913951
A dict with device info as under the "devices" in the result of this
914952
request:
915953
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
954+
None when we weren't able to fetch the device info for some reason,
955+
e.g. due to a connection problem.
916956
"""
917-
return await self._user_device_resync_client(user_id=user_id)
957+
return (await self.multi_user_device_resync([user_id]))[user_id]
918958

919959

920960
class DeviceListUpdater(DeviceListWorkerUpdater):
@@ -1160,19 +1200,66 @@ async def _maybe_retry_device_resync(self) -> None:
11601200
# Allow future calls to retry resyncing out-of-sync device lists.
11611201
self._resync_retry_in_progress = False
11621202

1203+
async def multi_user_device_resync(
1204+
self, user_ids: List[str], mark_failed_as_stale: bool = True
1205+
) -> Dict[str, Optional[JsonDict]]:
1206+
"""
1207+
Like `user_device_resync` but operates on multiple users **from the same origin**
1208+
at once.
1209+
1210+
Returns:
1211+
Dict from User ID to the same Dict as `user_device_resync`.
1212+
"""
1213+
if not user_ids:
1214+
return {}
1215+
1216+
origins = {UserID.from_string(user_id).domain for user_id in user_ids}
1217+
1218+
if len(origins) != 1:
1219+
raise InvalidAPICallError(f"Only one origin permitted, got {origins!r}")
1220+
1221+
result = {}
1222+
failed = set()
1223+
# TODO(Perf): Actually batch these up
1224+
for user_id in user_ids:
1225+
user_result, user_failed = await self._user_device_resync_returning_failed(
1226+
user_id
1227+
)
1228+
result[user_id] = user_result
1229+
if user_failed:
1230+
failed.add(user_id)
1231+
1232+
if mark_failed_as_stale:
1233+
await self.store.mark_remote_users_device_caches_as_stale(failed)
1234+
1235+
return result
1236+
11631237
async def user_device_resync(
11641238
self, user_id: str, mark_failed_as_stale: bool = True
11651239
) -> Optional[JsonDict]:
1240+
result, failed = await self._user_device_resync_returning_failed(user_id)
1241+
1242+
if failed and mark_failed_as_stale:
1243+
# Mark the remote user's device list as stale so we know we need to retry
1244+
# it later.
1245+
await self.store.mark_remote_users_device_caches_as_stale((user_id,))
1246+
1247+
return result
1248+
1249+
async def _user_device_resync_returning_failed(
1250+
self, user_id: str
1251+
) -> Tuple[Optional[JsonDict], bool]:
11661252
"""Fetches all devices for a user and updates the device cache with them.
11671253
11681254
Args:
11691255
user_id: The user's id whose device_list will be updated.
1170-
mark_failed_as_stale: Whether to mark the user's device list as stale
1171-
if the attempt to resync failed.
11721256
Returns:
1173-
A dict with device info as under the "devices" in the result of this
1174-
request:
1175-
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
1257+
- A dict with device info as under the "devices" in the result of this
1258+
request:
1259+
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
1260+
None when we weren't able to fetch the device info for some reason,
1261+
e.g. due to a connection problem.
1262+
- True iff the resync failed and the device list should be marked as stale.
11761263
"""
11771264
logger.debug("Attempting to resync the device list for %s", user_id)
11781265
log_kv({"message": "Doing resync to update device list."})
@@ -1181,49 +1268,34 @@ async def user_device_resync(
11811268
try:
11821269
result = await self.federation.query_user_devices(origin, user_id)
11831270
except NotRetryingDestination:
1184-
if mark_failed_as_stale:
1185-
# Mark the remote user's device list as stale so we know we need to retry
1186-
# it later.
1187-
await self.store.mark_remote_user_device_cache_as_stale(user_id)
1188-
1189-
return None
1271+
return None, True
11901272
except (RequestSendFailed, HttpResponseException) as e:
11911273
logger.warning(
11921274
"Failed to handle device list update for %s: %s",
11931275
user_id,
11941276
e,
11951277
)
11961278

1197-
if mark_failed_as_stale:
1198-
# Mark the remote user's device list as stale so we know we need to retry
1199-
# it later.
1200-
await self.store.mark_remote_user_device_cache_as_stale(user_id)
1201-
12021279
# We abort on exceptions rather than accepting the update
12031280
# as otherwise synapse will 'forget' that its device list
12041281
# is out of date. If we bail then we will retry the resync
12051282
# next time we get a device list update for this user_id.
12061283
# This makes it more likely that the device lists will
12071284
# eventually become consistent.
1208-
return None
1285+
return None, True
12091286
except FederationDeniedError as e:
12101287
set_tag("error", True)
12111288
log_kv({"reason": "FederationDeniedError"})
12121289
logger.info(e)
1213-
return None
1290+
return None, False
12141291
except Exception as e:
12151292
set_tag("error", True)
12161293
log_kv(
12171294
{"message": "Exception raised by federation request", "exception": e}
12181295
)
12191296
logger.exception("Failed to handle device list update for %s", user_id)
12201297

1221-
if mark_failed_as_stale:
1222-
# Mark the remote user's device list as stale so we know we need to retry
1223-
# it later.
1224-
await self.store.mark_remote_user_device_cache_as_stale(user_id)
1225-
1226-
return None
1298+
return None, True
12271299
log_kv({"result": result})
12281300
stream_id = result["stream_id"]
12291301
devices = result["devices"]
@@ -1305,7 +1377,7 @@ async def user_device_resync(
13051377
# point.
13061378
self._seen_updates[user_id] = {stream_id}
13071379

1308-
return result
1380+
return result, False
13091381

13101382
async def process_cross_signing_key_update(
13111383
self,

synapse/handlers/devicemessage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ async def _check_for_unknown_devices(
195195
sender_user_id,
196196
unknown_devices,
197197
)
198-
await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
198+
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
199199

200200
# Immediately attempt a resync in the background
201201
run_in_background(self._user_device_resync, user_id=sender_user_id)

synapse/handlers/e2e_keys.py

Lines changed: 55 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
get_domain_from_id,
3737
get_verify_key_from_cross_signing_key,
3838
)
39-
from synapse.util import json_decoder, unwrapFirstError
40-
from synapse.util.async_helpers import Linearizer, delay_cancellation
39+
from synapse.util import json_decoder
40+
from synapse.util.async_helpers import Linearizer, concurrently_execute
4141
from synapse.util.cancellation import cancellable
4242
from synapse.util.retryutils import NotRetryingDestination
4343

@@ -238,24 +238,28 @@ async def query_devices(
238238
# Now fetch any devices that we don't have in our cache
239239
# TODO It might make sense to propagate cancellations into the
240240
# deferreds which are querying remote homeservers.
241-
await make_deferred_yieldable(
242-
delay_cancellation(
243-
defer.gatherResults(
244-
[
245-
run_in_background(
246-
self._query_devices_for_destination,
247-
results,
248-
cross_signing_keys,
249-
failures,
250-
destination,
251-
queries,
252-
timeout,
253-
)
254-
for destination, queries in remote_queries_not_in_cache.items()
255-
],
256-
consumeErrors=True,
257-
).addErrback(unwrapFirstError)
241+
logger.debug(
242+
"%d destinations to query devices for", len(remote_queries_not_in_cache)
243+
)
244+
245+
async def _query(
246+
destination_queries: Tuple[str, Dict[str, Iterable[str]]]
247+
) -> None:
248+
destination, queries = destination_queries
249+
return await self._query_devices_for_destination(
250+
results,
251+
cross_signing_keys,
252+
failures,
253+
destination,
254+
queries,
255+
timeout,
258256
)
257+
258+
await concurrently_execute(
259+
_query,
260+
remote_queries_not_in_cache.items(),
261+
10,
262+
delay_cancellation=True,
259263
)
260264

261265
ret = {"device_keys": results, "failures": failures}
@@ -300,28 +304,41 @@ async def _query_devices_for_destination(
300304
# queries. We use the more efficient batched query_client_keys for all
301305
# remaining users
302306
user_ids_updated = []
303-
for (user_id, device_list) in destination_query.items():
304-
if user_id in user_ids_updated:
305-
continue
306307

307-
if device_list:
308-
continue
308+
# Perform a user device resync for each user only once and only as long as:
309+
# - they have an empty device_list
310+
# - they are in some rooms that this server can see
311+
users_to_resync_devices = {
312+
user_id
313+
for (user_id, device_list) in destination_query.items()
314+
if (not device_list) and (await self.store.get_rooms_for_user(user_id))
315+
}
309316

310-
room_ids = await self.store.get_rooms_for_user(user_id)
311-
if not room_ids:
312-
continue
317+
logger.debug(
318+
"%d users to resync devices for from destination %s",
319+
len(users_to_resync_devices),
320+
destination,
321+
)
313322

314-
# We've decided we're sharing a room with this user and should
315-
# probably be tracking their device lists. However, we haven't
316-
# done an initial sync on the device list so we do it now.
317-
try:
318-
resync_results = (
319-
await self.device_handler.device_list_updater.user_device_resync(
320-
user_id
321-
)
323+
try:
324+
user_resync_results = (
325+
await self.device_handler.device_list_updater.multi_user_device_resync(
326+
list(users_to_resync_devices)
322327
)
328+
)
329+
for user_id in users_to_resync_devices:
330+
resync_results = user_resync_results[user_id]
331+
323332
if resync_results is None:
324-
raise ValueError("Device resync failed")
333+
# TODO: It's weird that we'll store a failure against a
334+
# destination, yet continue processing users from that
335+
# destination.
336+
# We might want to consider changing this, but for now
337+
# I'm leaving it as I found it.
338+
failures[destination] = _exception_to_failure(
339+
ValueError(f"Device resync failed for {user_id!r}")
340+
)
341+
continue
325342

326343
# Add the device keys to the results.
327344
user_devices = resync_results["devices"]
@@ -339,8 +356,8 @@ async def _query_devices_for_destination(
339356

340357
if self_signing_key:
341358
cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
342-
except Exception as e:
343-
failures[destination] = _exception_to_failure(e)
359+
except Exception as e:
360+
failures[destination] = _exception_to_failure(e)
344361

345362
if len(destination_query) == len(user_ids_updated):
346363
# We've updated all the users in the query and we do not need to

synapse/handlers/federation_event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ async def _resync_device(self, sender: str) -> None:
14231423
"""
14241424

14251425
try:
1426-
await self._store.mark_remote_user_device_cache_as_stale(sender)
1426+
await self._store.mark_remote_users_device_caches_as_stale((sender,))
14271427

14281428
# Immediately attempt a resync in the background
14291429
if self._config.worker.worker_app:

0 commit comments

Comments
 (0)