Skip to content

Commit e10a4b9

Browse files
authored
Added max-restarts option. (#190)
1 parent 70836ea commit e10a4b9

File tree

7 files changed

+288
-261
lines changed

7 files changed

+288
-261
lines changed

poetry.lock

Lines changed: 241 additions & 246 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

taskiq/__main__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from taskiq.abc.cmd import TaskiqCMD
99

1010

11-
def main() -> None: # noqa: WPS210 # pragma: no cover
11+
def main() -> None: # noqa: C901, WPS210 # pragma: no cover
1212
"""
1313
Main entrypoint of the taskiq.
1414
@@ -70,7 +70,9 @@ def main() -> None: # noqa: WPS210 # pragma: no cover
7070

7171
command = subcommands[args.subcommand]
7272
sys.argv.pop(0)
73-
command.exec(sys.argv[1:])
73+
status = command.exec(sys.argv[1:])
74+
if status is not None:
75+
exit(status) # noqa: WPS421
7476

7577

7678
if __name__ == "__main__": # pragma: no cover

taskiq/abc/cmd.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Sequence
2+
from typing import Optional, Sequence
33

44

55
class TaskiqCMD(ABC): # pragma: no cover
@@ -8,7 +8,7 @@ class TaskiqCMD(ABC): # pragma: no cover
88
short_help = ""
99

1010
@abstractmethod
11-
def exec(self, args: Sequence[str]) -> None:
11+
def exec(self, args: Sequence[str]) -> Optional[int]:
1212
"""
1313
Execute the command.
1414

taskiq/cli/worker/args.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class WorkerArgs:
4040
receiver_arg: List[Tuple[str, str]] = field(default_factory=list)
4141
max_prefetch: int = 0
4242
no_propagate_errors: bool = False
43+
max_fails: int = -1
4344

4445
@classmethod
4546
def from_cli( # noqa: WPS213
@@ -179,6 +180,13 @@ def from_cli( # noqa: WPS213
179180
dest="configure_logging",
180181
help="Use this parameter if your application configures custom logging.",
181182
)
183+
parser.add_argument(
184+
"--max-fails",
185+
type=int,
186+
dest="max_fails",
187+
default=-1,
188+
help="Maximum number of child process exits.",
189+
)
182190

183191
namespace = parser.parse_args(args)
184192
return WorkerArgs(**namespace.__dict__)

taskiq/cli/worker/cmd.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Sequence
2+
from typing import Optional, Sequence
33

44
from taskiq.abc.cmd import TaskiqCMD
55
from taskiq.cli.worker.args import WorkerArgs
@@ -13,14 +13,15 @@ class WorkerCMD(TaskiqCMD):
1313

1414
short_help = "Helper to run workers"
1515

16-
def exec(self, args: Sequence[str]) -> None:
16+
def exec(self, args: Sequence[str]) -> Optional[int]:
1717
"""
1818
Start worker process.
1919
2020
Worker process creates several small
2121
processes in which tasks are actually processed.
2222
2323
:param args: CLI arguments.
24+
:returns: status code.
2425
"""
2526
wargs = WorkerArgs.from_cli(args)
26-
run_worker(wargs)
27+
return run_worker(wargs)

taskiq/cli/worker/process_manager.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,15 @@ def handle(
4343
:param action_queue: queue to send events to.
4444
"""
4545
for worker_id in range(workers_num):
46-
action_queue.put(ReloadOneAction(worker_num=worker_id))
46+
action_queue.put(ReloadOneAction(worker_num=worker_id, is_reload_all=True))
4747

4848

4949
@dataclass
5050
class ReloadOneAction(ProcessActionBase):
5151
"""This action reloads single worker with particular id."""
5252

5353
worker_num: int
54+
is_reload_all: bool
5455

5556
def handle(
5657
self,
@@ -153,6 +154,7 @@ def __init__(
153154
args: WorkerArgs,
154155
worker_function: Callable[[WorkerArgs, EventType], None],
155156
observer: Optional[Observer] = None, # type: ignore[valid-type]
157+
max_restarts: Optional[int] = None,
156158
) -> None:
157159
self.worker_function = worker_function
158160
self.action_queue: "Queue[ProcessActionBase]" = Queue(-1)
@@ -198,7 +200,7 @@ def prepare_workers(self) -> None:
198200
for worker, event in zip(self.workers, events):
199201
_wait_for_worker_startup(worker, event)
200202

201-
def start(self) -> None: # noqa: C901, WPS213
203+
def start(self) -> Optional[int]: # noqa: C901, WPS213
202204
"""
203205
Start managing child processes.
204206
@@ -223,7 +225,10 @@ def start(self) -> None: # noqa: C901, WPS213
223225
After all events are handled, it iterates over all child processes and
224226
checks that all processes are healthy. If process was terminated for
225227
some reason, it schedules a restart for dead process.
228+
229+
:returns: status code or None.
226230
"""
231+
restarts = 0
227232
self.prepare_workers()
228233
while True:
229234
sleep(1)
@@ -238,16 +243,30 @@ def start(self) -> None: # noqa: C901, WPS213
238243
action_queue=self.action_queue,
239244
)
240245
elif isinstance(action, ReloadOneAction):
246+
# We check if max_fails is set.
247+
# If it's true, we check how many times
248+
# worker was reloaded.
249+
if not action.is_reload_all and self.args.max_fails >= 1:
250+
restarts += 1
251+
if restarts >= self.args.max_fails:
252+
logger.warning("Max restarts reached. Exiting.")
253+
# Returning error status.
254+
return -1
241255
# If we just reloaded this worker, skip handling.
242256
if action.worker_num in reloaded_workers:
243257
continue
244258
action.handle(self.workers, self.args, self.worker_function)
245259
reloaded_workers.add(action.worker_num)
246260
elif isinstance(action, ShutdownAction):
247261
logger.debug("Process manager closed.")
248-
return
262+
return None
249263

250264
for worker_num, worker in enumerate(self.workers):
251265
if not worker.is_alive():
252266
logger.info(f"{worker.name} is dead. Scheduling reload.")
253-
self.action_queue.put(ReloadOneAction(worker_num=worker_num))
267+
self.action_queue.put(
268+
ReloadOneAction(
269+
worker_num=worker_num,
270+
is_reload_all=False,
271+
),
272+
)

taskiq/cli/worker/run.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from multiprocessing import set_start_method
66
from multiprocessing.synchronize import Event
77
from sys import platform
8-
from typing import Any, Type
8+
from typing import Any, Optional, Type
99

1010
from taskiq.abc.broker import AsyncBroker
1111
from taskiq.cli.utils import import_object, import_tasks
@@ -149,7 +149,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
149149
loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout))
150150

151151

152-
def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
152+
def run_worker(args: WorkerArgs) -> Optional[int]: # noqa: WPS213
153153
"""
154154
This function starts worker processes.
155155
@@ -159,6 +159,7 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
159159
:param args: CLI arguments.
160160
161161
:raises ValueError: if reload flag is used, but dependencies are not installed.
162+
:returns: Optional status code.
162163
"""
163164
if platform == "darwin":
164165
set_start_method("fork")
@@ -187,10 +188,11 @@ def run_worker(args: WorkerArgs) -> None: # noqa: WPS213
187188

188189
manager = ProcessManager(args=args, observer=observer, worker_function=start_listen)
189190

190-
manager.start()
191+
status = manager.start()
191192

192193
if observer is not None and observer.is_alive():
193194
if args.reload:
194195
logger.info("Stopping watching files.")
195196
observer.stop()
196-
logger.info("Stopping logging thread.")
197+
198+
return status

0 commit comments

Comments
 (0)