diff --git a/changelog.d/16803.misc b/changelog.d/16803.misc
new file mode 100644
index 00000000000..40db4e232ab
--- /dev/null
+++ b/changelog.d/16803.misc
@@ -0,0 +1 @@
+Refactor the `configure_workers_and_start.py` script used internally by Complement.
\ No newline at end of file
diff --git a/docker/conf-workers/synapse.supervisord.conf.j2 b/docker/conf-workers/synapse.supervisord.conf.j2
index 481eb4fc92f..9f7dd819797 100644
--- a/docker/conf-workers/synapse.supervisord.conf.j2
+++ b/docker/conf-workers/synapse.supervisord.conf.j2
@@ -6,7 +6,7 @@ command=/usr/local/bin/python -m synapse.app.complement_fork_starter
   --config-path="{{ main_config_path }}"
   --config-path=/conf/workers/shared.yaml
   {%- for worker in workers %}
-  -- {{ worker.app }}
+  -- synapse.app.generic_worker
   --config-path="{{ main_config_path }}"
   --config-path=/conf/workers/shared.yaml
   --config-path=/conf/workers/{{ worker.name }}.yaml
@@ -36,7 +36,7 @@ exitcodes=0

 {% for worker in workers %}
 [program:synapse_{{ worker.name }}]
-command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
+command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.generic_worker
   --config-path="{{ main_config_path }}"
   --config-path=/conf/workers/shared.yaml
   --config-path=/conf/workers/{{ worker.name }}.yaml
diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2
index 29ec74b4ea0..88e7a33c420 100644
--- a/docker/conf-workers/worker.yaml.j2
+++ b/docker/conf-workers/worker.yaml.j2
@@ -3,7 +3,7 @@
 # Values will change depending on which workers are selected when
 # running that image.

-worker_app: "{{ app }}"
+worker_app: "synapse.app.generic_worker"
 worker_name: "{{ name }}"

 worker_listeners:
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62952e6b263..80f0a2e542d 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -47,16 +47,21 @@
 # in the project's README), this script may be run multiple times, and functionality should
 # continue to work if so.

+import dataclasses
 import os
 import platform
 import re
 import subprocess
 import sys
+from argparse import ArgumentParser
 from collections import defaultdict
+from copy import deepcopy
+from dataclasses import dataclass, field
 from itertools import chain
 from pathlib import Path
 from typing import (
     Any,
+    Callable,
     Dict,
     List,
     Mapping,
@@ -78,9 +83,32 @@
 MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
 MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"

-# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
-# during processing with the name of the worker.
-WORKER_PLACEHOLDER_NAME = "placeholder_name"
+# We place a file at this path to indicate that the script has already been
+# run and should not be run again.
+MARKER_FILE_PATH = "/conf/workers_have_been_configured"
+
+
+@dataclass
+class WorkerTemplate:
+    """
+    A definition of individual settings for a specific worker type.
+    A worker name can be fed into the template in order to generate a config.
+
+    These worker templates can be merged with `merge_worker_template_configs`
+    in order for a single worker to be made from multiple templates.
+    """
+
+    listener_resources: Set[str] = field(default_factory=set)
+    endpoint_patterns: Set[str] = field(default_factory=set)
+    # (worker_name) -> {config}
+    shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
+    worker_extra_conf: str = ""
+
+    stream_writers: Set[str] = field(default_factory=set)
+
+    # True if and only if multiple of this worker type are allowed.
+    sharding_allowed: bool = True
+

 # Workers with exposed endpoints need either "client", "federation", or "media" listener_resources
 # Watching /_matrix/client needs a "client" listener
 # Watching /_matrix/federation needs a "federation" listener
 # Watching /_matrix/media and related needs a "media" listener
 # Stream Writers require "client" and "replication" listeners because they
 # have to attach by instance_map to the master process and have client endpoints.
-WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
-    "pusher": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "user_dir": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
+    "pusher": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "pusher_instances": [worker_name],
+        }
+    ),
+    "user_dir": WorkerTemplate(
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
-        ],
-        "shared_extra_conf": {
-            "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
-        },
-        "worker_extra_conf": "",
-    },
-    "media_repository": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": ["media"],
-        "endpoint_patterns": [
+        },
+        shared_extra_conf=lambda worker_name: {
+            "update_user_directory_from_worker": worker_name
+        },
+    ),
+    "media_repository": WorkerTemplate(
+        listener_resources={"media"},
+        endpoint_patterns={
             "^/_matrix/media/",
             "^/_synapse/admin/v1/purge_media_cache$",
             "^/_synapse/admin/v1/room/.*/media.*$",
             "^/_synapse/admin/v1/user/.*/media.*$",
             "^/_synapse/admin/v1/media/.*$",
             "^/_synapse/admin/v1/quarantine_media/.*$",
-        ],
+        },
         # The first configured media worker will run the media background jobs
