This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit f2d88f7

Remove special logic for adding stream_writers: just make it part of the extra config template
1 parent 6f7583b commit f2d88f7
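
In effect, each entry in WORKERS_CONFIG now declares the shared-config fragment it needs via a `shared_extra_conf` callable, and the generator merges those fragments generically instead of special-casing worker types. A minimal sketch of the idea (the standalone lambda and worker name below are illustrative, not code from this file):

from typing import Any, Callable, Dict

# Illustrative stand-in for the "event_persister" template's new shared_extra_conf.
event_persister_extra: Callable[[str], Dict[str, Any]] = lambda worker_name: {
    "stream_writers": {"events": [worker_name]}
}

# Hypothetical worker name; the real script derives worker names from the
# SYNAPSE_WORKER_TYPES environment variable.
print(event_persister_extra("event_persister1"))
# -> {'stream_writers': {'events': ['event_persister1']}}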

File tree

1 file changed: +53 -67 lines changed

docker/configure_workers_and_start.py

Lines changed: 53 additions & 67 deletions
@@ -98,7 +98,11 @@ class WorkerTemplate:
 # Stream Writers require "client" and "replication" listeners because they
 # have to attach by instance_map to the master process and have client endpoints.
 WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
-    "pusher": WorkerTemplate(),
+    "pusher": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "pusher_instances": [worker_name],
+        }
+    ),
     "user_dir": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -130,7 +134,11 @@ class WorkerTemplate:
             "notify_appservices_from_worker": worker_name
         },
     ),
-    "federation_sender": WorkerTemplate(),
+    "federation_sender": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "federation_sender_instances": [worker_name],
+        }
+    ),
     "synchrotron": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -202,6 +210,9 @@ class WorkerTemplate:
     ),
     "event_persister": WorkerTemplate(
         listener_resources={"replication"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"events": [worker_name]}
+        },
     ),
     "background_worker": WorkerTemplate(
         # This worker cannot be sharded. Therefore, there should only ever be one
@@ -229,25 +240,40 @@ class WorkerTemplate:
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"account_data": [worker_name]}
+        },
     ),
     "presence": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"presence": [worker_name]}
+        },
     ),
     "receipts": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"receipts": [worker_name]}
+        },
     ),
     "to_device": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"to_device": [worker_name]}
+        },
     ),
     "typing": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"typing": [worker_name]}
+        },
     ),
 }
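
Taken together, the template fragments above are what now end up in the generated shared config. A rough, purely illustrative sketch of the fragment for one pusher and one event_persister (worker names and ports are made up; the instance_map entries still come from add_worker_roles_to_shared_config below):

# Illustrative only -- not output captured from a real run.
expected_shared_config_fragment = {
    "pusher_instances": ["pusher1"],
    "stream_writers": {"events": ["event_persister1"]},
    # instance_map is still populated generically for every worker; see the
    # simplified add_worker_roles_to_shared_config further down.
    "instance_map": {
        "pusher1": {"host": "localhost", "port": 18009},
        "event_persister1": {"host": "localhost", "port": 18010},
    },
}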

@@ -309,6 +335,14 @@ def merge_into(dest: Any, new: Any) -> None:
         raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}")
 
 
+def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Merges `b` into `a` and returns `a`.
+    """
+    merge_into(a, b)
+    return a
+
+
 def convert(src: str, dst: str, **template_vars: object) -> None:
     """Generate a file from a template
 
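
The new helper leans on the existing merge_into for deep merging, which matters once two fragments share a top-level key such as "stream_writers". A simplified stand-in (assuming merge_into behaves roughly like this for dicts and lists) shows the difference from a plain dict spread:

from typing import Any, Dict

def _merge_into_sketch(dest: Dict[str, Any], new: Dict[str, Any]) -> None:
    # Simplified stand-in for this file's merge_into: recurse into nested
    # dicts and extend lists instead of overwriting them.
    for key, value in new.items():
        if key in dest and isinstance(dest[key], dict) and isinstance(value, dict):
            _merge_into_sketch(dest[key], value)
        elif key in dest and isinstance(dest[key], list) and isinstance(value, list):
            dest[key].extend(value)
        else:
            dest[key] = value

a = {"stream_writers": {"to_device": ["worker1"]}}
b = {"stream_writers": {"typing": ["worker1"]}}
_merge_into_sketch(a, b)
print(a)  # {'stream_writers': {'to_device': ['worker1'], 'typing': ['worker1']}}

# A shallow spread would keep only the second fragment's nested dict:
print({**{"stream_writers": {"to_device": ["worker1"]}}, **b})
# {'stream_writers': {'typing': ['worker1']}}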
@@ -339,7 +373,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
 
 def add_worker_roles_to_shared_config(
     shared_config: dict,
-    worker_types_set: Set[str],
     worker_name: str,
     worker_port: int,
 ) -> None:
@@ -349,70 +382,25 @@ def add_worker_roles_to_shared_config(
     Args:
         shared_config: The config dict that all worker instances share (after being
            converted to YAML)
-        worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
-            This list can be a single worker type or multiple.
         worker_name: The name of the worker instance.
         worker_port: The HTTP replication port that the worker instance is listening on.
     """
     # The instance_map config field marks the workers that write to various replication
     # streams
     instance_map = shared_config.setdefault("instance_map", {})
 
-    # This is a list of the stream_writers that there can be only one of. Events can be
-    # sharded, and therefore doesn't belong here.
-    singular_stream_writers = [
-        "account_data",
-        "presence",
-        "receipts",
-        "to_device",
-        "typing",
-    ]
-
-    # Worker-type specific sharding config. Now a single worker can fulfill multiple
-    # roles, check each.
-    if "pusher" in worker_types_set:
-        shared_config.setdefault("pusher_instances", []).append(worker_name)
-
-    if "federation_sender" in worker_types_set:
-        shared_config.setdefault("federation_sender_instances", []).append(worker_name)
-
-    if "event_persister" in worker_types_set:
-        # Event persisters write to the events stream, so we need to update
-        # the list of event stream writers
-        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
-            worker_name
-        )
-
-        # Map of stream writer instance names to host/ports combos
-        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-            instance_map[worker_name] = {
-                "path": f"/run/worker.{worker_port}",
-            }
-        else:
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
-
-    # Update the list of stream writers. It's convenient that the name of the worker
-    # type is the same as the stream to write. Iterate over the whole list in case there
-    # is more than one.
-    for worker in worker_types_set:
-        if worker in singular_stream_writers:
-            shared_config.setdefault("stream_writers", {}).setdefault(
-                worker, []
-            ).append(worker_name)
-
-            # Map of stream writer instance names to host/ports combos
-            # For now, all stream writers need http replication ports
-            if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-                instance_map[worker_name] = {
-                    "path": f"/run/worker.{worker_port}",
-                }
-            else:
-                instance_map[worker_name] = {
-                    "host": "localhost",
-                    "port": worker_port,
-                }
+    # Add all workers to the `instance_map`
+    # Technically only certain types of workers, such as stream writers, are needed
+    # here but it is simpler just to be consistent.
+    if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+        instance_map[worker_name] = {
+            "path": f"/run/worker.{worker_port}",
+        }
+    else:
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }
 
 
 def merge_worker_template_configs(
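
For reference, a minimal standalone sketch of the entry shape this now adds for every worker (the helper function here is ours for illustration; in the script the logic stays inline):

import os
from typing import Dict, Union

def instance_map_entry(worker_port: int) -> Dict[str, Union[str, int]]:
    # Mirrors the simplified logic above: a unix socket path when
    # SYNAPSE_USE_UNIX_SOCKET is set, otherwise a localhost host/port pair.
    if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
        return {"path": f"/run/worker.{worker_port}"}
    return {"host": "localhost", "port": worker_port}

# e.g. instance_map["worker1"] = instance_map_entry(18009)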
@@ -439,10 +427,10 @@ def merge_worker_template_configs(
     new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns
 
     # merge dictionaries; the worker name will be replaced later
-    new_template.shared_extra_conf = lambda worker_name: {
-        **new_template.shared_extra_conf(worker_name),
-        **to_be_merged_template.shared_extra_conf(worker_name),
-    }
+    new_template.shared_extra_conf = lambda worker_name: merged(
+        existing_template.shared_extra_conf(worker_name),
+        to_be_merged_template.shared_extra_conf(worker_name),
+    )
 
     # There is only one worker type that has a 'worker_extra_conf' and it is
     # the media_repo. Since duplicate worker types on the same worker don't
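
As a usage illustration (assuming the `merged` helper added earlier in this file is in scope; the worker name is made up), the merged template's shared_extra_conf now composes fragments like so:

# Illustrative only; `merged` is the helper introduced earlier in this file.
to_device_fragment = lambda name: {"stream_writers": {"to_device": [name]}}
typing_fragment = lambda name: {"stream_writers": {"typing": [name]}}

combined = lambda name: merged(to_device_fragment(name), typing_fragment(name))
print(combined("worker1"))
# {'stream_writers': {'to_device': ['worker1'], 'typing': ['worker1']}}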
@@ -822,9 +810,7 @@ def generate_worker_files(
         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
         # Update the shared config with sharding-related options if necessary
-        add_worker_roles_to_shared_config(
-            shared_config, worker_types_set, worker_name, worker_port
-        )
+        add_worker_roles_to_shared_config(shared_config, worker_name, worker_port)
 
         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)
