diff --git a/taskiq/formatters/json_formatter.py b/taskiq/formatters/json_formatter.py index 3d49e286..0af0c7ec 100644 --- a/taskiq/formatters/json_formatter.py +++ b/taskiq/formatters/json_formatter.py @@ -16,6 +16,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage: return BrokerMessage( task_id=message.task_id, task_name=message.task_name, + queue=message.queue, message=model_dump_json(message).encode(), labels=message.labels, ) diff --git a/taskiq/formatters/proxy_formatter.py b/taskiq/formatters/proxy_formatter.py index a2208715..987b5336 100644 --- a/taskiq/formatters/proxy_formatter.py +++ b/taskiq/formatters/proxy_formatter.py @@ -24,6 +24,7 @@ def dumps(self, message: TaskiqMessage) -> BrokerMessage: return BrokerMessage( task_id=message.task_id, task_name=message.task_name, + queue=message.queue, message=self.broker.serializer.dumpb(model_dump(message)), labels=message.labels, ) diff --git a/taskiq/kicker.py b/taskiq/kicker.py index bdc62dae..a0ed5494 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -23,6 +23,7 @@ from taskiq.exceptions import SendTaskError from taskiq.labels import prepare_label from taskiq.message import TaskiqMessage +from taskiq.queue import DEFAULT_QUEUE, Queue from taskiq.scheduler.created_schedule import CreatedSchedule from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask from taskiq.task import AsyncTaskiqTask @@ -47,10 +48,12 @@ def __init__( task_name: str, broker: "AsyncBroker", labels: Dict[str, Any], + queue: Union["Queue", str] = DEFAULT_QUEUE, return_type: Optional[Type[_ReturnType]] = None, ) -> None: self.task_name = task_name self.broker = broker + self.queue = Queue(queue) self.labels = labels self.custom_task_id: Optional[str] = None self.custom_schedule_id: Optional[str] = None @@ -111,6 +114,19 @@ def with_broker( self.broker = broker return self + def with_queue( + self, + queue: Union["Queue", str], + ) -> "AsyncKicker[_FuncParams, _ReturnType]": + """ + Replace queue for the function. + + :param queue: new queue instance or name. + :return: Kicker with new queue. + """ + self.queue = Queue(queue) + return self + @overload async def kiq( self: "AsyncKicker[_FuncParams, CoroutineType[Any, Any, _T]]", @@ -296,6 +312,7 @@ def _prepare_message( return TaskiqMessage( task_id=task_id, task_name=self.task_name, + queue=self.queue.name, labels=labels, labels_types=labels_types, args=formatted_args, diff --git a/taskiq/message.py b/taskiq/message.py index 675f7cf3..ab0ee960 100644 --- a/taskiq/message.py +++ b/taskiq/message.py @@ -16,6 +16,7 @@ class TaskiqMessage(BaseModel): task_id: str task_name: str + queue: str labels: Dict[str, Any] labels_types: Optional[Dict[str, int]] = None args: List[Any] @@ -40,5 +41,6 @@ class BrokerMessage(BaseModel): task_id: str task_name: str + queue: str message: bytes labels: Dict[str, Any] diff --git a/taskiq/queue.py b/taskiq/queue.py new file mode 100644 index 00000000..190697f4 --- /dev/null +++ b/taskiq/queue.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import dataclasses + +DEFAULT_QUEUE = "taskiq" + + +@dataclasses.dataclass(frozen=True, init=False, eq=True) +class Queue: + """Represents an abstraction for dealing with queues in real brokers.""" + + name: str + + def __init__(self, src: str | Queue) -> None: + if isinstance(src, Queue): + object.__setattr__(self, "name", src.name) + elif isinstance(src, str): + object.__setattr__(self, "name", src) + else: + raise TypeError( + "Queue.__init__ expect str or Queue, " + "{type(src).__name__!r} is recieved", + ) + + def __repr__(self) -> str: + return self.name diff --git a/tests/cli/worker/test_parameters_parsing.py b/tests/cli/worker/test_parameters_parsing.py index cf01792b..6ae45af3 100644 --- a/tests/cli/worker/test_parameters_parsing.py +++ b/tests/cli/worker/test_parameters_parsing.py @@ -25,6 +25,7 @@ def test_parse_params_no_signature() -> None: task_id="", task_name="", labels={}, + queue="taskiq", args=[1, 2], kwargs={"a": 1}, ) @@ -49,6 +50,7 @@ def test_func(param: test_class) -> test_class: # type: ignore task_id="", task_name="", labels={}, + queue="taskiq", args=[{"field": "test_val"}], kwargs={}, ) @@ -66,6 +68,7 @@ def test_func(param: test_class) -> test_class: # type: ignore task_id="", task_name="", labels={}, + queue="taskiq", args=[], kwargs={"param": {"field": "test_val"}}, ) @@ -91,6 +94,7 @@ def test_func(param: test_class) -> test_class: # type: ignore task_id="", task_name="", labels={}, + queue="taskiq", args=[{"unknown": "unknown"}], kwargs={}, ) @@ -107,6 +111,7 @@ def test_func(param: test_class) -> test_class: # type: ignore task_id="", task_name="", labels={}, + queue="taskiq", args=[], kwargs={"param": {"unknown": "unknown"}}, ) @@ -130,6 +135,7 @@ def test_func(param: test_class) -> test_class: # type: ignore msg_with_args = TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={}, args=[None], kwargs={}, @@ -142,6 +148,7 @@ def test_func(param: test_class) -> test_class: # type: ignore msg_with_kwargs = TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={}, args=[], kwargs={"param": None}, diff --git a/tests/depends/test_progress_tracker.py b/tests/depends/test_progress_tracker.py index 040381b0..8a48f222 100644 --- a/tests/depends/test_progress_tracker.py +++ b/tests/depends/test_progress_tracker.py @@ -51,6 +51,7 @@ def get_message( task_id=task_id or task.broker.id_generator(), task_name=task.task_name, labels=labels, + queue="taskiq", args=list(args), kwargs=kwargs, ) diff --git a/tests/formatters/test_json_formatter.py b/tests/formatters/test_json_formatter.py index 17a37185..b971ed4b 100644 --- a/tests/formatters/test_json_formatter.py +++ b/tests/formatters/test_json_formatter.py @@ -12,6 +12,7 @@ async def test_json_dumps() -> None: msg = TaskiqMessage( task_id="task-id", task_name="task.name", + queue="taskiq", labels={"label1": 1, "label2": "text"}, args=[1, "a"], kwargs={"p1": "v1"}, @@ -19,8 +20,10 @@ async def test_json_dumps() -> None: expected = BrokerMessage( task_id="task-id", task_name="task.name", + queue="taskiq", message=( b'{"task_id":"task-id","task_name":"task.name",' + b'"queue": "taskiq",' b'"labels":{"label1":1,"label2":"text"},' b'"labels_types":null,' b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' @@ -39,6 +42,7 @@ async def test_json_loads() -> None: fmt = JSONFormatter() msg = ( b'{"task_id":"task-id","task_name":"task.name",' + b'"queue": "taskiq",' b'"labels":{"label1":1,"label2":"text"},' b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' ) @@ -46,6 +50,7 @@ async def test_json_loads() -> None: task_id="task-id", task_name="task.name", labels={"label1": 1, "label2": "text"}, + queue="taskiq", args=[1, "a"], kwargs={"p1": "v1"}, ) diff --git a/tests/formatters/test_proxy_formatter.py b/tests/formatters/test_proxy_formatter.py index 8d583f16..745aafb5 100644 --- a/tests/formatters/test_proxy_formatter.py +++ b/tests/formatters/test_proxy_formatter.py @@ -11,6 +11,7 @@ async def test_proxy_dumps() -> None: msg = TaskiqMessage( task_id="task-id", task_name="task.name", + queue="taskiq", labels={"label1": 1, "label2": "text"}, args=[1, "a"], kwargs={"p1": "v1"}, @@ -18,8 +19,10 @@ async def test_proxy_dumps() -> None: expected = BrokerMessage( task_id="task-id", task_name="task.name", + queue="taskiq", message=( b'{"task_id": "task-id", "task_name": "task.name", ' + b'"queue": "taskiq", ' b'"labels": {"label1": 1, "label2": "text"}, ' b'"labels_types": null, ' b'"args": [1, "a"], "kwargs": {"p1": "v1"}}' @@ -35,12 +38,14 @@ async def test_proxy_loads() -> None: broker = InMemoryBroker() msg = ( b'{"task_id":"task-id","task_name":"task.name",' + b'"queue": "taskiq", ' b'"labels":{"label1":1,"label2":"text"},' b'"args":[1,"a"],"kwargs":{"p1":"v1"}}' ) expected = TaskiqMessage( task_id="task-id", task_name="task.name", + queue="taskiq", labels={"label1": 1, "label2": "text"}, args=[1, "a"], kwargs={"p1": "v1"}, diff --git a/tests/middlewares/test_simple_retry.py b/tests/middlewares/test_simple_retry.py index 7a0c12d3..5c973696 100644 --- a/tests/middlewares/test_simple_retry.py +++ b/tests/middlewares/test_simple_retry.py @@ -25,6 +25,7 @@ async def test_successful_retry(broker: AsyncMock) -> None: TaskiqMessage( task_id="test_id", task_name="meme", + queue="taskiq", labels={ "retry_on_error": "True", }, @@ -47,6 +48,7 @@ async def test_no_retry(broker: AsyncMock) -> None: TaskiqMessage( task_id="test_id", task_name="meme", + queue="taskiq", labels={}, args=[], kwargs={}, @@ -65,6 +67,7 @@ async def test_max_retries(broker: AsyncMock) -> None: TaskiqMessage( task_id="test_id", task_name="meme", + queue="taskiq", labels={ "retry_on_error": "True", "_retries": "2", diff --git a/tests/receiver/test_params_parser.py b/tests/receiver/test_params_parser.py index c0df8e5c..8df845a5 100644 --- a/tests/receiver/test_params_parser.py +++ b/tests/receiver/test_params_parser.py @@ -25,6 +25,7 @@ def func(a: int, b: int) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=["1", "2"], kwargs={}, ) @@ -47,6 +48,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[{"a": "10", "b": "20"}], kwargs={}, ) @@ -68,6 +70,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[{"a": "10", "b": "20"}], kwargs={}, ) @@ -85,6 +88,7 @@ def func(a: int, b: int) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=["f3", "2"], kwargs={}, ) @@ -108,6 +112,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[{"a": "10", "b": "f3"}], kwargs={}, ) @@ -130,6 +135,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[{"a": "10", "b": "f3"}], kwargs={}, ) @@ -149,6 +155,7 @@ def func(a: int, b: int) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[1], kwargs={"b": "2"}, ) @@ -171,6 +178,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[], kwargs={"a": {"a": "10", "b": "20"}}, ) @@ -192,6 +200,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[], kwargs={"a": {"a": "10", "b": "20"}}, ) @@ -209,6 +218,7 @@ def func(a: int, b: int) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[], kwargs={"a": "1", "b": "f3"}, ) @@ -232,6 +242,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[], kwargs={"a": {"a": "10", "b": "f3"}}, ) @@ -254,6 +265,7 @@ def func(a: TestObj) -> None: task_name="test", labels={}, labels_types={}, + queue="taskiq", args=[], kwargs={"a": {"a": "10", "b": "f3"}}, ) diff --git a/tests/receiver/test_receiver.py b/tests/receiver/test_receiver.py index 57637e95..bb0c2121 100644 --- a/tests/receiver/test_receiver.py +++ b/tests/receiver/test_receiver.py @@ -55,6 +55,7 @@ async def test_func(param: int) -> int: task_id="", task_name="", labels={}, + queue="taskiq", args=[1], kwargs={}, ), @@ -77,6 +78,7 @@ def test_func(param: int) -> int: TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={}, args=[1], kwargs={}, @@ -99,6 +101,7 @@ def test_func() -> None: TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={}, args=[], kwargs={}, @@ -120,6 +123,7 @@ async def test_func() -> None: TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={"timeout": "0.3"}, args=[], kwargs={}, @@ -142,6 +146,7 @@ def test_func() -> None: TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={"timeout": "0.3"}, args=[], kwargs={}, @@ -178,6 +183,7 @@ def test_func() -> None: TaskiqMessage( task_id="", task_name="", + queue="taskiq", labels={}, args=[], kwargs={}, @@ -207,6 +213,7 @@ async def my_task() -> int: TaskiqMessage( task_id="task_id", task_name=my_task.task_name, + queue="taskiq", labels={}, args=[], kwargs={}, @@ -242,6 +249,7 @@ async def my_task(dep: int = Depends(dependency)) -> None: TaskiqMessage( task_id="task_id", task_name=my_task.task_name, + queue="taskiq", labels={}, args=[], kwargs={}, @@ -275,6 +283,7 @@ def ack_callback() -> None: TaskiqMessage( task_id="task_id", task_name=my_task.task_name, + queue="taskiq", labels={}, args=[], kwargs={}, @@ -314,6 +323,7 @@ async def ack_callback() -> None: TaskiqMessage( task_id="task_id", task_name=my_task.task_name, + queue="taskiq", labels={}, args=[], kwargs={}, @@ -350,6 +360,7 @@ async def test_callback_unknown_task() -> None: TaskiqMessage( task_id="task_id", task_name="unknown", + queue="taskiq", labels={}, args=[], kwargs={}, @@ -387,6 +398,7 @@ def test_func(tes_val: MyTestClass = Depends()) -> int: TaskiqMessage( task_id="", task_name=test_func.task_name, + queue="taskiq", labels={}, args=[], kwargs={},