-        "shared_extra_conf": {
+        shared_extra_conf=lambda worker_name: {
             "enable_media_repo": False,
-            "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
+            "media_instance_running_background_jobs": worker_name,
         },
-        "worker_extra_conf": "enable_media_repo: true",
-    },
-    "appservice": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {
-            "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
+        worker_extra_conf="enable_media_repo: true",
+    ),
+    "appservice": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "notify_appservices_from_worker": worker_name
         },
-        "worker_extra_conf": "",
-    },
-    "federation_sender": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "synchrotron": {
-        "app": "synapse.app.generic_worker",
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+    ),
+    "federation_sender": WorkerTemplate(
+        shared_extra_conf=lambda worker_name: {
+            "federation_sender_instances": [worker_name],
+        }
+    ),
+    "synchrotron": WorkerTemplate(
+        listener_resources={"client"},
+        endpoint_patterns={
             "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
             "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "client_reader": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client"], - "endpoint_patterns": [ + }, + ), + "client_reader": WorkerTemplate( + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$", @@ -184,14 +197,11 @@ "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_reader": { - "app": "synapse.app.generic_worker", - "listener_resources": ["federation"], - "endpoint_patterns": [ + }, + ), + "federation_reader": WorkerTemplate( + listener_resources={"federation"}, + endpoint_patterns={ "^/_matrix/federation/(v1|v2)/event/", "^/_matrix/federation/(v1|v2)/state/", "^/_matrix/federation/(v1|v2)/state_ids/", @@ -211,97 +221,73 @@ "^/_matrix/federation/(v1|v2)/user/devices/", "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_inbound": { - "app": "synapse.app.generic_worker", - "listener_resources": ["federation"], - "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "event_persister": { - "app": "synapse.app.generic_worker", - "listener_resources": ["replication"], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "background_worker": { - "app": "synapse.app.generic_worker", - "listener_resources": [], - "endpoint_patterns": [], + }, + ), + "federation_inbound": WorkerTemplate( + listener_resources={"federation"}, + endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"}, + ), + "event_persister": WorkerTemplate( + listener_resources={"replication"}, + stream_writers={"events"}, + ), + "background_worker": WorkerTemplate( # This worker cannot be sharded. Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. 
- "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, - "worker_extra_conf": "", - }, - "event_creator": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client"], - "endpoint_patterns": [ + shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, + sharding_allowed=False, + ), + "event_creator": WorkerTemplate( + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", "^/_matrix/client/(api/v1|r0|v3|unstable)/join/", "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "frontend_proxy": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "account_data": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ + }, + ), + "frontend_proxy": WorkerTemplate( + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"}, + ), + "account_data": WorkerTemplate( + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "presence": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "receipts": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ + }, + stream_writers={"account_data"}, + sharding_allowed=False, + ), + "presence": WorkerTemplate( + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, + stream_writers={"presence"}, + sharding_allowed=False, + ), + "receipts": WorkerTemplate( + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "to_device": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "typing": { - "app": "synapse.app.generic_worker", - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ - "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, + }, + stream_writers={"receipts"}, + sharding_allowed=False, + ), + "to_device": WorkerTemplate( + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, + stream_writers={"to_device"}, + sharding_allowed=False, + ), + "typing": WorkerTemplate( + listener_resources={"client", "replication"}, + 
endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, + stream_writers={"typing"}, + sharding_allowed=False, + ), } # Templates for sections that may be inserted multiple times in config files @@ -336,6 +322,45 @@ def flush_buffers() -> None: sys.stderr.flush() +def merged(a: Any, b: Any) -> Any: + """ + Merges `a` and `b` together, returning the result. + + The merge is performed with the following rules: + + - dicts: values with the same key will be merged recursively + - lists: `new` will be appended to `dest` + - primitives: they will be checked for equality and inequality will result + in a ValueError + + + It is an error for `a` and `b` to be of different types. + """ + if isinstance(a, dict) and isinstance(b, dict): + result = {} + for key in set(a.keys()) | set(b.keys()): + if key in a and key in b: + result[key] = merged(a[key], b[key]) + elif key in a: + result[key] = deepcopy(a[key]) + else: + result[key] = deepcopy(b[key]) + + return result + elif isinstance(a, list) and isinstance(b, list): + return deepcopy(a) + deepcopy(b) + elif type(a) != type(b): + raise TypeError(f"Cannot merge {type(a).__name__} and {type(b).__name__}") + elif a != b: + raise ValueError(f"Cannot merge primitive values: {a!r} != {b!r}") + + if type(a) not in {str, int, float, bool, None.__class__}: + raise TypeError( + f"Cannot use `merged` on type {a} as it may not be safe (must either be an immutable primitive or must have special copy/merge logic)" + ) + return a + + def convert(src: str, dst: str, **template_vars: object) -> None: """Generate a file from a template @@ -364,138 +389,84 @@ def convert(src: str, dst: str, **template_vars: object) -> None: outfile.write(rendered) -def add_worker_roles_to_shared_config( +def add_worker_to_instance_map( shared_config: dict, - worker_types_set: Set[str], worker_name: str, worker_port: int, ) -> None: - """Given a dictionary representing a config file shared across all workers, - append appropriate worker information to it for the current worker_type instance. + """ + Update the shared config map to add the worker in the instance_map. Args: shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG). - This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ - # The instance_map config field marks the workers that write to various replication - # streams instance_map = shared_config.setdefault("instance_map", {}) - # This is a list of the stream_writers that there can be only one of. Events can be - # sharded, and therefore doesn't belong here. - singular_stream_writers = [ - "account_data", - "presence", - "receipts", - "to_device", - "typing", - ] - - # Worker-type specific sharding config. Now a single worker can fulfill multiple - # roles, check each. 
- if "pusher" in worker_types_set: - shared_config.setdefault("pusher_instances", []).append(worker_name) - - if "federation_sender" in worker_types_set: - shared_config.setdefault("federation_sender_instances", []).append(worker_name) - - if "event_persister" in worker_types_set: - # Event persisters write to the events stream, so we need to update - # the list of event stream writers - shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( - worker_name - ) - - # Map of stream writer instance names to host/ports combos - if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): - instance_map[worker_name] = { - "path": f"/run/worker.{worker_port}", - } - else: - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } - # Update the list of stream writers. It's convenient that the name of the worker - # type is the same as the stream to write. Iterate over the whole list in case there - # is more than one. - for worker in worker_types_set: - if worker in singular_stream_writers: - shared_config.setdefault("stream_writers", {}).setdefault( - worker, [] - ).append(worker_name) - - # Map of stream writer instance names to host/ports combos - # For now, all stream writers need http replication ports - if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): - instance_map[worker_name] = { - "path": f"/run/worker.{worker_port}", - } - else: - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } def merge_worker_template_configs( - existing_dict: Optional[Dict[str, Any]], - to_be_merged_dict: Dict[str, Any], -) -> Dict[str, Any]: - """When given an existing dict of worker template configuration consisting with both - dicts and lists, merge new template data from WORKERS_CONFIG(or create) and - return new dict. + left: WorkerTemplate, + right: WorkerTemplate, +) -> WorkerTemplate: + """Merges two templates together, returning a new template that includes + the listeners, endpoint patterns and configuration from both. - Args: - existing_dict: Either an existing worker template or a fresh blank one. - to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into - existing_dict. - Returns: The newly merged together dict values. + Does not mutate the input templates. """ - new_dict: Dict[str, Any] = {} - if not existing_dict: - # It doesn't exist yet, just use the new dict(but take a copy not a reference) - new_dict = to_be_merged_dict.copy() - else: - for i in to_be_merged_dict.keys(): - if (i == "endpoint_patterns") or (i == "listener_resources"): - # merge the two lists, remove duplicates - new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) - elif i == "shared_extra_conf": - # merge dictionary's, the worker name will be replaced later - new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} - elif i == "worker_extra_conf": - # There is only one worker type that has a 'worker_extra_conf' and it is - # the media_repo. Since duplicate worker types on the same worker don't - # work, this is fine. - new_dict[i] = existing_dict[i] + to_be_merged_dict[i] - else: - # Everything else should be identical, like "app", which only works - # because all apps are now generic_workers. 
-                new_dict[i] = to_be_merged_dict[i]
-    return new_dict
+    return WorkerTemplate(
+        # include listener resources from both
+        listener_resources=left.listener_resources | right.listener_resources,
+        # include endpoint patterns from both
+        endpoint_patterns=left.endpoint_patterns | right.endpoint_patterns,
+        # merge shared config dictionaries; the worker name will be replaced later
+        shared_extra_conf=lambda worker_name: merged(
+            left.shared_extra_conf(worker_name),
+            right.shared_extra_conf(worker_name),
+        ),
+        # There is only one worker type that has a 'worker_extra_conf' and it is
+        # the media_repo. Since duplicate worker types on the same worker don't
+        # work, this is fine.
+        worker_extra_conf=(left.worker_extra_conf + right.worker_extra_conf),
+        # (This is unused, but in principle sharding this hybrid worker type
+        # would be allowed if both constituent types are shardable)
+        sharding_allowed=left.sharding_allowed and right.sharding_allowed,
+        # include stream writers from both
+        stream_writers=left.stream_writers | right.stream_writers,
+    )


