@@ -126,7 +126,7 @@ def import_tasks(modules: list[str], pattern: str, fs_discover: bool) -> None:
126126 import_from_modules (modules )
127127
128128
129- async def shutdown_broker (broker : AsyncBroker ) -> None :
129+ async def shutdown_broker (broker : AsyncBroker , timeout : float ) -> None :
130130 """
131131 This function used to shutdown broker.
132132
@@ -136,11 +136,15 @@ async def shutdown_broker(broker: AsyncBroker) -> None:
136136 We need to handle such situations.
137137
138138 :param broker: current broker.
139+ :param timeout: maximum amout of time to shutdown the broker.
139140 """
141+ logger .warning ("Shutting down the broker." )
140142 try :
141- ret_val = await broker .shutdown () # type: ignore
143+ ret_val = await asyncio . wait_for ( broker .shutdown (), timeout ) # type: ignore
142144 if ret_val is not None :
143145 logger .info ("Broker returned value on shutdown: '%s'" , str (ret_val ))
146+ except asyncio .TimeoutError :
147+ logger .warning ("Cannot shutdown broker gracefully. Timed out." )
144148 except Exception as exc :
145149 logger .warning (
146150 "Exception found while terminating: %s" ,
@@ -149,7 +153,7 @@ async def shutdown_broker(broker: AsyncBroker) -> None:
149153 )
150154
151155
152- def start_listen (args : TaskiqArgs ) -> None :
156+ def start_listen (args : TaskiqArgs ) -> None : # noqa: C901, WPS213
153157 """
154158 This function starts actual listening process.
155159
@@ -172,14 +176,42 @@ def start_listen(args: TaskiqArgs) -> None:
172176 AsyncBroker .is_worker_process = True
173177 broker = import_broker (args .broker )
174178 import_tasks (args .modules , args .tasks_pattern , args .fs_discover )
175- loop = asyncio .get_event_loop ()
176179 if not isinstance (broker , AsyncBroker ):
177180 raise ValueError ("Unknown broker type. Please use AsyncBroker instance." )
181+
182+ # Here how we manage interruptions.
183+ # We have to remember shutting_down state,
184+ # because KeyboardInterrupt can be send multiple
185+ # times. And it may interrupt the broker's shutdown process.
186+ shutting_down = False
187+
188+ def interrupt_handler (_signum : int , _frame : Any ) -> None :
189+ """
190+ Signal handler.
191+
192+ This handler checks if process is already
193+ terminating and if it's true, it does nothing.
194+
195+ :param _signum: received signal number.
196+ :param _frame: current execution frame.
197+ :raises KeyboardInterrupt: if termiation hasn't begun.
198+ """
199+ nonlocal shutting_down # noqa: WPS420
200+ if shutting_down :
201+ return
202+ shutting_down = True # noqa: WPS442
203+ raise KeyboardInterrupt
204+
205+ signal .signal (signal .SIGINT , interrupt_handler )
206+
207+ loop = asyncio .get_event_loop ()
178208 try :
179209 loop .run_until_complete (async_listen_messages (broker , args ))
180- except (KeyboardInterrupt , Exception ):
181- logger .warning ("Terminating process!" )
182- loop .run_until_complete (broker .shutdown ())
210+ except KeyboardInterrupt :
211+ logger .warning ("Worker process interrupted." )
212+ except Exception as exc :
213+ logger .error ("Exception found: %s" , exc , exc_info = True )
214+ loop .run_until_complete (shutdown_broker (broker , args .shutdown_timeout ))
183215
184216
185217def watch_workers_restarts (args : TaskiqArgs ) -> None :
0 commit comments