Skip to content

Commit ed8826e

Browse files
authored
Added reload parameter. (#29)
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 3d45587 commit ed8826e

File tree

5 files changed

+167
-7
lines changed

5 files changed

+167
-7
lines changed

poetry.lock

Lines changed: 50 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ typing-extensions = ">=3.10.0.0"
3030
pydantic = "^1.6.2"
3131
pyzmq = { version = "^23.2.0", optional = true }
3232
uvloop = { version = "^0.16.0", optional = true }
33+
watchdog = "^2.1.9"
34+
gitignore-parser = "^0.0.8"
3335

3436
[tool.poetry.dev-dependencies]
3537
pytest = "^7.1.2"

taskiq/cli/args.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class TaskiqArgs:
2828
max_threadpool_threads: int
2929
no_parse: bool
3030
shutdown_timeout: float
31+
reload: bool
32+
no_gitignore: bool
3133

3234
@classmethod
3335
def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213
@@ -113,6 +115,18 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP
113115
default=5,
114116
help="Maximum amount of time for graceful broker's shutdown is seconds.",
115117
)
118+
parser.add_argument(
119+
"--reload",
120+
"-r",
121+
action="store_true",
122+
help="Reload workers if file is changed.",
123+
)
124+
parser.add_argument(
125+
"--do-not-use-gitignore",
126+
action="store_true",
127+
dest="no_gitignore",
128+
help="Do not use gitignore to check for updated files.",
129+
)
116130

117131
if args is None:
118132
namespace = parser.parse_args(args)

taskiq/cli/watcher.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from pathlib import Path
2+
from typing import Callable
3+
4+
from gitignore_parser import parse_gitignore
5+
from watchdog.events import FileSystemEvent
6+
7+
8+
class FileWatcher:
9+
"""Filewatcher that watchs for filesystem changes."""
10+
11+
def __init__(
12+
self,
13+
callback: Callable[[], None],
14+
use_gitignore: bool = True,
15+
) -> None:
16+
self.callback = callback
17+
self.gitignore = None
18+
gpath = Path("./.gitignore")
19+
if use_gitignore and gpath.exists():
20+
self.gitignore = parse_gitignore(gpath)
21+
22+
def dispatch(self, event: FileSystemEvent) -> None:
23+
"""
24+
React to event.
25+
26+
This function checks wether we need to
27+
react to event and calls callback if we do.
28+
29+
:param event: incoming fs event.
30+
"""
31+
if event.is_directory:
32+
return
33+
if event.event_type == "closed":
34+
return
35+
if ".pytest_cache" in event.src_path:
36+
return
37+
if "__pycache__" in event.src_path:
38+
return
39+
if self.gitignore and self.gitignore(event.src_path):
40+
return
41+
self.callback()

taskiq/cli/worker.py

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77
from logging import basicConfig, getLevelName, getLogger
88
from multiprocessing import Process
99
from pathlib import Path
10+
from queue import Queue
1011
from time import sleep
1112
from typing import Any, Generator, List
1213

14+
from watchdog.observers import Observer
15+
1316
from taskiq.abc.broker import AsyncBroker
1417
from taskiq.cli.args import TaskiqArgs
1518
from taskiq.cli.async_task_runner import async_listen_messages
19+
from taskiq.cli.watcher import FileWatcher
1620

1721
try:
1822
import uvloop # noqa: WPS433
@@ -25,6 +29,8 @@
2529

2630
restart_workers = True
2731
worker_processes: List[Process] = []
32+
observer = Observer()
33+
reload_queue: "Queue[int]" = Queue(-1)
2834

2935

3036
def signal_handler(_signal: int, _frame: Any) -> None:
@@ -45,9 +51,33 @@ def signal_handler(_signal: int, _frame: Any) -> None:
4551
# This is how we kill children,
4652
# by sending SIGINT to child processes.
4753
if process.pid is None:
48-
process.kill()
49-
else:
54+
continue
55+
try:
5056
os.kill(process.pid, signal.SIGINT)
57+
except ProcessLookupError:
58+
continue
59+
process.join()
60+
if observer.is_alive():
61+
observer.stop()
62+
observer.join()
63+
64+
65+
def schedule_workers_reload() -> None:
66+
"""
67+
Function to schedule workers to restart.
68+
69+
This function adds worker ids to the queue.
70+
71+
This queue is later read in watcher loop.
72+
"""
73+
global worker_processes # noqa: WPS420
74+
global reload_queue # noqa: WPS420
75+
76+
logger.info("Reloading workers")
77+
for worker_id, _ in enumerate(worker_processes):
78+
reload_queue.put(worker_id)
79+
logger.info("Worker %s scheduled to reload", worker_id)
80+
reload_queue.join()
5181

5282

5383
@contextmanager
@@ -212,13 +242,16 @@ def interrupt_handler(_signum: int, _frame: Any) -> None:
212242
loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout))
213243

214244

215-
def watch_workers_restarts(args: TaskiqArgs) -> None:
245+
def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213
216246
"""
217247
Infinate loop for main process.
218248
219249
This loop restarts worker processes
220250
if they exit with error returncodes.
221251
252+
Also it reads process ids from reload_queue
253+
and reloads workers if they were scheduled to reload.
254+
222255
:param args: cli arguements.
223256
"""
224257
global worker_processes # noqa: WPS420
@@ -228,6 +261,18 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
228261
# List of processes to remove.
229262
sleep(1)
230263
process_to_remove = []
264+
while not reload_queue.empty():
265+
process_id = reload_queue.get()
266+
worker_processes[process_id].terminate()
267+
worker_processes[process_id].join()
268+
worker_processes[process_id] = Process(
269+
target=start_listen,
270+
kwargs={"args": args},
271+
name=f"worker-{process_id}",
272+
)
273+
worker_processes[process_id].start()
274+
reload_queue.task_done()
275+
231276
for worker_id, worker in enumerate(worker_processes):
232277
if worker.is_alive():
233278
continue
@@ -241,14 +286,13 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
241286
worker_processes[worker_id].start()
242287
else:
243288
logger.info("Worker-%s terminated.", worker_id)
244-
worker.join()
245289
process_to_remove.append(worker)
246290

247291
for dead_process in process_to_remove:
248292
worker_processes.remove(dead_process)
249293

250294

251-
def run_worker(args: TaskiqArgs) -> None:
295+
def run_worker(args: TaskiqArgs) -> None: # noqa: WPS213
252296
"""
253297
This function starts worker processes.
254298
@@ -279,7 +323,17 @@ def run_worker(args: TaskiqArgs) -> None:
279323
)
280324
worker_processes.append(work_proc)
281325

326+
if args.reload:
327+
observer.schedule(
328+
FileWatcher(
329+
callback=schedule_workers_reload,
330+
use_gitignore=not args.no_gitignore,
331+
),
332+
path=".",
333+
recursive=True,
334+
)
335+
observer.start()
282336
signal.signal(signal.SIGINT, signal_handler)
283337
signal.signal(signal.SIGTERM, signal_handler)
284338

285-
watch_workers_restarts(args=args)
339+
watcher_loop(args=args)

0 commit comments

Comments
 (0)