Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b39a50a
Remove obsolete `"app"` from worker templates
reivilibre Nov 16, 2023
a22eb7d
Convert worker templates into dataclass
reivilibre Nov 16, 2023
ba3b6a4
Use a lambda for the worker name rather than search and replace later
reivilibre Nov 16, 2023
67d4fc8
Collapse WORKERS_CONFIG by removing entries with defaults
reivilibre Nov 16, 2023
94a85b3
Convert listener_resources and endpoint_patterns to Set[str]
reivilibre Nov 16, 2023
26073fa
Tweak comments
reivilibre Nov 16, 2023
8b74639
Add `merge_into`
reivilibre Nov 16, 2023
f38297b
Remove special logic for adding stream_writers: just make it part of …
reivilibre Nov 16, 2023
7d8824e
Rename function to add_worker_to_instance_map given reduction of scope
reivilibre Nov 16, 2023
f49dbc7
Add `sharding_allowed` to the WorkerTemplate rather than having a sep…
reivilibre Nov 16, 2023
3bb21a9
Use `merge_into` when adding workers to the shared config
reivilibre Nov 16, 2023
fbafde8
Promote mark_filepath to constant
reivilibre Nov 16, 2023
321d359
Add a --generate-only option
reivilibre Nov 16, 2023
259a808
Docstring on WorkerTemplate
reivilibre Dec 6, 2023
3a46cf0
Fix comment and mutation bug on merge_worker_template_configs
reivilibre Dec 6, 2023
2f1d727
Update comment on `merged`
reivilibre Dec 6, 2023
ad4bb0e
Tweak `instantiate_worker_template`, both in name, description and va…
reivilibre Dec 6, 2023
2ff1de3
Newsfile
reivilibre Jan 10, 2024
29541fd
Move `stream_writers` to their own field in the WorkerTemplate
reivilibre Jan 17, 2024
c91ab4b
Remove `merge_into` and just have `merged` which copies inputs to avo…
reivilibre Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 53 additions & 67 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ class WorkerTemplate:
# Stream Writers require "client" and "replication" listeners because they
# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
"pusher": WorkerTemplate(),
"pusher": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"pusher_instances": [worker_name],
}
),
"user_dir": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
Expand Down Expand Up @@ -130,7 +134,11 @@ class WorkerTemplate:
"notify_appservices_from_worker": worker_name
},
),
"federation_sender": WorkerTemplate(),
"federation_sender": WorkerTemplate(
shared_extra_conf=lambda worker_name: {
"federation_sender_instances": [worker_name],
}
),
"synchrotron": WorkerTemplate(
listener_resources={"client"},
endpoint_patterns={
Expand Down Expand Up @@ -202,6 +210,9 @@ class WorkerTemplate:
),
"event_persister": WorkerTemplate(
listener_resources={"replication"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"events": [worker_name]}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than trying to recursively merge stream writers, can we instead add a top level config to WorkerTemplate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, if that works smoothly I will give it a go.

},
),
"background_worker": WorkerTemplate(
# This worker cannot be sharded. Therefore, there should only ever be one
Expand Down Expand Up @@ -229,25 +240,40 @@ class WorkerTemplate:
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"account_data": [worker_name]}
},
),
"presence": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"presence": [worker_name]}
},
),
"receipts": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"receipts": [worker_name]}
},
),
"to_device": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"to_device": [worker_name]}
},
),
"typing": WorkerTemplate(
listener_resources={"client", "replication"},
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
shared_extra_conf=lambda worker_name: {
"stream_writers": {"typing": [worker_name]}
},
),
}

Expand Down Expand Up @@ -308,6 +334,14 @@ def merge_into(dest: Any, new: Any) -> None:
raise ValueError(f"Cannot merge primitive values: {dest!r} != {new!r}")


def merged(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]:
    """
    Returns the result of recursively merging `b` into `a`, without
    mutating either input.

    The previous implementation merged in place and returned `a`, which
    leaked the mutation into callers that reused `a` afterwards (e.g. the
    composed `shared_extra_conf` lambdas built when combining worker
    templates would repeatedly mutate the same underlying dict).

    Args:
        a: The base dictionary.
        b: The dictionary whose entries are merged on top of `a`
           (recursively, via `merge_into`'s semantics).

    Returns:
        A new dictionary containing the merged contents.
    """
    # Local import so this fix is self-contained; `copy` is stdlib.
    import copy

    # Deep-copy both inputs: `merge_into` mutates its destination in place,
    # and may also alias nested structures from its second argument.
    result = copy.deepcopy(a)
    merge_into(result, copy.deepcopy(b))
    return result


def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template

Expand Down Expand Up @@ -338,7 +372,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None:

def add_worker_roles_to_shared_config(
shared_config: dict,
worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
Expand All @@ -348,70 +381,25 @@ def add_worker_roles_to_shared_config(
Args:
shared_config: The config dict that all worker instances share (after being
converted to YAML)
worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication
# streams
instance_map = shared_config.setdefault("instance_map", {})

# This is a list of the stream_writers that there can be only one of. Events can be
# sharded, and therefore doesn't belong here.
singular_stream_writers = [
"account_data",
"presence",
"receipts",
"to_device",
"typing",
]

# Worker-type specific sharding config. Now a single worker can fulfill multiple
# roles, check each.
if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)

if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)

if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
worker_name
)

# Map of stream writer instance names to host/ports combos
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
for worker in worker_types_set:
if worker in singular_stream_writers:
shared_config.setdefault("stream_writers", {}).setdefault(
worker, []
).append(worker_name)

# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
# Add all workers to the `instance_map`
# Technically only certain types of workers, such as stream writers, are needed
# here but it is simpler just to be consistent.
if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
instance_map[worker_name] = {
"path": f"/run/worker.{worker_port}",
}
else:
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}


def merge_worker_template_configs(
Expand All @@ -438,10 +426,10 @@ def merge_worker_template_configs(
new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns

# merge dictionaries; the worker name will be replaced later
new_template.shared_extra_conf = lambda worker_name: {
**new_template.shared_extra_conf(worker_name),
**to_be_merged_template.shared_extra_conf(worker_name),
}
new_template.shared_extra_conf = lambda worker_name: merged(
existing_template.shared_extra_conf(worker_name),
to_be_merged_template.shared_extra_conf(worker_name),
)

# There is only one worker type that has a 'worker_extra_conf' and it is
# the media_repo. Since duplicate worker types on the same worker don't
Expand Down Expand Up @@ -821,9 +809,7 @@ def generate_worker_files(
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
shared_config, worker_types_set, worker_name, worker_port
)
add_worker_roles_to_shared_config(shared_config, worker_name, worker_port)

# Enable the worker in supervisord
worker_descriptors.append(worker_config)
Expand Down