Skip to content

Commit 4c34980

Browse files
authored
Refactored process manager. (#64)
1 parent e2c07ee commit 4c34980

File tree

5 files changed

+270
-153
lines changed

5 files changed

+270
-153
lines changed

.flake8

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ docstring_style=sphinx
88
ignore =
99
; Found a line that starts with a dot
1010
WPS348,
11+
; Found overly complex type annotation
12+
WPS234,
1113
; `noqa` comments overuse ))))
1214
WPS402,
1315
; Found `%` string formatting

taskiq/cli/watcher.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,28 @@
1+
from logging import getLogger
12
from pathlib import Path
2-
from typing import Callable
3+
from typing import Any, Callable
34

45
from gitignore_parser import parse_gitignore
56
from watchdog.events import FileSystemEvent
67

8+
logger = getLogger("taskiq.worker")
9+
710

811
class FileWatcher: # pragma: no cover
912
"""Filewatcher that watchs for filesystem changes."""
1013

1114
def __init__(
1215
self,
13-
callback: Callable[[], None],
16+
callback: Callable[..., None],
1417
use_gitignore: bool = True,
18+
**callback_kwargs: Any,
1519
) -> None:
1620
self.callback = callback
1721
self.gitignore = None
1822
gpath = Path("./.gitignore")
1923
if use_gitignore and gpath.exists():
2024
self.gitignore = parse_gitignore(gpath)
25+
self.callback_kwargs = callback_kwargs
2126

2227
def dispatch(self, event: FileSystemEvent) -> None:
2328
"""
@@ -30,12 +35,11 @@ def dispatch(self, event: FileSystemEvent) -> None:
3035
"""
3136
if event.is_directory:
3237
return
33-
if event.event_type == "closed":
34-
return
35-
if ".pytest_cache" in event.src_path:
38+
if event.event_type in {"opened", "closed"}:
3639
return
37-
if "__pycache__" in event.src_path:
40+
if ".git" in event.src_path:
3841
return
3942
if self.gitignore and self.gitignore(event.src_path):
4043
return
41-
self.callback()
44+
logger.debug(f"File changed. Event: {event}")
45+
self.callback(**self.callback_kwargs)

