File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed
Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change @@ -55,8 +55,6 @@ def start_daemonthreads_for_strategy(self, index_strategy):
5555 stop_event = self .stop_event ,
5656 daemonthread_context = self .daemonthread_context ,
5757 )
58- # spin up daemonthreads, ready for messages
59- self ._daemonthreads .extend (_daemon .start ())
6058 _consumer = KombuMessageConsumer (
6159 kombu_connection = self .kombu_connection .clone (),
6260 stop_event = self .stop_event ,
@@ -65,7 +63,10 @@ def start_daemonthreads_for_strategy(self, index_strategy):
6563 )
6664 # give the daemon a more robust callback for ack-ing
6765 _daemon .ack_callback = _consumer .ensure_ack
68- # assign a thread for the consumer to receive and enqueue messages to this daemon
66+ # spin up daemonthreads, ready for messages
67+ self ._daemonthreads .extend (_daemon .start ())
68+ # start a thread to consume messages from this strategy's queues
69+ # (and enqueue them for daemonthreads)
6970 threading .Thread (target = _consumer .run ).start ()
7071 return _daemon
7172
You can’t perform that action at this time.
0 commit comments