Skip to content

Commit 1731c4a

Browse files
committed
Merge branch 'release/0.8.7'
2 parents 8ac5281 + 72ab309 commit 1731c4a

File tree

10 files changed

+338
-290
lines changed

10 files changed

+338
-290
lines changed

poetry.lock

Lines changed: 241 additions & 246 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.8.6"
3+
version = "0.8.7"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]

taskiq/__main__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from taskiq.abc.cmd import TaskiqCMD
99

1010

11-
def main() -> None: # noqa: WPS210 # pragma: no cover
11+
def main() -> None: # noqa: C901, WPS210 # pragma: no cover
1212
"""
1313
Main entrypoint of the taskiq.
1414
@@ -70,7 +70,9 @@ def main() -> None: # noqa: WPS210 # pragma: no cover
7070

7171
command = subcommands[args.subcommand]
7272
sys.argv.pop(0)
73-
command.exec(sys.argv[1:])
73+
status = command.exec(sys.argv[1:])
74+
if status is not None:
75+
exit(status) # noqa: WPS421
7476

7577

7678
if __name__ == "__main__": # pragma: no cover

taskiq/abc/cmd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Sequence
2+
from typing import Optional, Sequence
33

44

55
class TaskiqCMD(ABC): # pragma: no cover
@@ -8,7 +8,7 @@ class TaskiqCMD(ABC): # pragma: no cover
88
short_help = ""
99

1010
@abstractmethod
11-
def exec(self, args: Sequence[str]) -> None:
11+
def exec(self, args: Sequence[str]) -> Optional[int]:
1212
"""
1313
Execute the command.
1414

