@@ -90,6 +90,9 @@ class WorkerTemplate:
9090 shared_extra_conf : Callable [[str ], Dict [str , Any ]] = lambda _worker_name : {}
9191 worker_extra_conf : str = ""
9292
93+ # True if and only if multiple of this worker type are allowed.
94+ sharding_allowed : bool = True
95+
9396
9497# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
9598# Watching /_matrix/client needs a "client" listener
@@ -218,6 +221,7 @@ class WorkerTemplate:
218221 # This worker cannot be sharded. Therefore, there should only ever be one
219222 # background worker. This is enforced for the safety of your database.
220223 shared_extra_conf = lambda worker_name : {"run_background_tasks_on" : worker_name },
224+ sharding_allowed = False ,
221225 ),
222226 "event_creator" : WorkerTemplate (
223227 listener_resources = {"client" },
@@ -243,13 +247,15 @@ class WorkerTemplate:
243247 shared_extra_conf = lambda worker_name : {
244248 "stream_writers" : {"account_data" : [worker_name ]}
245249 },
250+ sharding_allowed = False ,
246251 ),
247252 "presence" : WorkerTemplate (
248253 listener_resources = {"client" , "replication" },
249254 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/" },
250255 shared_extra_conf = lambda worker_name : {
251256 "stream_writers" : {"presence" : [worker_name ]}
252257 },
258+ sharding_allowed = False ,
253259 ),
254260 "receipts" : WorkerTemplate (
255261 listener_resources = {"client" , "replication" },
@@ -260,20 +266,23 @@ class WorkerTemplate:
260266 shared_extra_conf = lambda worker_name : {
261267 "stream_writers" : {"receipts" : [worker_name ]}
262268 },
269+ sharding_allowed = False ,
263270 ),
264271 "to_device" : WorkerTemplate (
265272 listener_resources = {"client" , "replication" },
266273 endpoint_patterns = {"^/_matrix/client/(r0|v3|unstable)/sendToDevice/" },
267274 shared_extra_conf = lambda worker_name : {
268275 "stream_writers" : {"to_device" : [worker_name ]}
269276 },
277+ sharding_allowed = False ,
270278 ),
271279 "typing" : WorkerTemplate (
272280 listener_resources = {"client" , "replication" },
273281 endpoint_patterns = {"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" },
274282 shared_extra_conf = lambda worker_name : {
275283 "stream_writers" : {"typing" : [worker_name ]}
276284 },
285+ sharding_allowed = False ,
277286 ),
278287}
279288
@@ -495,23 +504,6 @@ def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
495504 return new_worker_types
496505
497506
498- def is_sharding_allowed_for_worker_type (worker_type : str ) -> bool :
499- """Helper to check to make sure worker types that cannot have multiples do not.
500-
501- Args:
502- worker_type: The type of worker to check against.
503- Returns: True if allowed, False if not
504- """
505- return worker_type not in [
506- "background_worker" ,
507- "account_data" ,
508- "presence" ,
509- "receipts" ,
510- "typing" ,
511- "to_device" ,
512- ]
513-
514-
515507def split_and_strip_string (
516508 given_string : str , split_char : str , max_split : SupportsIndex = - 1
517509) -> List [str ]:
@@ -637,7 +629,7 @@ def parse_worker_types(
637629 )
638630
639631 if worker_type in worker_type_shard_counter :
640- if not is_sharding_allowed_for_worker_type ( worker_type ) :
632+ if not WORKERS_CONFIG [ worker_type ]. sharding_allowed :
641633 error (
642634 f"There can be only a single worker with { worker_type } "
643635 "type. Please recount and remove."
0 commit comments