Skip to content

Multiqueue Support #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions taskiq/formatters/json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
queue=message.queue,
message=model_dump_json(message).encode(),
labels=message.labels,
)
Expand Down
1 change: 1 addition & 0 deletions taskiq/formatters/proxy_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage:
return BrokerMessage(
task_id=message.task_id,
task_name=message.task_name,
queue=message.queue,
message=self.broker.serializer.dumpb(model_dump(message)),
labels=message.labels,
)
Expand Down
17 changes: 17 additions & 0 deletions taskiq/kicker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from taskiq.exceptions import SendTaskError
from taskiq.labels import prepare_label
from taskiq.message import TaskiqMessage
from taskiq.queue import DEFAULT_QUEUE, Queue
from taskiq.scheduler.created_schedule import CreatedSchedule
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
from taskiq.task import AsyncTaskiqTask
Expand All @@ -47,10 +48,12 @@ def __init__(
task_name: str,
broker: "AsyncBroker",
labels: Dict[str, Any],
queue: Union["Queue", str] = DEFAULT_QUEUE,
return_type: Optional[Type[_ReturnType]] = None,
) -> None:
self.task_name = task_name
self.broker = broker
self.queue = Queue(queue)
self.labels = labels
self.custom_task_id: Optional[str] = None
self.custom_schedule_id: Optional[str] = None
Expand Down Expand Up @@ -111,6 +114,19 @@ def with_broker(
self.broker = broker
return self

def with_queue(
self,
queue: Union["Queue", str],
) -> "AsyncKicker[_FuncParams, _ReturnType]":
"""
Replace queue for the function.

:param queue: new queue instance or name.
:return: Kicker with new queue.
"""
self.queue = Queue(queue)
return self

@overload
async def kiq(
self: "AsyncKicker[_FuncParams, CoroutineType[Any, Any, _T]]",
Expand Down Expand Up @@ -296,6 +312,7 @@ def _prepare_message(
return TaskiqMessage(
task_id=task_id,
task_name=self.task_name,
queue=self.queue.name,
labels=labels,
labels_types=labels_types,
args=formatted_args,
Expand Down
2 changes: 2 additions & 0 deletions taskiq/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class TaskiqMessage(BaseModel):

task_id: str
task_name: str
queue: str
labels: Dict[str, Any]
labels_types: Optional[Dict[str, int]] = None
args: List[Any]
Expand All @@ -40,5 +41,6 @@ class BrokerMessage(BaseModel):

task_id: str
task_name: str
queue: str
message: bytes
labels: Dict[str, Any]
26 changes: 26 additions & 0 deletions taskiq/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from __future__ import annotations

import dataclasses

DEFAULT_QUEUE = "taskiq"


@dataclasses.dataclass(frozen=True, init=False, eq=True)
class Queue:
"""Represents an abstraction for dealing with queues in real brokers."""

name: str

def __init__(self, src: str | Queue) -> None:
if isinstance(src, Queue):
object.__setattr__(self, "name", src.name)
elif isinstance(src, str):
object.__setattr__(self, "name", src)
else:
raise TypeError(
"Queue.__init__ expect str or Queue, "
"{type(src).__name__!r} is recieved",
)

def __repr__(self) -> str:
return self.name
7 changes: 7 additions & 0 deletions tests/cli/worker/test_parameters_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def test_parse_params_no_signature() -> None:
task_id="",
task_name="",
labels={},
queue="taskiq",
args=[1, 2],
kwargs={"a": 1},
)
Expand All @@ -49,6 +50,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taskiq",
args=[{"field": "test_val"}],
kwargs={},
)
Expand All @@ -66,6 +68,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taskiq",
args=[],
kwargs={"param": {"field": "test_val"}},
)
Expand All @@ -91,6 +94,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taskiq",
args=[{"unknown": "unknown"}],
kwargs={},
)
Expand All @@ -107,6 +111,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
task_id="",
task_name="",
labels={},
queue="taskiq",
args=[],
kwargs={"param": {"unknown": "unknown"}},
)
Expand All @@ -130,6 +135,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
msg_with_args = TaskiqMessage(
task_id="",
task_name="",
queue="taskiq",
labels={},
args=[None],
kwargs={},
Expand All @@ -142,6 +148,7 @@ def test_func(param: test_class) -> test_class: # type: ignore
msg_with_kwargs = TaskiqMessage(
task_id="",
task_name="",
queue="taskiq",
labels={},
args=[],
kwargs={"param": None},
Expand Down
1 change: 1 addition & 0 deletions tests/depends/test_progress_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_message(
task_id=task_id or task.broker.id_generator(),
task_name=task.task_name,
labels=labels,
queue="taskiq",
args=list(args),
kwargs=kwargs,
)
Expand Down
5 changes: 5 additions & 0 deletions tests/formatters/test_json_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@ async def test_json_dumps() -> None:
msg = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
)
expected = BrokerMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
message=(
b'{"task_id":"task-id","task_name":"task.name",'
b'"queue": "taskiq",'
b'"labels":{"label1":1,"label2":"text"},'
b'"labels_types":null,'
b'"args":[1,"a"],"kwargs":{"p1":"v1"}}'
Expand All @@ -39,13 +42,15 @@ async def test_json_loads() -> None:
fmt = JSONFormatter()
msg = (
b'{"task_id":"task-id","task_name":"task.name",'
b'"queue": "taskiq",'
b'"labels":{"label1":1,"label2":"text"},'
b'"args":[1,"a"],"kwargs":{"p1":"v1"}}'
)
expected = TaskiqMessage(
task_id="task-id",
task_name="task.name",
labels={"label1": 1, "label2": "text"},
queue="taskiq",
args=[1, "a"],
kwargs={"p1": "v1"},
)
Expand Down
5 changes: 5 additions & 0 deletions tests/formatters/test_proxy_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ async def test_proxy_dumps() -> None:
msg = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
)
expected = BrokerMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
message=(
b'{"task_id": "task-id", "task_name": "task.name", '
b'"queue": "taskiq", '
b'"labels": {"label1": 1, "label2": "text"}, '
b'"labels_types": null, '
b'"args": [1, "a"], "kwargs": {"p1": "v1"}}'
Expand All @@ -35,12 +38,14 @@ async def test_proxy_loads() -> None:
broker = InMemoryBroker()
msg = (
b'{"task_id":"task-id","task_name":"task.name",'
b'"queue": "taskiq", '
b'"labels":{"label1":1,"label2":"text"},'
b'"args":[1,"a"],"kwargs":{"p1":"v1"}}'
)
expected = TaskiqMessage(
task_id="task-id",
task_name="task.name",
queue="taskiq",
labels={"label1": 1, "label2": "text"},
args=[1, "a"],
kwargs={"p1": "v1"},
Expand Down
3 changes: 3 additions & 0 deletions tests/middlewares/test_simple_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def test_successful_retry(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={
"retry_on_error": "True",
},
Expand All @@ -47,6 +48,7 @@ async def test_no_retry(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={},
args=[],
kwargs={},
Expand All @@ -65,6 +67,7 @@ async def test_max_retries(broker: AsyncMock) -> None:
TaskiqMessage(
task_id="test_id",
task_name="meme",
queue="taskiq",
labels={
"retry_on_error": "True",
"_retries": "2",
Expand Down
12 changes: 12 additions & 0 deletions tests/receiver/test_params_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=["1", "2"],
kwargs={},
)
Expand All @@ -47,6 +48,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "20"}],
kwargs={},
)
Expand All @@ -68,6 +70,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "20"}],
kwargs={},
)
Expand All @@ -85,6 +88,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=["f3", "2"],
kwargs={},
)
Expand All @@ -108,6 +112,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "f3"}],
kwargs={},
)
Expand All @@ -130,6 +135,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[{"a": "10", "b": "f3"}],
kwargs={},
)
Expand All @@ -149,6 +155,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[1],
kwargs={"b": "2"},
)
Expand All @@ -171,6 +178,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "20"}},
)
Expand All @@ -192,6 +200,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "20"}},
)
Expand All @@ -209,6 +218,7 @@ def func(a: int, b: int) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": "1", "b": "f3"},
)
Expand All @@ -232,6 +242,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "f3"}},
)
Expand All @@ -254,6 +265,7 @@ def func(a: TestObj) -> None:
task_name="test",
labels={},
labels_types={},
queue="taskiq",
args=[],
kwargs={"a": {"a": "10", "b": "f3"}},
)
Expand Down
Loading
Loading