taskiq/cli/scheduler/run.py

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async def schedules_updater(
2626
:param scheduler: current scheduler.
2727
:param current_schedules: list of schedules.
2828
"""
29-
while True:
29+
while True: # noqa: WPS457
3030
logger.debug("Started schedule update.")
3131
new_schedules: "List[ScheduledTask]" = []
3232
for source in scheduler.sources:
@@ -40,8 +40,7 @@ async def schedules_updater(
4040
logger.debug(exc, exc_info=True)
4141
continue
4242

43-
for schedule in scheduler.merge_func(new_schedules, schedules):
44-
new_schedules.append(schedule)
43+
new_schedules = scheduler.merge_func(new_schedules, schedules)
4544

4645
current_schedules.clear()
4746
current_schedules.extend(new_schedules)
@@ -62,38 +61,15 @@ def should_run(task: ScheduledTask) -> bool:
6261
return False
6362

6463

65-
async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS213
64+
async def _run_loop(scheduler: TaskiqScheduler) -> None:
6665
"""
6766
Runs scheduler loop.
6867
6968
This function imports taskiq scheduler
7069
and runs tasks when needed.
7170
72-
:param args: parsed CLI args.
71+
:param scheduler: current scheduler.
7372
"""
74-
if isinstance(args.scheduler, str):
75-
scheduler = import_object(args.scheduler)
76-
else:
77-
scheduler = args.scheduler
78-
if not isinstance(scheduler, TaskiqScheduler):
79-
print( # noqa: WPS421
80-
"Imported scheduler is not a subclass of TaskiqScheduler.",
81-
)
82-
exit(1) # noqa: WPS421
83-
scheduler.broker.is_scheduler_process = True
84-
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
85-
if args.configure_logging:
86-
basicConfig(
87-
level=getLevelName(args.log_level),
88-
format=(
89-
"[%(asctime)s][%(levelname)-7s]"
90-
"[%(module)s:%(funcName)s:%(lineno)d]"
91-
" %(message)s"
92-
),
93-
)
94-
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
95-
for source in scheduler.sources:
96-
await source.startup()
9773
loop = asyncio.get_event_loop()
9874
tasks: "List[ScheduledTask]" = []
9975
loop.create_task(schedules_updater(scheduler, tasks))
@@ -121,3 +97,44 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
12197
- datetime.now()
12298
)
12399
await asyncio.sleep(delay.total_seconds())
100+
101+
102+
async def run_scheduler(args: SchedulerArgs) -> None: # noqa: WPS213
103+
"""
104+
Run scheduler.
105+
106+
This function takes all CLI arguments
107+
and starts the scheduler process.
108+
109+
:param args: parsed CLI arguments.
110+
"""
111+
if isinstance(args.scheduler, str):
112+
scheduler = import_object(args.scheduler)
113+
else:
114+
scheduler = args.scheduler
115+
if not isinstance(scheduler, TaskiqScheduler):
116+
print( # noqa: WPS421
117+
"Imported scheduler is not a subclass of TaskiqScheduler.",
118+
)
119+
exit(1) # noqa: WPS421
120+
scheduler.broker.is_scheduler_process = True
121+
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
122+
if args.configure_logging:
123+
basicConfig(
124+
level=getLevelName(args.log_level),
125+
format=(
126+
"[%(asctime)s][%(levelname)-7s]"
127+
"[%(module)s:%(funcName)s:%(lineno)d]"
128+
" %(message)s"
129+
),
130+
)
131+
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
132+
for source in scheduler.sources:
133+
await source.startup()
134+
135+
try:
136+
await _run_loop(scheduler)
137+
except asyncio.CancelledError:
138+
logger.warning("Shutting down scheduler.")
139+
await scheduler.shutdown()
140+
logger.info("Scheduler shut down. Good bye!")

taskiq/cli/worker/args.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class WorkerArgs:
4040
receiver_arg: List[Tuple[str, str]] = field(default_factory=list)
4141
max_prefetch: int = 0
4242
no_propagate_errors: bool = False
43+
max_fails: int = -1
4344

4445
@classmethod
4546
def from_cli( # noqa: WPS213
@@ -179,6 +180,13 @@ def from_cli( # noqa: WPS213
179180
dest="configure_logging",
180181
help="Use this parameter if your application configures custom logging.",
181182
)
183+
parser.add_argument(
184+
"--max-fails",
185+
type=int,
186+
dest="max_fails",
187+
default=-1,
188+
help="Maximum number of child process exits.",
189+
)
182190

183191
namespace = parser.parse_args(args)
184192
return WorkerArgs(**namespace.__dict__)

taskiq/cli/worker/cmd.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Sequence
2+
from typing import Optional, Sequence
33

44
from taskiq.abc.cmd import TaskiqCMD
55
from taskiq.cli.worker.args import WorkerArgs
@@ -13,14 +13,15 @@ class WorkerCMD(TaskiqCMD):
1313

1414
short_help = "Helper to run workers"
1515

16-
def exec(self, args: Sequence[str]) -> None:
16+
def exec(self, args: Sequence[str]) -> Optional[int]:
1717
"""
1818
Start worker process.
1919
2020
Worker process creates several small
2121
processes in which tasks are actually processed.
2222
2323
:param args: CLI arguments.
24+
:returns: status code.
2425
"""
2526
wargs = WorkerArgs.from_cli(args)
26-
run_worker(wargs)
27+
return run_worker(wargs)

taskiq/cli/worker/process_manager.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ def handle(
4343
:param action_queue: queue to send events to.
4444
"""
4545
for worker_id in range(workers_num):
46-
action_queue.put(ReloadOneAction(worker_num=worker_id))
46+
action_queue.put(ReloadOneAction(worker_num=worker_id, is_reload_all=True))
4747

4848

4949
@dataclass
5050
class ReloadOneAction(ProcessActionBase):
5151
"""This action reloads single worker with particular id."""
5252

5353
worker_num: int
54+
is_reload_all: bool
5455

5556
def handle(
5657
self,
@@ -153,6 +154,7 @@ def __init__(
153154
args: WorkerArgs,
154155
worker_function: Callable[[WorkerArgs, EventType], None],
155156
observer: Optional[Observer] = None, # type: ignore[valid-type]
157+
max_restarts: Optional[int] = None,
156158
) -> None:
157159
self.worker_function = worker_function
158160
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
@@ -198,7 +200,7 @@ def prepare_workers(self) -> None:
198200
for worker, event in zip(self.workers, events):
199201
_wait_for_worker_startup(worker, event)
200202

201-
def start(self) -> None: # noqa: C901, WPS213
203+
def start(self) -> Optional[int]: # noqa: C901, WPS213
202204
"""
203205
Start managing child processes.
204206
@@ -223,7 +225,10 @@ def start(self) -> None: # noqa: C901, WPS213
223225
After all events are handled, it iterates over all child processes and
224226
checks that all processes are healthy. If process was terminated for
225227
some reason, it schedules a restart for dead process.
228+
229+
:returns: status code or None.
226230
"""
231+
restarts = 0
227232
self.prepare_workers()
228233
while True:
229234
sleep(1)
@@ -238,16 +243,30 @@ def start(self) -> None: # noqa: C901, WPS213
238243
action_queue=self.action_queue,
239244
)
240245
elif isinstance(action, ReloadOneAction):
246+
# We check if max_fails is set.
247+
# If it's true, we check how many times
248+
# worker was reloaded.
249+
if not action.is_reload_all and self.args.max_fails >= 1:
250+
restarts += 1
251+
if restarts >= self.args.max_fails:
252+
logger.warning("Max restarts reached. Exiting.")
253+
# Returning error status.
254+
return -1
241255
# If we just reloaded this worker, skip handling.
242256
if action.worker_num in reloaded_workers:
243257
continue
244258
action.handle(self.workers, self.args, self.worker_function)
245259
reloaded_workers.add(action.worker_num)
246260
elif isinstance(action, ShutdownAction):
247261
logger.debug("Process manager closed.")
248-
return
262+
return None
249263

250264
for worker_num, worker in enumerate(self.workers):
251265
if not worker.is_alive():
252266
logger.info(f"{worker.name} is dead. Scheduling reload.")
253-
self.action_queue.put(ReloadOneAction(worker_num=worker_num))
267+
self.action_queue.put(
268+
ReloadOneAction(
269+
worker_num=worker_num,
270+
is_reload_all=False,
271+
),
272+
)

taskiq/cli/worker/run.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from multiprocessing import set_start_method
66
from multiprocessing.synchronize import Event
77
from sys import platform
8-
from typing import Any, Type
8+
from typing import Any, Optional, Type
99

1010
from taskiq.abc.broker import AsyncBroker
1111
from taskiq.cli.utils import import_object, import_tasks
@@ -149,7 +149,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
149149
loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout))
150150

151151

152-
def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
152+
def run_worker(args: WorkerArgs) -> Optional[int]: # noqa: WPS213
153153
"""
154154
This function starts worker processes.
155155
@@ -159,6 +159,7 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
159159
:param args: CLI arguments.
160160
161161
:raises ValueError: if reload flag is used, but dependencies are not installed.
162+
:returns: Optional status code.
162163
"""
163164
if platform == "darwin":
164165
set_start_method("fork")
@@ -187,10 +188,11 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
187188

188189
manager = ProcessManager(args=args, observer=observer, worker_function=start_listen)
189190

190-
manager.start()
191+
status = manager.start()
191192

192193
if observer is not None and observer.is_alive():
193194
if args.reload:
194195
logger.info("Stopping watching files.")
195196
observer.stop()
196-
logger.info("Stopping logging thread.")
197+
198+
return status

taskiq/scheduler/scheduler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ async def on_ready(self, task: ScheduledTask) -> None:
7373
**task.kwargs,
7474
)
7575
await maybe_awaitable(task.source.post_send(task))
76+
77+
async def shutdown(self) -> None:
78+
"""Shutdown the scheduler process."""
79+
await self.broker.shutdown()

0 commit comments

Comments
 (0)