@@ -98,7 +98,11 @@ class WorkerTemplate:
9898# Stream Writers require "client" and "replication" listeners because they
9999# have to attach by instance_map to the master process and have client endpoints.
100100WORKERS_CONFIG : Dict [str , WorkerTemplate ] = {
101- "pusher" : WorkerTemplate (),
101+ "pusher" : WorkerTemplate (
102+ shared_extra_conf = lambda worker_name : {
103+ "pusher_instances" : [worker_name ],
104+ }
105+ ),
102106 "user_dir" : WorkerTemplate (
103107 listener_resources = {"client" },
104108 endpoint_patterns = {
@@ -130,7 +134,11 @@ class WorkerTemplate:
130134 "notify_appservices_from_worker" : worker_name
131135 },
132136 ),
133- "federation_sender" : WorkerTemplate (),
137+ "federation_sender" : WorkerTemplate (
138+ shared_extra_conf = lambda worker_name : {
139+ "federation_sender_instances" : [worker_name ],
140+ }
141+ ),
134142 "synchrotron" : WorkerTemplate (
135143 listener_resources = {"client" },
136144 endpoint_patterns = {
@@ -202,6 +210,9 @@ class WorkerTemplate:
202210 ),
203211 "event_persister" : WorkerTemplate (
204212 listener_resources = {"replication" },
213+ shared_extra_conf = lambda worker_name : {
214+ "stream_writers" : {"events" : [worker_name ]}
215+ },
205216 ),
206217 "background_worker" : WorkerTemplate (
207218 # This worker cannot be sharded. Therefore, there should only ever be one
@@ -229,25 +240,40 @@ class WorkerTemplate:
229240 "^/_matrix/client/(r0|v3|unstable)/.*/tags" ,
230241 "^/_matrix/client/(r0|v3|unstable)/.*/account_data" ,
231242 },
243+ shared_extra_conf = lambda worker_name : {
244+ "stream_writers" : {"account_data" : [worker_name ]}
245+ },
232246 ),
233247 "presence" : WorkerTemplate (
234248 listener_resources = {"client" , "replication" },
235249 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/" },
250+ shared_extra_conf = lambda worker_name : {
251+ "stream_writers" : {"presence" : [worker_name ]}
252+ },
236253 ),
237254 "receipts" : WorkerTemplate (
238255 listener_resources = {"client" , "replication" },
239256 endpoint_patterns = {
240257 "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt" ,
241258 "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers" ,
242259 },
260+ shared_extra_conf = lambda worker_name : {
261+ "stream_writers" : {"receipts" : [worker_name ]}
262+ },
243263 ),
244264 "to_device" : WorkerTemplate (
245265 listener_resources = {"client" , "replication" },
246266 endpoint_patterns = {"^/_matrix/client/(r0|v3|unstable)/sendToDevice/" },
267+ shared_extra_conf = lambda worker_name : {
268+ "stream_writers" : {"to_device" : [worker_name ]}
269+ },
247270 ),
248271 "typing" : WorkerTemplate (
249272 listener_resources = {"client" , "replication" },
250273 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" },
274+ shared_extra_conf = lambda worker_name : {
275+ "stream_writers" : {"typing" : [worker_name ]}
276+ },
251277 ),
252278}
253279
@@ -308,6 +334,14 @@ def merge_into(dest: Any, new: Any) -> None:
308334 raise ValueError (f"Cannot merge primitive values: { dest !r} != { new !r} " )
309335
310336
337+ def merged (a : Dict [str , Any ], b : Dict [str , Any ]) -> Dict [str , Any ]:
338+ """
339+ Merges `b` into `a` and returns `a`.
340+ """
341+ merge_into (a , b )
342+ return a
343+
344+
311345def convert (src : str , dst : str , ** template_vars : object ) -> None :
312346 """Generate a file from a template
313347
@@ -338,7 +372,6 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
338372
339373def add_worker_roles_to_shared_config (
340374 shared_config : dict ,
341- worker_types_set : Set [str ],
342375 worker_name : str ,
343376 worker_port : int ,
344377) -> None :
@@ -348,70 +381,25 @@ def add_worker_roles_to_shared_config(
348381 Args:
349382 shared_config: The config dict that all worker instances share (after being
350383 converted to YAML)
351- worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
352- This list can be a single worker type or multiple.
353384 worker_name: The name of the worker instance.
354385 worker_port: The HTTP replication port that the worker instance is listening on.
355386 """
356387 # The instance_map config field marks the workers that write to various replication
357388 # streams
358389 instance_map = shared_config .setdefault ("instance_map" , {})
359390
360- # This is a list of the stream_writers that there can be only one of. Events can be
361- # sharded, and therefore doesn't belong here.
362- singular_stream_writers = [
363- "account_data" ,
364- "presence" ,
365- "receipts" ,
366- "to_device" ,
367- "typing" ,
368- ]
369-
370- # Worker-type specific sharding config. Now a single worker can fulfill multiple
371- # roles, check each.
372- if "pusher" in worker_types_set :
373- shared_config .setdefault ("pusher_instances" , []).append (worker_name )
374-
375- if "federation_sender" in worker_types_set :
376- shared_config .setdefault ("federation_sender_instances" , []).append (worker_name )
377-
378- if "event_persister" in worker_types_set :
379- # Event persisters write to the events stream, so we need to update
380- # the list of event stream writers
381- shared_config .setdefault ("stream_writers" , {}).setdefault ("events" , []).append (
382- worker_name
383- )
384-
385- # Map of stream writer instance names to host/ports combos
386- if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
387- instance_map [worker_name ] = {
388- "path" : f"/run/worker.{ worker_port } " ,
389- }
390- else :
391- instance_map [worker_name ] = {
392- "host" : "localhost" ,
393- "port" : worker_port ,
394- }
395- # Update the list of stream writers. It's convenient that the name of the worker
396- # type is the same as the stream to write. Iterate over the whole list in case there
397- # is more than one.
398- for worker in worker_types_set :
399- if worker in singular_stream_writers :
400- shared_config .setdefault ("stream_writers" , {}).setdefault (
401- worker , []
402- ).append (worker_name )
403-
404- # Map of stream writer instance names to host/ports combos
405- # For now, all stream writers need http replication ports
406- if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
407- instance_map [worker_name ] = {
408- "path" : f"/run/worker.{ worker_port } " ,
409- }
410- else :
411- instance_map [worker_name ] = {
412- "host" : "localhost" ,
413- "port" : worker_port ,
414- }
391+ # Add all workers to the `instance_map`
392+ # Technically only certain types of workers, such as stream writers, are needed
393+ # here but it is simpler just to be consistent.
394+ if os .environ .get ("SYNAPSE_USE_UNIX_SOCKET" , False ):
395+ instance_map [worker_name ] = {
396+ "path" : f"/run/worker.{ worker_port } " ,
397+ }
398+ else :
399+ instance_map [worker_name ] = {
400+ "host" : "localhost" ,
401+ "port" : worker_port ,
402+ }
415403
416404
417405def merge_worker_template_configs (
@@ -438,10 +426,10 @@ def merge_worker_template_configs(
438426 new_template .endpoint_patterns |= to_be_merged_template .endpoint_patterns
439427
440428 # merge dictionaries; the worker name will be replaced later
441- new_template .shared_extra_conf = lambda worker_name : {
442- ** new_template .shared_extra_conf (worker_name ),
443- ** to_be_merged_template .shared_extra_conf (worker_name ),
444- }
429+ new_template .shared_extra_conf = lambda worker_name : merged (
430+ existing_template .shared_extra_conf (worker_name ),
431+ to_be_merged_template .shared_extra_conf (worker_name ),
432+ )
445433
446434 # There is only one worker type that has a 'worker_extra_conf' and it is
447435 # the media_repo. Since duplicate worker types on the same worker don't
@@ -821,9 +809,7 @@ def generate_worker_files(
821809 healthcheck_urls .append ("http://localhost:%d/health" % (worker_port ,))
822810
823811 # Update the shared config with sharding-related options if necessary
824- add_worker_roles_to_shared_config (
825- shared_config , worker_types_set , worker_name , worker_port
826- )
812+ add_worker_roles_to_shared_config (shared_config , worker_name , worker_port )
827813
828814 # Enable the worker in supervisord
829815 worker_descriptors .append (worker_config )
0 commit comments