Skip to content

Commit a5538ea

Browse files
committed
Fixed docs and qualname generation.
1 parent c70790d commit a5538ea

File tree

4 files changed

+37
-5
lines changed

4 files changed

+37
-5
lines changed

taskiq/api/receiver.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ async def run_receiver_task(
4040
4141
:param broker: current broker instance.
4242
:param receiver_cls: receiver class to use.
43-
:param sync_workers: number of threads of a threadpool that runs sync tasks.
43+
:param sync_workers: number of threads of a threadpool
44+
or processes in processpool that runs sync tasks.
4445
:param validate_params: whether to validate params or not.
4546
:param max_async_tasks: maximum number of simultaneous async tasks.
4647
:param max_prefetch: maximum number of tasks to prefetch.

taskiq/cli/worker/args.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class WorkerArgs:
3232
log_level: LogLevel = LogLevel.INFO
3333
workers: int = 2
3434
max_threadpool_threads: int = 10
35+
max_process_pool_processes: Optional[int] = None
3536
no_parse: bool = False
3637
shutdown_timeout: float = 5
3738
reload: bool = False
@@ -223,8 +224,16 @@ def from_cli(
223224
parser.add_argument(
224225
"--use-process-pool",
225226
action="store_true",
227+
dest="use_process_pool",
226228
help="Use process pool instead of thread pool for sync tasks.",
227229
)
230+
parser.add_argument(
231+
"--max-process-pool-processes",
232+
type=int,
233+
dest="max_process_pool_processes",
234+
default=None,
235+
help="Maximum number of processes in process pool.",
236+
)
228237

229238
namespace = parser.parse_args(args)
230239
# If there are any patterns specified, remove default.

taskiq/cli/worker/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
137137

138138
executor: Executor
139139
if args.use_process_pool:
140-
executor = ProcessPoolExecutor(max_workers=args.max_threadpool_threads)
140+
executor = ProcessPoolExecutor(max_workers=args.max_process_pool_processes)
141141
else:
142142
executor = ThreadPoolExecutor(max_workers=args.max_threadpool_threads)
143143

taskiq/decor.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,39 @@ def __init__(
5555
self.broker = broker
5656
self.task_name = task_name
5757
self.original_func = original_func
58+
self.labels = labels
59+
60+
# This is a hack to make ProcessPoolExecutor work
61+
# with decorated functions.
62+
#
63+
# The problem is that when we decorate a function
64+
# it becomes a new class. This class has the same
65+
# name as the original function.
66+
#
67+
# When receiver sends original function to another
68+
# process, it will have the same name as the decorated
69+
# class. This will cause an error, because ProcessPoolExecutor
70+
# uses `__name__` and `__qualname__` attributes to
71+
# import functions from other processes and then it verifies
72+
# that the function is the same as the original one.
73+
#
74+
# This hack renames the original function and injects
75+
# it back to the module where it was defined.
76+
# This way ProcessPoolExecutor will be able to import
77+
# the function by it's name and verify its correctness.
5878
new_name = f"{original_func.__name__}__taskiq_original"
5979
self.original_func.__name__ = new_name
60-
self.original_func.__qualname__ = new_name
80+
if hasattr(self.original_func, "__qualname__"):
81+
original_qualname = self.original_func.__qualname__.rsplit(".")
82+
original_qualname[-1] = new_name
83+
new_qualname = ".".join(original_qualname)
84+
self.original_func.__qualname__ = new_qualname
6185
setattr(
6286
sys.modules[original_func.__module__],
6387
new_name,
6488
original_func,
6589
)
6690

67-
self.labels = labels
68-
6991
# Docs for this method are omitted in order to help
7092
# your IDE resolve correct docs for it.
7193
def __call__( # noqa: D102

0 commit comments

Comments
 (0)