11import asyncio
22import inspect
33import io
4- import signal
5- import sys
64from concurrent .futures import Executor , ThreadPoolExecutor
75from logging import getLogger
86from time import time
9- from typing import Any , Callable , Dict , List , NoReturn , Optional
7+ from typing import Any , Callable , Dict , List , Optional
108
119from pydantic import parse_obj_as
1210
@@ -180,81 +178,6 @@ async def run_task( # noqa: C901, WPS210, WPS211
180178 return result
181179
182180
183- def exit_process (task : "asyncio.Task[Any]" ) -> NoReturn :
184- """
185- This function exits from the current process.
186-
187- It receives asyncio Task of broker.shutdown().
188- We check if there were an exception or returned value.
189-
190- If the function raised an exception, we print it with stack trace.
191- If it returned a value, we log it.
192-
193- After this, we cancel all current tasks in the loop
194- and exits.
195-
196- :param task: broker.shutdown task.
197- """
198- exitcode = 0
199- try :
200- result = task .result ()
201- if result is not None :
202- logger .info ("Broker returned value on shutdown: '%s'" , str (result ))
203- except Exception as exc :
204- logger .warning ("Exception was found while shutting down!" )
205- logger .warning (exc , exc_info = True )
206- exitcode = 1
207-
208- loop = asyncio .get_event_loop ()
209- for running_task in asyncio .all_tasks (loop ):
210- running_task .cancel ()
211-
212- logger .info ("Worker process killed." )
213- sys .exit (exitcode )
214-
215-
216- def signal_handler (broker : AsyncBroker ) -> Callable [[int , Any ], None ]:
217- """
218- Signal handler.
219-
220- This function is used to generate
221- real signal handler using closures.
222-
223- It takes current broker as an argument
224- and returns function that shuts it down.
225-
226- :param broker: current broker.
227- :returns: signal handler function.
228- """
229-
230- def _handler (signum : int , _frame : Any ) -> None :
231- """
232- Exit signal handler.
233-
234- This signal handler
235- calls shutdown for broker and after
236- the task is done it exits process with 0 status code.
237-
238- :param signum: received signal.
239- :param _frame: current execution frame.
240- """
241- if getattr (broker , "_is_shutting_down" , False ):
242- # We're already shutting down the broker.
243- return
244-
245- # We set this flag to not call this method twice.
246- # Since we add an asynchronous task in loop
247- # It can wait for execution for some time.
248- # We want to execute shutdown only once. Otherwise
249- # it would give us Undefined Behaviour.
250- broker ._is_shutting_down = True # type: ignore # noqa: WPS437
251- logger .info (f"Got { signum } signal. Shutting down worker process." )
252- task = asyncio .create_task (broker .shutdown ())
253- task .add_done_callback (exit_process )
254-
255- return _handler
256-
257-
258181async def async_listen_messages ( # noqa: C901, WPS210, WPS213
259182 broker : AsyncBroker ,
260183 cli_args : TaskiqArgs ,
@@ -268,15 +191,6 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
268191 :param broker: broker to listen to.
269192 :param cli_args: CLI arguments for worker.
270193 """
271- signal .signal (
272- signal .SIGTERM ,
273- signal_handler (broker ),
274- )
275- signal .signal (
276- signal .SIGINT ,
277- signal_handler (broker ),
278- )
279-
280194 logger .info ("Runing startup event." )
281195 await broker .startup ()
282196 executor = ThreadPoolExecutor (
0 commit comments