taskiq/cli/worker/async_task_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ async def async_listen_messages(
2121
:param broker: broker to listen to.
2222
:param cli_args: CLI arguments for worker.
2323
"""
24-
logger.info("Runing startup event.")
24+
logger.debug("Runing startup event.")
2525
await broker.startup()
26-
logger.info("Inicialized receiver.")
26+
logger.debug("Initialized receiver.")
2727
receiver = Receiver(broker, cli_args)
2828
logger.info("Listening started.")
2929
tasks = set()

taskiq/cli/worker/process_manager.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import logging
2+
import signal
3+
from dataclasses import dataclass
4+
from multiprocessing import Process, Queue
5+
from time import sleep
6+
from typing import Any, Callable, List
7+
8+
from watchdog.observers import Observer
9+
10+
from taskiq.cli.watcher import FileWatcher
11+
from taskiq.cli.worker.args import WorkerArgs
12+
13+
logger = logging.getLogger("taskiq.process-manager")
14+
15+
16+
class ProcessActionBase:
    """Common ancestor for all process-manager actions; exists for typing."""
18+
19+
20+
@dataclass
class ReloadAllAction(ProcessActionBase):
    """This action triggers reload of all workers."""

    def handle(
        self,
        workers_num: int,
        action_queue: "Queue[ProcessActionBase]",
    ) -> None:
        """
        Handle reload all action.

        Expands into N ReloadOneAction events on the queue,
        where N is the number of worker processes, so the main
        loop restarts each child individually.

        :param workers_num: number of currently active workers.
        :param action_queue: queue to send events to.
        """
        for single_reload in map(ReloadOneAction, range(workers_num)):
            action_queue.put(single_reload)
40+
41+
42+
@dataclass
class ReloadOneAction(ProcessActionBase):
    """This action reloads single worker with particular id."""

    worker_num: int

    def handle(
        self,
        workers: List[Process],
        args: WorkerArgs,
        worker_func: Callable[[WorkerArgs], None],
    ) -> None:
        """
        Restart the single child process at ``worker_num``.

        :param workers: known children processes.
        :param args: args for new process.
        :param worker_func: function that is used to start worker processes.
        """
        # Guard against ids that don't map to a known child.
        if not 0 <= self.worker_num < len(workers):
            logger.warning("Unknown worker id.")
            return
        dead_proc = workers[self.worker_num]
        try:
            dead_proc.terminate()
        except ValueError:
            logger.debug(f"Process {dead_proc.name} is already terminated.")
        # Wait until the old process fully exits before replacing it.
        dead_proc.join()
        replacement = Process(
            target=worker_func,
            kwargs={"args": args},
            name=f"worker-{self.worker_num}",
            daemon=True,
        )
        replacement.start()
        workers[self.worker_num] = replacement
79+
80+
81+
@dataclass
class ShutdownAction(ProcessActionBase):
    """Action that tells the manager loop to exit."""
84+
85+
86+
def schedule_workers_reload(
    action_queue: "Queue[ProcessActionBase]",
) -> None:
    """
    Schedule a restart of every worker process.

    Puts a single ReloadAllAction on the queue; the main loop
    expands it into per-worker reload events.

    :param action_queue: queue to send events to.
    """
    reload_event = ReloadAllAction()
    action_queue.put(reload_event)
    logger.info("Scheduled workers reload.")
99+
100+
101+
def get_signal_handler(
    action_queue: "Queue[ProcessActionBase]",
) -> Callable[[int, Any], None]:
    """
    Generate signal handler for main process.

    The signal handler will just put the SHUTDOWN event in
    the action queue.

    :param action_queue: event queue.
    :returns: actual signal handler.
    """

    def _signal_handler(signum: int, _frame: Any) -> None:
        logger.debug(f"Got signal {signum}.")
        action_queue.put(ShutdownAction())
        # logger.warn() is a deprecated alias; warning() is the real method.
        logger.warning("Workers are scheduled for shutdown.")

    return _signal_handler
120+
121+
122+
class ProcessManager:
    """
    Process manager for taskiq.

    This class spawns multiple processes,
    and maintains their states. If process
    is down, it tries to restart it.
    """

    def __init__(
        self,
        args: WorkerArgs,
        observer: Observer,
        worker_function: Callable[[WorkerArgs], None],
    ) -> None:
        self.worker_function = worker_function
        self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
        self.args = args
        if args.reload:
            # Watch the working directory and schedule a full
            # reload whenever a tracked file changes.
            observer.schedule(
                FileWatcher(
                    callback=schedule_workers_reload,
                    use_gitignore=not args.no_gitignore,
                    action_queue=self.action_queue,
                ),
                path=".",
                recursive=True,
            )

        # Translate SIGINT/SIGTERM into a ShutdownAction on the queue.
        signal_handler = get_signal_handler(self.action_queue)
        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        self.workers: List[Process] = []

    def prepare_workers(self) -> None:
        """Spawn multiple processes."""
        for worker_id in range(self.args.workers):
            work_proc = Process(
                target=self.worker_function,
                kwargs={"args": self.args},
                name=f"worker-{worker_id}",
                daemon=True,
            )
            # Start BEFORE logging: Process.pid is None until start()
            # is called, so logging first always printed "pid None".
            work_proc.start()
            logger.info(
                "Started process worker-%d with pid %s ",
                worker_id,
                work_proc.pid,
            )
            self.workers.append(work_proc)

    def start(self) -> None:  # noqa: C901, WPS213
        """
        Start managing child processes.

        This function is an endless loop,
        which listens to new events from different sources.

        Every second it checks for new events and
        current states of child processes.

        If there are new events it handles them.
        Manager can handle 3 types of events:

        1. `ReloadAllAction` - when we want to restart all child processes.
            It checks for running processes and generates RELOAD_ONE event for
            any process.

        2. `ReloadOneAction` - this event restarts one single child process.

        3. `ShutdownAction` - exits the loop. Since all child processes are
            daemons, they will be automatically terminated using signals.

        After all events are handled, it iterates over all child processes and
        checks that all processes are healthy. If process was terminated for
        some reason, it schedules a restart for dead process.
        """
        self.prepare_workers()
        while True:
            sleep(1)
            reloaded_workers = set()
            # We bulk_process all pending events.
            while not self.action_queue.empty():
                action = self.action_queue.get()
                # Use the module logger (previously this hit the
                # root logger via logging.debug by mistake).
                logger.debug(f"Got event: {action}")
                if isinstance(action, ReloadAllAction):
                    action.handle(
                        workers_num=len(self.workers),
                        action_queue=self.action_queue,
                    )
                elif isinstance(action, ReloadOneAction):
                    # If we just reloaded this worker, skip handling.
                    if action.worker_num in reloaded_workers:
                        continue
                    action.handle(self.workers, self.args, self.worker_function)
                    reloaded_workers.add(action.worker_num)
                elif isinstance(action, ShutdownAction):
                    logger.debug("Process manager closed.")
                    return

            # Health check: restart any child that died on its own.
            for worker_num, worker in enumerate(self.workers):
                if not worker.is_alive():
                    logger.info(f"{worker.name} is dead. Scheduling reload.")
                    self.action_queue.put(ReloadOneAction(worker_num=worker_num))

0 commit comments

Comments
 (0)