-def insert_worker_name_for_worker_config(
-    existing_dict: Dict[str, Any], worker_name: str
+def instantiate_worker_template(
+    template: WorkerTemplate, worker_name: str
 ) -> Dict[str, Any]:
-    """Insert a given worker name into the worker's configuration dict.
+    """Given a worker template, instantiate it into a worker configuration
+    (which is currently represented as a dictionary).

     Args:
-        existing_dict: The worker_config dict that is imported into shared_config.
-        worker_name: The name of the worker to insert.
-    Returns: Copy of the dict with newly inserted worker name
+        template: The WorkerTemplate to instantiate.
+        worker_name: The name of the worker to use.
+    Returns: The worker configuration, as a dictionary.
     """
-    dict_to_edit = existing_dict.copy()
-    for k, v in dict_to_edit["shared_extra_conf"].items():
-        # Only proceed if it's the placeholder name string
-        if v == WORKER_PLACEHOLDER_NAME:
-            dict_to_edit["shared_extra_conf"][k] = worker_name
-    return dict_to_edit
+    worker_config_dict = dataclasses.asdict(template)
+    stream_writers_dict = {
+        "stream_writers": {writer: [worker_name] for writer in template.stream_writers}
+    }
+    worker_config_dict["shared_extra_conf"] = merged(
+        template.shared_extra_conf(worker_name), stream_writers_dict
+    )
+    worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
+    worker_config_dict["listener_resources"] = sorted(template.listener_resources)
+    return worker_config_dict


 def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
@@ -540,23 +511,6 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
     return new_worker_types


-def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
-    """Helper to check to make sure worker types that cannot have multiples do not.
-
-    Args:
-        worker_type: The type of worker to check against.
-    Returns: True if allowed, False if not
-    """
-    return worker_type not in [
-        "background_worker",
-        "account_data",
-        "presence",
-        "receipts",
-        "typing",
-        "to_device",
-    ]
-
-
 def split_and_strip_string(
     given_string: str, split_char: str, max_split: SupportsIndex = -1
 ) -> List[str]:
@@ -682,7 +636,7 @@ def parse_worker_types(
             )

         if worker_type in worker_type_shard_counter:
-            if not is_sharding_allowed_for_worker_type(worker_type):
+            if not WORKERS_CONFIG[worker_type].sharding_allowed:
                 error(
                     f"There can be only a single worker with {worker_type} "
                     "type. Please recount and remove."
@@ -811,36 +765,35 @@ def generate_worker_files(
     # Map locations to upstreams (corresponding to worker types) in Nginx
     # but only if we use the appropriate worker type
     for worker_type in all_worker_types_in_use:
-        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
+        for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns):
             nginx_locations[endpoint_pattern] = f"http://{worker_type}"

     # For each worker type specified by the user, create config values and write its
     # yaml config file
     for worker_name, worker_types_set in requested_worker_types.items():
         # The collected and processed data will live here.
-        worker_config: Dict[str, Any] = {}
+        worker_template: WorkerTemplate = WorkerTemplate()

         # Merge all worker config templates for this worker into a single config
         for worker_type in worker_types_set:
-            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
-
             # Merge worker type template configuration data. It's a combination of lists
             # and dicts, so use this helper.
-            worker_config = merge_worker_template_configs(
-                worker_config, copy_of_template_config
+            worker_template = merge_worker_template_configs(
+                worker_template, WORKERS_CONFIG[worker_type]
             )

         # Replace placeholder names in the config template with the actual worker name.
-        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
+        worker_config: Dict[str, Any] = instantiate_worker_template(
+            worker_template, worker_name
+        )

         worker_config.update(
             {"name": worker_name, "port": str(worker_port), "config_path": config_path}
         )

-        # Update the shared config with any worker_type specific options. The first of a
-        # given worker_type needs to stay assigned and not be replaced.
-        worker_config["shared_extra_conf"].update(shared_config)
-        shared_config = worker_config["shared_extra_conf"]
+        # Update the shared config with any options needed to enable this worker.
+        shared_config = merged(shared_config, worker_config["shared_extra_conf"])
+
         if using_unix_sockets:
             healthcheck_urls.append(
                 f"--unix-socket /run/worker.{worker_port} http://localhost/health"
@@ -848,10 +801,10 @@ def generate_worker_files(
         else:
             healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))

-        # Update the shared config with sharding-related options if necessary
-        add_worker_roles_to_shared_config(
-            shared_config, worker_types_set, worker_name, worker_port
-        )
+        # Add all workers to the `instance_map`.
+        # Technically only certain types of workers, such as stream writers, are needed
+        # here, but it is simpler just to be consistent.
+        add_worker_to_instance_map(shared_config, worker_name, worker_port)

         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)
@@ -1018,6 +971,14 @@ def generate_worker_log_config(


 def main(args: List[str], environ: MutableMapping[str, str]) -> None:
+    parser = ArgumentParser()
+    parser.add_argument(
+        "--generate-only",
+        action="store_true",
+        help="Only generate configuration; don't run Synapse.",
+    )
+    opts = parser.parse_args(args)
+
     config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
     config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
     data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
@@ -1034,8 +995,8 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
         log("Base homeserver config exists—not regenerating")
         # This script may be run multiple times (mostly by Complement, see note at top of
         # file). Don't re-configure workers in this instance.
- mark_filepath = "/conf/workers_have_been_configured" - if not os.path.exists(mark_filepath): + + if not os.path.exists(MARKER_FILE_PATH): # Collect and validate worker_type requests # Read the desired worker configuration from the environment worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() @@ -1054,11 +1015,15 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: generate_worker_files(environ, config_path, data_dir, requested_worker_types) # Mark workers as being configured - with open(mark_filepath, "w") as f: + with open(MARKER_FILE_PATH, "w") as f: f.write("") else: log("Worker config exists—not regenerating") + if opts.generate_only: + log("--generate-only: won't run Synapse") + return + # Lifted right out of start.py jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),) @@ -1081,4 +1046,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: if __name__ == "__main__": - main(sys.argv, os.environ) + main(sys.argv[1:], os.environ)