This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit f38297b

Remove special logic for adding stream_writers: just make it part of the extra config template
1 parent 8b74639 commit f38297b
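In short, the diff below replaces the worker-type special cases in add_worker_roles_to_shared_config with per-template shared_extra_conf callables: each WorkerTemplate now describes, as a function of the worker's instance name, the fragment of shared config that the instance contributes. A minimal sketch of the mechanism (the ConfFn alias and the standalone lambda names are illustrative only, not names from the script):

    from typing import Any, Callable, Dict

    ConfFn = Callable[[str], Dict[str, Any]]  # illustrative alias, not from the script

    # These mirror the lambdas added to WORKERS_CONFIG in the diff below.
    event_persister_conf: ConfFn = lambda worker_name: {
        "stream_writers": {"events": [worker_name]}
    }
    pusher_conf: ConfFn = lambda worker_name: {"pusher_instances": [worker_name]}

    # Evaluating a template for a concrete instance name yields the fragment that
    # the removed special-case code used to append by hand.
    print(event_persister_conf("event_persister1"))
    # {'stream_writers': {'events': ['event_persister1']}}
    print(pusher_conf("pusher1"))
    # {'pusher_instances': ['pusher1']}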

File tree

1 file changed: +53 -67 lines changed


docker/configure_workers_and_start.py

Lines changed: 53 additions & 67 deletions
@@ -98,7 +98,11 @@ class WorkerTemplate:
 # Stream Writers require "client" and "replication" listeners because they
 # have to attach by instance_map to the master process and have client endpoints.
 WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
-    "pusher": WorkerTemplate(),
+    "pusher": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "pusher_instances": [worker_name],
+        }
+    ),
     "user_dir": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -130,7 +134,11 @@ class WorkerTemplate:
             "notify_appservices_from_worker": worker_name
         },
     ),
-    "federation_sender": WorkerTemplate(),
+    "federation_sender": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "federation_sender_instances": [worker_name],
+        }
+    ),
     "synchrotron": WorkerTemplate(
         listener_resources={"client"},
         endpoint_patterns={
@@ -202,6 +210,9 @@ class WorkerTemplate:
     ),
     "event_persister": WorkerTemplate(
         listener_resources={"replication"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"events": [worker_name]}
+        },
     ),
     "background_worker": WorkerTemplate(
         # This worker cannot be sharded. Therefore, there should only ever be one
@@ -229,25 +240,40 @@ class WorkerTemplate:
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"account_data": [worker_name]}
+        },
     ),
     "presence": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"presence": [worker_name]}
+        },
     ),
     "receipts": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
         },
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"receipts": [worker_name]}
+        },
     ),
     "to_device": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"to_device": [worker_name]}
+        },
     ),
     "typing": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
+        shared_extra_conf=lambda worker_name: {
+            "stream_writers": {"typing": [worker_name]}
+        },
     ),
 }
 
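Once each selected worker's shared_extra_conf fragment has been folded into the shared config, the familiar Synapse layout falls out of the templates above. A sketch of the end result, assuming the merge step concatenates lists so that sharded streams such as events accumulate every instance name (the instance names here are examples):

    # Hypothetical shared-config fragment after processing one pusher, one receipts
    # worker and two event_persister workers (names are examples):
    shared_config_fragment = {
        "pusher_instances": ["pusher1"],
        "stream_writers": {
            "receipts": ["receipts1"],
            "events": ["event_persister1", "event_persister2"],
        },
    }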
@@ -308,6 +334,14 @@ def merge_into(dest: Any, new: Any) -> None:
         raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}")
 
 
+def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Merges `b` into `a` and returns `a`.
+    """
+    merge_into(a, b)
+    return a
+
+
 def convert(src: str, dst: str, **template_vars: object) -> None:
     """Generate a file from a template
 
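The new merged helper exists so that two config fragments can be combined inside an expression: per its docstring it merges b into a in place (via merge_into) and hands back the same a. A toy illustration of that contract (merged_toy and its shallow dict.update body are stand-ins written for this example; the real merge_into merges recursively):

    def merged_toy(a: dict, b: dict) -> dict:
        """Stand-in for merged(): mutate `a`, then return it so the call can be
        used as an expression. dict.update here is only for the illustration."""
        a.update(b)
        return a

    base = {"x": 1}
    result = merged_toy(base, {"y": 2})
    assert result is base            # the same object comes back
    assert base == {"x": 1, "y": 2}  # and it was updated in place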
@@ -338,7 +372,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
 
 def add_worker_roles_to_shared_config(
     shared_config: dict,
-    worker_types_set: Set[str],
     worker_name: str,
     worker_port: int,
 ) -> None:
@@ -348,70 +381,25 @@ def add_worker_roles_to_shared_config(
     Args:
         shared_config: The config dict that all worker instances share (after being
             converted to YAML)
-        worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
-            This list can be a single worker type or multiple.
         worker_name: The name of the worker instance.
         worker_port: The HTTP replication port that the worker instance is listening on.
     """
     # The instance_map config field marks the workers that write to various replication
     # streams
     instance_map = shared_config.setdefault("instance_map", {})
 
-    # This is a list of the stream_writers that there can be only one of. Events can be
-    # sharded, and therefore doesn't belong here.
-    singular_stream_writers = [
-        "account_data",
-        "presence",
-        "receipts",
-        "to_device",
-        "typing",
-    ]
-
-    # Worker-type specific sharding config. Now a single worker can fulfill multiple
-    # roles, check each.
-    if "pusher" in worker_types_set:
-        shared_config.setdefault("pusher_instances", []).append(worker_name)
-
-    if "federation_sender" in worker_types_set:
-        shared_config.setdefault("federation_sender_instances", []).append(worker_name)
-
-    if "event_persister" in worker_types_set:
-        # Event persisters write to the events stream, so we need to update
-        # the list of event stream writers
-        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
-            worker_name
-        )
-
-        # Map of stream writer instance names to host/ports combos
-        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-            instance_map[worker_name] = {
-                "path": f"/run/worker.{worker_port}",
-            }
-        else:
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
-
-    # Update the list of stream writers. It's convenient that the name of the worker
-    # type is the same as the stream to write. Iterate over the whole list in case there
-    # is more than one.
-    for worker in worker_types_set:
-        if worker in singular_stream_writers:
-            shared_config.setdefault("stream_writers", {}).setdefault(
-                worker, []
-            ).append(worker_name)
-
-            # Map of stream writer instance names to host/ports combos
-            # For now, all stream writers need http replication ports
-            if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
-                instance_map[worker_name] = {
-                    "path": f"/run/worker.{worker_port}",
-                }
-            else:
-                instance_map[worker_name] = {
-                    "host": "localhost",
-                    "port": worker_port,
-                }
+    # Add all workers to the `instance_map`
+    # Technically only certain types of workers, such as stream writers, are needed
+    # here but it is simpler just to be consistent.
+    if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+        instance_map[worker_name] = {
+            "path": f"/run/worker.{worker_port}",
+        }
+    else:
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }
 
 
 def merge_worker_template_configs(
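With the special cases gone, the function's only remaining job is to register every worker in instance_map, keyed by instance name. A sketch of what an entry ends up looking like (the instance name and port are example values; the socket path follows the f"/run/worker.{worker_port}" pattern from the code above):

    import os

    worker_name, worker_port = "event_persister1", 18009  # example values
    instance_map: dict = {}

    # Mirrors the new body of add_worker_roles_to_shared_config:
    if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
        instance_map[worker_name] = {"path": f"/run/worker.{worker_port}"}
    else:
        instance_map[worker_name] = {"host": "localhost", "port": worker_port}

    print(instance_map)
    # Without SYNAPSE_USE_UNIX_SOCKET set:
    # {'event_persister1': {'host': 'localhost', 'port': 18009}}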
@@ -438,10 +426,10 @@ def merge_worker_template_configs(
     new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns
 
     # merge dictionaries; the worker name will be replaced later
-    new_template.shared_extra_conf = lambda worker_name: {
-        **new_template.shared_extra_conf(worker_name),
-        **to_be_merged_template.shared_extra_conf(worker_name),
-    }
+    new_template.shared_extra_conf = lambda worker_name: merged(
+        existing_template.shared_extra_conf(worker_name),
+        to_be_merged_template.shared_extra_conf(worker_name),
+    )
 
     # There is only one worker type that has a 'worker_extra_conf' and it is
     # the media_repo. Since duplicate worker types on the same worker don't
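Switching from the {**a, **b} dict union to merged(...) matters when one worker instance combines several roles: a shallow union replaces the nested stream_writers dict wholesale, whereas a recursive merge keeps every stream. The replacement also reads the base config from existing_template rather than from the new_template.shared_extra_conf attribute that is being reassigned, so the new lambda does not end up calling itself. A toy demonstration of the merge difference (deep_merge below is a stand-in written for this example, assuming merge_into recurses into nested dicts):

    from typing import Any, Dict

    def deep_merge(dest: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
        """Toy stand-in for merged()/merge_into, assumed to recurse into dicts."""
        for key, value in new.items():
            if isinstance(value, dict) and isinstance(dest.get(key), dict):
                deep_merge(dest[key], value)
            else:
                dest[key] = value
        return dest

    a = {"stream_writers": {"events": ["worker1"]}}
    b = {"stream_writers": {"typing": ["worker1"]}}

    # Shallow union: the second "stream_writers" dict replaces the first entirely.
    assert {**a, **b} == {"stream_writers": {"typing": ["worker1"]}}

    # Recursive merge: both streams survive, which a combined
    # event_persister + typing worker needs.
    assert deep_merge(a, b) == {
        "stream_writers": {"events": ["worker1"], "typing": ["worker1"]}
    }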
@@ -821,9 +809,7 @@ def generate_worker_files(
         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
         # Update the shared config with sharding-related options if necessary
-        add_worker_roles_to_shared_config(
-            shared_config, worker_types_set, worker_name, worker_port
-        )
+        add_worker_roles_to_shared_config(shared_config, worker_name, worker_port)
 
         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)

0 commit comments
