Skip to content

Commit 9168863

Browse files
committed
Added option to run in processpool.
1 parent 319ac9f commit 9168863

File tree

3 files changed

+25
-7
lines changed

3 files changed

+25
-7
lines changed

taskiq/api/receiver.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import asyncio
2-
from concurrent.futures import ThreadPoolExecutor
2+
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
33
from logging import getLogger
44
from typing import Optional, Type
55

@@ -13,13 +13,14 @@
1313
async def run_receiver_task(
1414
broker: AsyncBroker,
1515
receiver_cls: Type[Receiver] = Receiver,
16-
sync_workers: int = 4,
16+
sync_workers: Optional[int] = None,
1717
validate_params: bool = True,
1818
max_async_tasks: int = 100,
1919
max_prefetch: int = 0,
2020
propagate_exceptions: bool = True,
2121
run_startup: bool = False,
2222
ack_time: Optional[AcknowledgeType] = None,
23+
use_process_pool: bool = False,
2324
) -> None:
2425
"""
2526
Function to run receiver programmatically.
@@ -46,6 +47,7 @@ async def run_receiver_task(
4647
:param propagate_exceptions: whether to propagate exceptions in generators or not.
4748
:param run_startup: whether to run startup function or not.
4849
:param ack_time: acknowledge type to use.
50+
:param use_process_pool: whether to use process pool or threadpool.
4951
:raises asyncio.CancelledError: if the task was cancelled.
5052
"""
5153
finish_event = asyncio.Event()
@@ -62,7 +64,12 @@ def on_exit(_: Receiver) -> None:
6264
finish_event.set()
6365
raise asyncio.CancelledError
6466

65-
with ThreadPoolExecutor(max_workers=sync_workers) as executor:
67+
executor: Executor
68+
if use_process_pool:
69+
executor = ProcessPoolExecutor(max_workers=sync_workers)
70+
else:
71+
executor = ThreadPoolExecutor(max_workers=sync_workers)
72+
with executor as executor:
6673
broker.is_worker_process = True
6774
while True:
6875
try:

taskiq/cli/worker/args.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class WorkerArgs:
4646
max_tasks_per_child: Optional[int] = None
4747
wait_tasks_timeout: Optional[float] = None
4848
hardkill_count: int = 3
49+
use_process_pool: bool = False
4950

5051
@classmethod
5152
def from_cli(
@@ -210,8 +211,7 @@ def from_cli(
210211
"--wait-tasks-timeout",
211212
type=float,
212213
default=None,
213-
help="Maximum time to wait for all current tasks "
214-
"to finish before exiting.",
214+
help="Maximum time to wait for all current tasks to finish before exiting.",
215215
)
216216
parser.add_argument(
217217
"--hardkill-count",
@@ -220,6 +220,11 @@ def from_cli(
220220
help="Number of termination signals to the main "
221221
"process before performing a hardkill.",
222222
)
223+
parser.add_argument(
224+
"--use-process-pool",
225+
action="store_true",
226+
help="Use process pool instead of thread pool for sync tasks.",
227+
)
223228

224229
namespace = parser.parse_args(args)
225230
# If there are any patterns specified, remove default.

taskiq/cli/worker/run.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
import signal
55
import sys
6-
from concurrent.futures import ThreadPoolExecutor
6+
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
77
from multiprocessing import set_start_method
88
from sys import platform
99
from typing import Any, Optional, Type
@@ -135,9 +135,15 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
135135
receiver_type = get_receiver_type(args)
136136
receiver_kwargs = dict(args.receiver_arg)
137137

138+
executor: Executor
139+
if args.use_process_pool:
140+
executor = ProcessPoolExecutor(max_workers=args.max_threadpool_threads)
141+
else:
142+
executor = ThreadPoolExecutor(max_workers=args.max_threadpool_threads)
143+
138144
try:
139145
logger.debug("Initialize receiver.")
140-
with ThreadPoolExecutor(args.max_threadpool_threads) as pool:
146+
with executor as pool:
141147
receiver = receiver_type(
142148
broker=broker,
143149
executor=pool,

0 commit comments

Comments
 (0)