Skip to content

Commit b0eb561

Browse files
committed
add workers arg to CLI
1 parent 7a911f3 commit b0eb561

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

arq/cli.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging.config
33
import os
44
import sys
5+
from multiprocessing import Process
56
from signal import Signals
67
from typing import TYPE_CHECKING, cast
78

@@ -20,6 +21,7 @@
2021
watch_help = 'Watch a directory and reload the worker upon changes.'
2122
verbose_help = 'Enable verbose output.'
2223
logdict_help = "Import path for a dictionary in logdict form, to configure Arq's own logging."
24+
workers_help = 'Number of worker processes to spawn'
2325

2426

2527
@click.command('arq')
@@ -28,9 +30,12 @@
2830
@click.option('--burst/--no-burst', default=None, help=burst_help)
2931
@click.option('--check', is_flag=True, help=health_check_help)
3032
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
33+
@click.option('-w', '--workers', type=int, default=1, help=workers_help)
3134
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
3235
@click.option('--custom-log-dict', type=str, help=logdict_help)
33-
def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None:
36+
def cli(
37+
*, worker_settings: str, burst: bool, check: bool, watch: str, workers: int, verbose: bool, custom_log_dict: str
38+
) -> None:
3439
"""
3540
Job queues in python with asyncio and redis.
3641
@@ -49,8 +54,15 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose:
4954
else:
5055
kwargs = {} if burst is None else {'burst': burst}
5156
if watch:
52-
asyncio.run(watch_reload(watch, worker_settings_))
57+
coroutine = watch_reload(watch, worker_settings_)
58+
if workers > 1:
59+
for _ in range(workers - 1):
60+
Process(target=asyncio.run, args=(coroutine,)).start()
61+
asyncio.run(coroutine)
5362
else:
63+
if workers > 1:
64+
for _ in range(workers - 1):
65+
Process(target=run_worker, args=(worker_settings_,), kwargs=kwargs).start()
5466
run_worker(worker_settings_, **kwargs)
5567

5668

tests/test_cli.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,14 @@ def test_run_watch(mocker, cancel_remaining_task):
5151
runner = CliRunner()
5252
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests'])
5353
assert result.exit_code == 0
54-
assert '1 files changes, reloading arq worker...'
54+
assert 'files changed, reloading arq worker...' in result.output
55+
56+
57+
def test_multiple_workers(cancel_remaining_task, mocker, loop):
58+
mocker.patch('asyncio.get_event_loop', lambda: loop)
59+
runner = CliRunner()
60+
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4'])
61+
assert len(result.output.split('Waiting to be ready...')) >= 4
5562

5663

5764
custom_log_dict = {

0 commit comments

Comments
 (0)