Skip to content

Commit 1a5c645

Browse files
committed
Made some fixes and small enhancements.
1 parent c4c8171 commit 1a5c645

File tree

3 files changed

+24
-5
lines changed

3 files changed

+24
-5
lines changed

docs/guide/cli.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ Like this:
1818
taskiq worker mybroker:broker_var my_project.module1 my_project.module2
1919
```
2020

21+
### Sync function
22+
23+
Taskiq can run sync function. But since it's asynchronous system, it runs it in separate thread or process.
24+
By default process pool is used. But if you are going to use taskiq for heavy computations, such as neural network
25+
model training or other calculations, you might want to use processpool instead.
26+
27+
More precisely about the difference you can read in [python docs about executors](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor).
28+
29+
As a rule of thumb:
30+
* If you're using sync functions for IO then use `threadpool`;
31+
* If you're using sync functions for CPU bound workloads then use processpool.
32+
33+
By default taskiq uses threadpool. If you want to change this behavior, use these parameters:
34+
35+
* `--use-process-pool` to switch to processpools;
36+
* `--max-process-pool-processes` to manually specify worker processes;
37+
* `--max-threadpool-threads` to configure maximum threads for threadpool if it's being used;
38+
2139
### Auto importing
2240

2341
Enumerating all modules with tasks is not an option sometimes.

taskiq/cli/worker/args.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class WorkerArgs:
3131
configure_logging: bool = True
3232
log_level: LogLevel = LogLevel.INFO
3333
workers: int = 2
34-
max_threadpool_threads: int = 10
34+
max_threadpool_threads: Optional[int] = None
3535
max_process_pool_processes: Optional[int] = None
3636
no_parse: bool = False
3737
shutdown_timeout: float = 5
@@ -147,6 +147,7 @@ def from_cli(
147147
parser.add_argument(
148148
"--max-threadpool-threads",
149149
type=int,
150+
default=None,
150151
help="Maximum number of threads for executing sync functions.",
151152
)
152153
parser.add_argument(

taskiq/schedule_sources/label_based.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ async def get_schedules(self) -> List["ScheduledTask"]:
3131
if task.broker != self.broker:
3232
# if task broker doesn't match self, something is probably wrong
3333
logger.warning(
34-
f"Broker for {task_name} ({task.broker}) doesn't "
35-
f"match scheduler's broker ({self.broker})"
34+
f"Broker for {task_name} `{task.broker}` doesn't "
35+
f"match scheduler's broker `{self.broker}`",
3636
)
3737
continue
3838
for schedule in task.labels.get("schedule", []):
@@ -69,8 +69,8 @@ def post_send(self, scheduled_task: ScheduledTask) -> None:
6969
if task.broker != self.broker:
7070
# if task broker doesn't match self, something is probably wrong
7171
logger.warning(
72-
f"Broker for {task_name} ({task.broker}) doesn't "
73-
f"match scheduler's broker ({self.broker})"
72+
f"Broker for {task_name} `{task.broker}` doesn't "
73+
f"match scheduler's broker `{self.broker}`",
7474
)
7575
continue
7676
if scheduled_task.task_name != task_name:

0 commit comments

Comments
 (0)