Skip to content

Commit 89e8d64

Browse files
committed
Fixed worker process for windows.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 8eb7850 commit 89e8d64

File tree

1 file changed

+39
-24
lines changed

1 file changed

+39
-24
lines changed

taskiq/cli/async_task_runner.py

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -191,32 +191,50 @@ def exit_process(task: asyncio.Task[Any]) -> NoReturn:
191191
for running_task in asyncio.all_tasks(loop):
192192
running_task.cancel()
193193

194-
logger.info("Killing worker process.")
194+
logger.info("Worker process killed.")
195195
sys.exit(exitcode)
196196

197197

198-
def signal_handler(broker: AsyncBroker) -> None:
198+
def signal_handlera(broker: AsyncBroker) -> Callable[[int, Any], None]:
199199
"""
200-
Exit signal handler.
200+
Signal handler.
201201
202-
This signal handler
203-
calls _close_broker and after
204-
the task is done it exits.
202+
This function is used to generate
203+
real signal handler using closures.
204+
205+
It takes current broker as an argument
206+
and returns function that shuts it down.
205207
206208
:param broker: current broker.
209+
:returns: signal handler function.
207210
"""
208-
if getattr(broker, "_is_shutting_down", False):
209-
# We're already shutting down the broker.
210-
return
211211

212-
# We set this flag to not call this method twice.
213-
# Since we add an asynchronous task in loop
214-
# It can wait for execution for some time.
215-
# We want to execute shutdown only once. Otherwise
216-
# it would give us Undefined Behaviour.
217-
broker._is_shutting_down = True # type: ignore # noqa: WPS437
218-
task = asyncio.create_task(broker.shutdown())
219-
task.add_done_callback(exit_process)
212+
def _handler(signum: int, _frame: Any) -> None:
213+
"""
214+
Exit signal handler.
215+
216+
This signal handler
217+
calls shutdown for broker and after
218+
the task is done it exits process with 0 status code.
219+
220+
:param signum: received signal.
221+
:param _frame: current execution frame.
222+
"""
223+
if getattr(broker, "_is_shutting_down", False):
224+
# We're already shutting down the broker.
225+
return
226+
227+
# We set this flag to not call this method twice.
228+
# Since we add an asynchronous task in loop
229+
# It can wait for execution for some time.
230+
# We want to execute shutdown only once. Otherwise
231+
# it would give us Undefined Behaviour.
232+
broker._is_shutting_down = True # type: ignore # noqa: WPS437
233+
logger.info(f"Got {signum} signal. Shutting down worker process.")
234+
task = asyncio.create_task(broker.shutdown())
235+
task.add_done_callback(exit_process)
236+
237+
return _handler
220238

221239

222240
async def async_listen_messages( # noqa: C901, WPS210, WPS213
@@ -232,16 +250,13 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
232250
:param broker: broker to listen to.
233251
:param cli_args: CLI arguments for worker.
234252
"""
235-
loop = asyncio.get_event_loop()
236-
loop.add_signal_handler(
253+
signal.signal(
237254
signal.SIGTERM,
238-
signal_handler,
239-
broker,
255+
signal_handlera(broker),
240256
)
241-
loop.add_signal_handler(
257+
signal.signal(
242258
signal.SIGINT,
243-
signal_handler,
244-
broker,
259+
signal_handlera(broker),
245260
)
246261

247262
logger.info("Runing startup event.")

0 commit comments

Comments
 (0)