Skip to content

Commit 475dd18

Browse files
committed
All imported tasks are now available for brokers.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 4fe5e48 commit 475dd18

File tree

4 files changed

+14
-23
lines changed

4 files changed

+14
-23
lines changed

taskiq/abc/broker.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,8 @@ class AsyncBroker(ABC):
276276
in async mode.
277277
"""
278278

279+
available_tasks: Dict[str, AsyncTaskiqDecoratedTask[Any, Any]] = {}
280+
279281
def __init__(
280282
self,
281283
result_backend: Optional[AsyncResultBackend[_T]] = None,
@@ -284,7 +286,6 @@ def __init__(
284286
result_backend = DummyResultBackend()
285287
self.result_backend = result_backend
286288
self.is_worker_process = False
287-
self._related_tasks: Set[AsyncTaskiqDecoratedTask[..., Any]] = set()
288289

289290
async def startup(self) -> None:
290291
"""Do something when starting broker."""
@@ -395,10 +396,7 @@ def inner(
395396
),
396397
)
397398

398-
# Adds this task to the list of tasks.
399-
# This option is used by workers.
400-
if self.is_worker_process:
401-
self._related_tasks.add(decorated_task) # type: ignore
399+
self.available_tasks[decorated_task.task_name] = decorated_task
402400

403401
return decorated_task
404402

taskiq/brokers/inmemory_broker.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from taskiq.abc.broker import AsyncBroker
77
from taskiq.abc.result_backend import AsyncResultBackend, TaskiqResult
88
from taskiq.cli.async_task_runner import run_task
9+
from taskiq.exceptions import TaskiqError
910
from taskiq.message import TaskiqMessage
1011

1112
_ReturnType = TypeVar("_ReturnType")
@@ -87,18 +88,15 @@ def __init__(
8788
sync_tasks_pool_size: int = 4,
8889
logs_format: Optional[str] = None,
8990
max_stored_results: int = 100,
91+
cast_types: bool = True,
9092
) -> None:
9193
super().__init__(
9294
InmemoryResultBackend(
9395
max_stored_results=max_stored_results,
9496
),
9597
)
96-
# We mock as if it's a worker process.
97-
# So every task call will add tasks in
98-
# _related_tasks attribute.
99-
self.is_worker_process = True
100-
self.tasks_mapping = None
10198
self.executor = ThreadPoolExecutor(max_workers=sync_tasks_pool_size)
99+
self.cast_types = cast_types
102100
if logs_format is None:
103101
logs_format = "%(levelname)s %(message)s"
104102
self.logs_format = logs_format
@@ -110,13 +108,11 @@ async def kick(self, message: TaskiqMessage) -> None:
110108
This method just executes given task.
111109
112110
:param message: incomming message.
113-
:raises ValueError: if someone wants to kick unknown task.
111+
:raises TaskiqError: if someone wants to kick unknown task.
114112
"""
115-
for task in self._related_tasks:
116-
if task.task_name == message.task_name:
117-
target_task = task
113+
target_task = self.available_tasks.get(message.task_name)
118114
if target_task is None:
119-
raise ValueError("Unknown task.")
115+
raise TaskiqError("Unknown task.")
120116
result = await run_task(
121117
target=target_task.original_func,
122118
signature=inspect.signature(target_task.original_func),

taskiq/cli/async_task_runner.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,19 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
249249
max_workers=cli_args.max_threadpool_threads,
250250
)
251251
logger.info("Listening started.")
252-
task_registry: Dict[str, Callable[..., Any]] = {}
253252
task_signatures: Dict[str, inspect.Signature] = {}
254-
for task in broker._related_tasks: # noqa: WPS437
255-
task_registry[task.task_name] = task.original_func
256-
# If we need to parse parameters we remember all tasks signatures.
253+
for task in broker.available_tasks.values():
257254
if not cli_args.no_parse:
258255
task_signatures[task.task_name] = inspect.signature(task.original_func)
259256
async for message in broker.listen():
260257
logger.debug(f"Received message: {message}")
261-
if message.task_name not in task_registry:
258+
if message.task_name not in broker.available_tasks:
262259
logger.warning(
263260
'task "%s" is not found. Maybe you forgot to import it?',
264261
message.task_name,
265262
)
266263
continue
267-
func = task_registry[message.task_name]
264+
func = broker.available_tasks[message.task_name]
268265
logger.debug(
269266
"Function for task %s is resolved. Executing...",
270267
message.task_name,

taskiq/cli/worker.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ def start_listen(args: TaskiqArgs) -> None:
9292
This function starts actual listening process.
9393
9494
It imports broker and all tasks.
95-
Since tasks registers themselfs in broker
95+
Since tasks registers themselfs in a global set,
9696
it's easy to just import module where you have decorated
97-
function and they will be available in broker's `_related_tasks`
97+
function and they will be available in broker's `available_tasks`
9898
field.
9999
100100
:param args: CLI arguments.

0 commit comments

Comments
 (0)