@@ -191,32 +191,50 @@ def exit_process(task: asyncio.Task[Any]) -> NoReturn:
191
191
for running_task in asyncio .all_tasks (loop ):
192
192
running_task .cancel ()
193
193
194
- logger .info ("Killing worker process." )
194
+ logger .info ("Worker process killed ." )
195
195
sys .exit (exitcode )
196
196
197
197
198
- def signal_handler (broker : AsyncBroker ) -> None :
198
+ def signal_handlera (broker : AsyncBroker ) -> Callable [[ int , Any ], None ] :
199
199
"""
200
- Exit signal handler.
200
+ Signal handler.
201
201
202
- This signal handler
203
- calls _close_broker and after
204
- the task is done it exits.
202
+ This function is used to generate
203
+ real signal handler using closures.
204
+
205
+ It takes current broker as an argument
206
+ and returns function that shuts it down.
205
207
206
208
:param broker: current broker.
209
+ :returns: signal handler function.
207
210
"""
208
- if getattr (broker , "_is_shutting_down" , False ):
209
- # We're already shutting down the broker.
210
- return
211
211
212
- # We set this flag to not call this method twice.
213
- # Since we add an asynchronous task in loop
214
- # It can wait for execution for some time.
215
- # We want to execute shutdown only once. Otherwise
216
- # it would give us Undefined Behaviour.
217
- broker ._is_shutting_down = True # type: ignore # noqa: WPS437
218
- task = asyncio .create_task (broker .shutdown ())
219
- task .add_done_callback (exit_process )
212
+ def _handler (signum : int , _frame : Any ) -> None :
213
+ """
214
+ Exit signal handler.
215
+
216
+ This signal handler
217
+ calls shutdown for broker and after
218
+ the task is done it exits process with 0 status code.
219
+
220
+ :param signum: received signal.
221
+ :param _frame: current execution frame.
222
+ """
223
+ if getattr (broker , "_is_shutting_down" , False ):
224
+ # We're already shutting down the broker.
225
+ return
226
+
227
+ # We set this flag to not call this method twice.
228
+ # Since we add an asynchronous task in loop
229
+ # It can wait for execution for some time.
230
+ # We want to execute shutdown only once. Otherwise
231
+ # it would give us Undefined Behaviour.
232
+ broker ._is_shutting_down = True # type: ignore # noqa: WPS437
233
+ logger .info (f"Got { signum } signal. Shutting down worker process." )
234
+ task = asyncio .create_task (broker .shutdown ())
235
+ task .add_done_callback (exit_process )
236
+
237
+ return _handler
220
238
221
239
222
240
async def async_listen_messages ( # noqa: C901, WPS210, WPS213
@@ -232,16 +250,13 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
232
250
:param broker: broker to listen to.
233
251
:param cli_args: CLI arguments for worker.
234
252
"""
235
- loop = asyncio .get_event_loop ()
236
- loop .add_signal_handler (
253
+ signal .signal (
237
254
signal .SIGTERM ,
238
- signal_handler ,
239
- broker ,
255
+ signal_handlera (broker ),
240
256
)
241
- loop . add_signal_handler (
257
+ signal . signal (
242
258
signal .SIGINT ,
243
- signal_handler ,
244
- broker ,
259
+ signal_handlera (broker ),
245
260
)
246
261
247
262
logger .info ("Runing startup event." )
0 commit comments