Skip to content

Commit 80b0237

Browse files
dima-dmytruk23ddmytruk
authored andcommitted
Add factories (#349)
* Add factories support --------- Co-authored-by: ddmytruk <[email protected]>
1 parent abae036 commit 80b0237

File tree

5 files changed

+20
-4
lines changed

5 files changed

+20
-4
lines changed

taskiq/brokers/inmemory_broker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ async def set_progress(
8787
progress: TaskProgress[Any],
8888
) -> None:
8989
"""
90-
Set progress of task exection.
90+
Set progress of task execution.
9191
9292
:param task_id: task id
9393
:param progress: task execution progress

taskiq/cli/scheduler/args.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
3232
formatter_class=ArgumentDefaultsHelpFormatter,
3333
description="Subcommand to run scheduler",
3434
)
35-
parser.add_argument("scheduler", help="Path to scheduler")
35+
parser.add_argument(
36+
"scheduler",
37+
help="Path to scheduler or scheduler factory function",
38+
)
3639
parser.add_argument(
3740
"modules",
3841
help="List of modules where to look for tasks.",

taskiq/cli/scheduler/run.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import inspect
23
import sys
34
from datetime import datetime, timedelta
45
from logging import basicConfig, getLevelName, getLogger
@@ -192,15 +193,19 @@ async def run_scheduler(args: SchedulerArgs) -> None:
192193
),
193194
)
194195
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
196+
195197
if isinstance(args.scheduler, str):
196198
scheduler = import_object(args.scheduler)
199+
if inspect.isfunction(scheduler):
200+
scheduler = scheduler()
197201
else:
198202
scheduler = args.scheduler
199203
if not isinstance(scheduler, TaskiqScheduler):
200204
logger.error(
201205
"Imported scheduler is not a subclass of TaskiqScheduler.",
202206
)
203207
sys.exit(1)
208+
204209
scheduler.broker.is_scheduler_process = True
205210
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
206211
for source in scheduler.sources:

taskiq/cli/worker/args.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ def from_cli(
6262
parser.add_argument(
6363
"broker",
6464
help=(
65-
"Where to search for broker. "
65+
"Where to search for broker or broker factory function. "
6666
"This string must be specified in "
6767
"'module.module:variable' format."
6868
),

taskiq/cli/worker/run.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import inspect
23
import logging
34
import os
45
import signal
@@ -126,9 +127,16 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
126127
# broker is running as a worker.
127128
# We must set this field before importing tasks,
128129
# so broker will remember all tasks it's related to.
130+
129131
broker = import_object(args.broker)
132+
if inspect.isfunction(broker):
133+
broker = broker()
130134
if not isinstance(broker, AsyncBroker):
131-
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
135+
raise ValueError(
136+
"Unknown broker type. Please use AsyncBroker instance "
137+
"or pass broker factory function that returns an AsyncBroker instance.",
138+
)
139+
132140
broker.is_worker_process = True
133141
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
134142

0 commit comments

Comments
 (0)