@@ -214,6 +214,12 @@ class Conductor(ConductorT, Service):
214214 #: to call here.
215215 _tp_to_callback : MutableMapping [TP , ConsumerCallback ]
216216
217+ #: Lock used to synchronize access to _tp_to_callback.
218+ #: Resubscriptions and updates to the indices may modify the mapping, and
219+ #: while that is happening, the mapping should not be accessed by message
220+ #: handlers.
221+ _tp_to_callback_lock : asyncio .Lock
222+
217223 #: Whenever a change is made, i.e. a Topic is added/removed, we notify
218224 #: the background task responsible for resubscribing.
219225 _subscription_changed : Optional [asyncio .Event ]
@@ -224,8 +230,21 @@ class Conductor(ConductorT, Service):
224230
225231 _compiler : ConductorCompiler
226232
227- #: We wait for 45 seconds after a resubscription request, to make
228- #: sure any later requests are handled at the same time.
233+ # `_resubscribe_sleep_lock_seconds` trades off between the latency of
234+ # receiving messages for newly added topics and the cost of resubscribing
235+ # to topics. Note that this resubscription flow only occurs when the topic
236+ # list has changed (see the `_subscription_changed` event). This mechanism
237+ # attempts to coalesce topic list changes that happen in quick succession
238+ # and prevents the framework from constantly resubscribing to topics after
239+ # every change.
240+ #
241+ # If the value is set too low and an agent is adding topics very
242+ # frequently, then resubscription will happen very often and will issue
243+ # unnecessary work on the async loop.
244+ # If the value is set too high, it will take a long time for a newly added
245+ # agent to start receiving messages; this time is bounded by the value of
246+ # `_resubscribe_sleep_lock_seconds`, barring something hogging the async
247+ # loop.
229248 _resubscribe_sleep_lock_seconds : float = 45.0
230249
231250 def __init__ (self , app : AppT , ** kwargs : Any ) -> None :
@@ -235,6 +254,7 @@ def __init__(self, app: AppT, **kwargs: Any) -> None:
235254 self ._topic_name_index = defaultdict (set )
236255 self ._tp_index = defaultdict (set )
237256 self ._tp_to_callback = {}
257+ self ._tp_to_callback_lock = asyncio .Lock ()
238258 self ._acking_topics = set ()
239259 self ._subscription_changed = None
240260 self ._subscription_done = None
@@ -266,12 +286,18 @@ def _compile_message_handler(self) -> ConsumerCallback:
266286
267287 async def on_message (message : Message ) -> None :
268288 tp = TP (topic = message .topic , partition = 0 )
269- return await get_callback_for_tp (tp )(message )
289+ async with self ._tp_to_callback_lock :
290+ callback = get_callback_for_tp (tp )
291+
292+ return await callback (message )
270293
271294 else :
272295
273296 async def on_message (message : Message ) -> None :
274- return await get_callback_for_tp (message .tp )(message )
297+ async with self ._tp_to_callback_lock :
298+ callback = get_callback_for_tp (message .tp )
299+
300+ return await callback (message )
275301
276302 return on_message
277303
@@ -309,11 +335,14 @@ async def _subscriber(self) -> None: # pragma: no cover
309335 # further subscription requests will happen during the same
310336 # rebalance.
311337 await self .sleep (self ._resubscribe_sleep_lock_seconds )
338+
339+ # Clear the event before updating indices. This way, new events
340+ # that get triggered during the update will be handled the next
341+ # time around.
342+ ev .clear ()
312343 subscribed_topics = await self ._update_indices ()
313344 await self .app .consumer .subscribe (subscribed_topics )
314345
315- # clear the subscription_changed flag, so we can wait on it again.
316- ev .clear ()
317346 # wake-up anything waiting for the subscription to be done.
318347 notify (self ._subscription_done )
319348
@@ -328,15 +357,23 @@ async def maybe_wait_for_subscriptions(self) -> None:
328357 await self ._subscription_done
329358
330359 async def _update_indices (self ) -> Iterable [str ]:
331- self ._topic_name_index .clear ()
332- self ._tp_to_callback .clear ()
333- for channel in self ._topics :
334- if channel .internal :
335- await channel .maybe_declare ()
336- for topic in channel .topics :
337- if channel .acks :
338- self ._acking_topics .add (topic )
339- self ._topic_name_index [topic ].add (channel )
360+ async with self ._tp_to_callback_lock :
361+ self ._topic_name_index .clear ()
362+ self ._tp_to_callback .clear ()
363+
364+ # Make a (shallow) copy of the topics, so new additions to the set
365+ # won't poison the iterator. Additions can come in while this
366+ # function yields during an await.
367+ topics = list (self ._topics )
368+ for channel in topics :
369+ if channel .internal :
370+ await channel .maybe_declare ()
371+ for topic in channel .topics :
372+ if channel .acks :
373+ self ._acking_topics .add (topic )
374+ self ._topic_name_index [topic ].add (channel )
375+
376+ self ._update_callback_map ()
340377
341378 return self ._topic_name_index
342379
@@ -418,6 +455,7 @@ def _topic_contain_unsubscribed_topics(self, topic: TopicT) -> bool:
418455 def discard (self , topic : Any ) -> None :
419456 """Unregister topic from conductor."""
420457 self ._topics .discard (topic )
458+ self ._flag_changes ()
421459
422460 def _flag_changes (self ) -> None :
423461 if self ._subscription_changed is not None :
0 commit comments