Skip to content

Commit 5ea2cf2

Browse files
Move device changes off the main process (#18581)
The main goal of this PR is to handle device list changes onto multiple writers, off the main process, so that we can have logins happening whilst Synapse is rolling-restarting. This is quite an intrusive change, so I would advise to review this commit by commit; I tried to keep the history as clean as possible. There are a few things to consider: - the `device_list_key` in stream tokens becomes a `MultiWriterStreamToken`, which has a few implications in sync and on the storage layer - we had a split between `DeviceHandler` and `DeviceWorkerHandler` for master vs. worker process. I've kept this split, but making it rather writer vs. non-writer worker, using method overrides for doing replication calls when needed - there are a few operations that need to happen on a single worker at a time. Instead of using cross-worker locks, for now I made them run on the first writer on the list --------- Co-authored-by: Eric Eastwood <[email protected]>
1 parent 66504d1 commit 5ea2cf2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1772
-1441
lines changed

changelog.d/18581.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Enable workers to write directly to the device lists stream and handle device list updates, reducing load on the main process.

docker/complement/conf/start_for_complement.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
5454
export SYNAPSE_WORKER_TYPES="\
5555
event_persister:2, \
5656
background_worker, \
57-
frontend_proxy, \
5857
event_creator, \
5958
user_dir, \
6059
media_repository, \
@@ -65,6 +64,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
6564
client_reader, \
6665
appservice, \
6766
pusher, \
67+
device_lists:2, \
6868
stream_writers=account_data+presence+receipts+to_device+typing"
6969

7070
fi

docker/configure_workers_and_start.py

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@
178178
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
179179
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
180180
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
181+
"^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)",
182+
"^/_matrix/client/(r0|v3)/delete_devices$",
181183
"^/_matrix/client/versions$",
182184
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
183185
"^/_matrix/client/(r0|v3|unstable)/register$",
@@ -194,6 +196,9 @@
194196
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
195197
"^/_matrix/client/(r0|v3|unstable)/capabilities$",
196198
"^/_matrix/client/(r0|v3|unstable)/notifications$",
199+
"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload",
200+
"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/device_signing/upload$",
201+
"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$",
197202
],
198203
"shared_extra_conf": {},
199204
"worker_extra_conf": "",
@@ -265,13 +270,6 @@
265270
"shared_extra_conf": {},
266271
"worker_extra_conf": "",
267272
},
268-
"frontend_proxy": {
269-
"app": "synapse.app.generic_worker",
270-
"listener_resources": ["client", "replication"],
271-
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
272-
"shared_extra_conf": {},
273-
"worker_extra_conf": "",
274-
},
275273
"account_data": {
276274
"app": "synapse.app.generic_worker",
277275
"listener_resources": ["client", "replication"],
@@ -306,6 +304,13 @@
306304
"shared_extra_conf": {},
307305
"worker_extra_conf": "",
308306
},
307+
"device_lists": {
308+
"app": "synapse.app.generic_worker",
309+
"listener_resources": ["client", "replication"],
310+
"endpoint_patterns": [],
311+
"shared_extra_conf": {},
312+
"worker_extra_conf": "",
313+
},
309314
"typing": {
310315
"app": "synapse.app.generic_worker",
311316
"listener_resources": ["client", "replication"],
@@ -412,16 +417,17 @@ def add_worker_roles_to_shared_config(
412417
# streams
413418
instance_map = shared_config.setdefault("instance_map", {})
414419

415-
# This is a list of the stream_writers that there can be only one of. Events can be
416-
# sharded, and therefore doesn't belong here.
417-
singular_stream_writers = [
420+
# This is a list of the stream_writers.
421+
stream_writers = {
418422
"account_data",
423+
"events",
424+
"device_lists",
419425
"presence",
420426
"receipts",
421427
"to_device",
422428
"typing",
423429
"push_rules",
424-
]
430+
}
425431

426432
# Worker-type specific sharding config. Now a single worker can fulfill multiple
427433
# roles, check each.
@@ -431,28 +437,11 @@ def add_worker_roles_to_shared_config(
431437
if "federation_sender" in worker_types_set:
432438
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
433439

434-
if "event_persister" in worker_types_set:
435-
# Event persisters write to the events stream, so we need to update
436-
# the list of event stream writers
437-
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
438-
worker_name
439-
)
440-
441-
# Map of stream writer instance names to host/ports combos
442-
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
443-
instance_map[worker_name] = {
444-
"path": f"/run/worker.{worker_port}",
445-
}
446-
else:
447-
instance_map[worker_name] = {
448-
"host": "localhost",
449-
"port": worker_port,
450-
}
451440
# Update the list of stream writers. It's convenient that the name of the worker
452441
# type is the same as the stream to write. Iterate over the whole list in case there
453442
# is more than one.
454443
for worker in worker_types_set:
455-
if worker in singular_stream_writers:
444+
if worker in stream_writers:
456445
shared_config.setdefault("stream_writers", {}).setdefault(
457446
worker, []
458447
).append(worker_name)
@@ -876,6 +865,13 @@ def generate_worker_files(
876865
else:
877866
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
878867

868+
# Special case for event_persister: those are just workers that write to
869+
# the `events` stream. For other workers, the worker name is the same
870+
# name of the stream they write to, but for some reason it is not the
871+
# case for event_persister.
872+
if "event_persister" in worker_types_set:
873+
worker_types_set.add("events")
874+
879875
# Update the shared config with sharding-related options if necessary
880876
add_worker_roles_to_shared_config(
881877
shared_config, worker_types_set, worker_name, worker_port

docs/usage/configuration/config_documentation.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4341,6 +4341,8 @@ This setting has the following sub-options:
43414341

43424342
* `push_rules` (string): Name of a worker assigned to the `push_rules` stream.
43434343

4344+
* `device_lists` (string): Name of a worker assigned to the `device_lists` stream.
4345+
43444346
Example configuration:
43454347
```yaml
43464348
stream_writers:

docs/workers.md

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ information.
238238
^/_matrix/client/unstable/im.nheko.summary/summary/.*$
239239
^/_matrix/client/(r0|v3|unstable)/account/3pid$
240240
^/_matrix/client/(r0|v3|unstable)/account/whoami$
241-
^/_matrix/client/(r0|v3|unstable)/devices$
241+
^/_matrix/client/(r0|v3)/delete_devices$
242+
^/_matrix/client/(api/v1|r0|v3|unstable)/devices(/|$)
242243
^/_matrix/client/versions$
243244
^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$
244245
^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event/
@@ -257,7 +258,9 @@ information.
257258
^/_matrix/client/(r0|v3|unstable)/keys/changes$
258259
^/_matrix/client/(r0|v3|unstable)/keys/claim$
259260
^/_matrix/client/(r0|v3|unstable)/room_keys/
260-
^/_matrix/client/(r0|v3|unstable)/keys/upload$
261+
^/_matrix/client/(r0|v3|unstable)/keys/upload
262+
^/_matrix/client/(api/v1|r0|v3|unstable/keys/device_signing/upload$
263+
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$
261264

262265
# Registration/login requests
263266
^/_matrix/client/(api/v1|r0|v3|unstable)/login$
@@ -282,7 +285,6 @@ Additionally, the following REST endpoints can be handled for GET requests:
282285

283286
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
284287
^/_matrix/client/unstable/org.matrix.msc4140/delayed_events
285-
^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
286288

287289
# Account data requests
288290
^/_matrix/client/(r0|v3|unstable)/.*/tags
@@ -329,7 +331,6 @@ set to `true`), the following endpoints can be handled by the worker:
329331
^/_synapse/admin/v2/users/[^/]+$
330332
^/_synapse/admin/v1/username_available$
331333
^/_synapse/admin/v1/users/[^/]+/_allow_cross_signing_replacement_without_uia$
332-
# Only the GET method:
333334
^/_synapse/admin/v1/users/[^/]+/devices$
334335

335336
Note that a [HTTP listener](usage/configuration/config_documentation.md#listeners)
@@ -550,6 +551,18 @@ the stream writer for the `push_rules` stream:
550551

551552
^/_matrix/client/(api/v1|r0|v3|unstable)/pushrules/
552553

554+
##### The `device_lists` stream
555+
556+
The `device_lists` stream supports multiple writers. The following endpoints
557+
can be handled by any worker, but should be routed directly one of the workers
558+
configured as stream writer for the `device_lists` stream:
559+
560+
^/_matrix/client/(r0|v3)/delete_devices$
561+
^/_matrix/client/(api/v1|r0|v3|unstable)/devices/
562+
^/_matrix/client/(r0|v3|unstable)/keys/upload
563+
^/_matrix/client/(api/v1|r0|v3|unstable/keys/device_signing/upload$
564+
^/_matrix/client/(api/v1|r0|v3|unstable)/keys/signatures/upload$
565+
553566
#### Restrict outbound federation traffic to a specific set of workers
554567

555568
The

schema/synapse-config.schema.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5383,6 +5383,9 @@ properties:
53835383
push_rules:
53845384
type: string
53855385
description: Name of a worker assigned to the `push_rules` stream.
5386+
device_lists:
5387+
type: string
5388+
description: Name of a worker assigned to the `device_lists` stream.
53865389
default: {}
53875390
examples:
53885391
- events: worker1

synapse/config/workers.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -134,40 +134,44 @@ class WriterLocations:
134134
can only be a single instance.
135135
account_data: The instances that write to the account data streams. Currently
136136
can only be a single instance.
137-
receipts: The instances that write to the receipts stream. Currently
138-
can only be a single instance.
137+
receipts: The instances that write to the receipts stream.
139138
presence: The instances that write to the presence stream. Currently
140139
can only be a single instance.
141140
push_rules: The instances that write to the push stream. Currently
142141
can only be a single instance.
142+
device_lists: The instances that write to the device list stream.
143143
"""
144144

145145
events: List[str] = attr.ib(
146-
default=["master"],
146+
default=[MAIN_PROCESS_INSTANCE_NAME],
147147
converter=_instance_to_list_converter,
148148
)
149149
typing: List[str] = attr.ib(
150-
default=["master"],
150+
default=[MAIN_PROCESS_INSTANCE_NAME],
151151
converter=_instance_to_list_converter,
152152
)
153153
to_device: List[str] = attr.ib(
154-
default=["master"],
154+
default=[MAIN_PROCESS_INSTANCE_NAME],
155155
converter=_instance_to_list_converter,
156156
)
157157
account_data: List[str] = attr.ib(
158-
default=["master"],
158+
default=[MAIN_PROCESS_INSTANCE_NAME],
159159
converter=_instance_to_list_converter,
160160
)
161161
receipts: List[str] = attr.ib(
162-
default=["master"],
162+
default=[MAIN_PROCESS_INSTANCE_NAME],
163163
converter=_instance_to_list_converter,
164164
)
165165
presence: List[str] = attr.ib(
166-
default=["master"],
166+
default=[MAIN_PROCESS_INSTANCE_NAME],
167167
converter=_instance_to_list_converter,
168168
)
169169
push_rules: List[str] = attr.ib(
170-
default=["master"],
170+
default=[MAIN_PROCESS_INSTANCE_NAME],
171+
converter=_instance_to_list_converter,
172+
)
173+
device_lists: List[str] = attr.ib(
174+
default=[MAIN_PROCESS_INSTANCE_NAME],
171175
converter=_instance_to_list_converter,
172176
)
173177

@@ -358,7 +362,10 @@ def read_config(
358362
):
359363
instances = _instance_to_list_converter(getattr(self.writers, stream))
360364
for instance in instances:
361-
if instance != "master" and instance not in self.instance_map:
365+
if (
366+
instance != MAIN_PROCESS_INSTANCE_NAME
367+
and instance not in self.instance_map
368+
):
362369
raise ConfigError(
363370
"Instance %r is configured to write %s but does not appear in `instance_map` config."
364371
% (instance, stream)
@@ -397,6 +404,11 @@ def read_config(
397404
"Must only specify one instance to handle `push` messages."
398405
)
399406

407+
if len(self.writers.device_lists) == 0:
408+
raise ConfigError(
409+
"Must specify at least one instance to handle `device_lists` messages."
410+
)
411+
400412
self.events_shard_config = RoutableShardedWorkerHandlingConfig(
401413
self.writers.events
402414
)
@@ -419,9 +431,12 @@ def read_config(
419431
#
420432
# No effort is made to ensure only a single instance of these tasks is
421433
# running.
422-
background_tasks_instance = config.get("run_background_tasks_on") or "master"
434+
background_tasks_instance = (
435+
config.get("run_background_tasks_on") or MAIN_PROCESS_INSTANCE_NAME
436+
)
423437
self.run_background_tasks = (
424-
self.worker_name is None and background_tasks_instance == "master"
438+
self.worker_name is None
439+
and background_tasks_instance == MAIN_PROCESS_INSTANCE_NAME
425440
) or self.worker_name == background_tasks_instance
426441

427442
self.should_notify_appservices = self._should_this_worker_perform_duty(
@@ -493,9 +508,10 @@ def _should_this_worker_perform_duty(
493508
# 'don't run here'.
494509
new_option_should_run_here = None
495510
if new_option_name in config:
496-
designated_worker = config[new_option_name] or "master"
511+
designated_worker = config[new_option_name] or MAIN_PROCESS_INSTANCE_NAME
497512
new_option_should_run_here = (
498-
designated_worker == "master" and self.worker_name is None
513+
designated_worker == MAIN_PROCESS_INSTANCE_NAME
514+
and self.worker_name is None
499515
) or designated_worker == self.worker_name
500516

501517
legacy_option_should_run_here = None
@@ -592,7 +608,7 @@ def _worker_names_performing_this_duty(
592608
# If no worker instances are set we check if the legacy option
593609
# is set, which means use the main process.
594610
if legacy_option:
595-
worker_instances = ["master"]
611+
worker_instances = [MAIN_PROCESS_INSTANCE_NAME]
596612

597613
if self.worker_app == legacy_app_name:
598614
if legacy_option:

synapse/handlers/appservice.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,8 @@ async def _get_device_list_summary(
638638

639639
# Fetch the users who have modified their device list since then.
640640
users_with_changed_device_lists = await self.store.get_all_devices_changed(
641-
from_key, to_key=new_key
641+
MultiWriterStreamToken(stream=from_key),
642+
to_key=MultiWriterStreamToken(stream=new_key),
642643
)
643644

644645
# Filter out any users the application service is not interested in

synapse/handlers/deactivate_account.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
from synapse.api.constants import Membership
2626
from synapse.api.errors import SynapseError
27-
from synapse.handlers.device import DeviceHandler
2827
from synapse.metrics.background_process_metrics import run_as_background_process
2928
from synapse.types import Codes, Requester, UserID, create_requester
3029

@@ -84,10 +83,6 @@ async def deactivate_account(
8483
Returns:
8584
True if identity server supports removing threepids, otherwise False.
8685
"""
87-
88-
# This can only be called on the main process.
89-
assert isinstance(self._device_handler, DeviceHandler)
90-
9186
# Check if this user can be deactivated
9287
if not await self._third_party_rules.check_can_deactivate_user(
9388
user_id, by_admin

0 commit comments

Comments
 (0)