Skip to content

Commit 0a7c595

Browse files
committed
Added simples retries logic.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 7b6609b commit 0a7c595

File tree

12 files changed

+114
-46
lines changed

12 files changed

+114
-46
lines changed

taskiq/__init__.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
"""Distributed task manager."""
22
from taskiq.abc.broker import AsyncBroker, AsyncTaskiqDecoratedTask
3+
from taskiq.abc.formatter import TaskiqFormatter
4+
from taskiq.abc.middleware import TaskiqMiddleware
35
from taskiq.abc.result_backend import AsyncResultBackend
4-
from taskiq.message import TaskiqMessage
6+
from taskiq.brokers.shared_broker import async_shared_broker
7+
from taskiq.exceptions import TaskiqError
8+
from taskiq.message import BrokerMessage, TaskiqMessage
9+
from taskiq.result import TaskiqResult
510
from taskiq.task import AsyncTaskiqTask
611

712
__all__ = [
8-
"TaskiqMessage",
913
"AsyncBroker",
10-
"AsyncTaskiqDecoratedTask",
11-
"AsyncResultBackend",
14+
"TaskiqError",
15+
"TaskiqResult",
16+
"TaskiqMessage",
17+
"BrokerMessage",
18+
"TaskiqFormatter",
1219
"AsyncTaskiqTask",
20+
"TaskiqMiddleware",
21+
"AsyncResultBackend",
22+
"async_shared_broker",
23+
"AsyncTaskiqDecoratedTask",
1324
]

taskiq/abc/broker.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import inspect
12
from abc import ABC, abstractmethod
23
from functools import wraps
34
from logging import getLogger
@@ -13,12 +14,13 @@
1314
Union,
1415
overload,
1516
)
17+
from uuid import uuid4
1618

1719
from typing_extensions import ParamSpec
1820

1921
from taskiq.decor import AsyncTaskiqDecoratedTask
22+
from taskiq.formatters.json_formatter import JSONFormatter
2023
from taskiq.message import BrokerMessage
21-
from taskiq.plugins.json_formatter import JSONFormatter
2224
from taskiq.result_backends.dummy import DummyResultBackend
2325

2426
if TYPE_CHECKING:
@@ -33,6 +35,18 @@
3335
logger = getLogger("taskiq")
3436

3537

38+
def default_id_generator() -> str:
39+
"""
40+
Default task_id generator.
41+
42+
This function is used to generate id's
43+
for tasks.
44+
45+
:return: new task_id.
46+
"""
47+
return uuid4().hex
48+
49+
3650
class AsyncBroker(ABC):
3751
"""
3852
Async broker.
@@ -47,14 +61,18 @@ class AsyncBroker(ABC):
4761
def __init__(
4862
self,
4963
result_backend: "Optional[AsyncResultBackend[_T]]" = None,
64+
task_id_generator: Optional[Callable[[], str]] = None,
5065
) -> None:
5166
if result_backend is None:
5267
result_backend = DummyResultBackend()
68+
if task_id_generator is None:
69+
task_id_generator = default_id_generator
5370
self.middlewares: "List[TaskiqMiddleware]" = []
5471
self.result_backend = result_backend
5572
self.is_worker_process = False
5673
self.decorator_class = AsyncTaskiqDecoratedTask
5774
self.formatter: "TaskiqFormatter" = JSONFormatter()
75+
self.id_generator = task_id_generator
5876

5977
def add_middlewares(self, middlewares: "List[TaskiqMiddleware]") -> None:
6078
"""
@@ -79,6 +97,10 @@ async def shutdown(self) -> None:
7997
This method is called,
8098
when broker is closig.
8199
"""
100+
for middleware in self.middlewares:
101+
middleware_shutdown = middleware.shutdown()
102+
if inspect.isawaitable(middleware_shutdown):
103+
await middleware_shutdown
82104

83105
@abstractmethod
84106
async def kick(

taskiq/abc/formatter.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
from abc import ABC, abstractmethod
2-
from typing import Any, Dict
32

43
from taskiq.message import BrokerMessage, TaskiqMessage
54

@@ -8,12 +7,11 @@ class TaskiqFormatter(ABC):
87
"""Custom formatter for brokers."""
98

109
@abstractmethod
11-
def dumps(self, message: TaskiqMessage, labels: Dict[str, Any]) -> BrokerMessage:
10+
def dumps(self, message: TaskiqMessage) -> BrokerMessage:
1211
"""
1312
Dump message to broker message instance.
1413
1514
:param message: message to send.
16-
:param labels: task's labels.
1715
:return: message for brokers.
1816
"""
1917

taskiq/abc/middleware.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import TYPE_CHECKING, Any, Coroutine, Dict, Union
1+
from typing import TYPE_CHECKING, Any, Coroutine, Union
22

33
if TYPE_CHECKING:
44
from taskiq.abc.broker import AsyncBroker
@@ -20,10 +20,12 @@ def set_broker(self, broker: "AsyncBroker") -> None:
2020
"""
2121
self.broker = broker
2222

23+
def shutdown(self) -> Union[None, Coroutine[Any, Any, None]]:
24+
"""This function is used to do some work on shutdown."""
25+
2326
def pre_send(
2427
self,
2528
message: "TaskiqMessage",
26-
labels: Dict[str, Any],
2729
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
2830
"""
2931
Hook that executes before sending the task to worker.
@@ -32,15 +34,13 @@ def pre_send(
3234
the message is sent to broker.
3335
3436
:param message: message to send.
35-
:param labels: task's labels.
3637
:return: modified message.
3738
"""
3839
return message
3940

4041
def post_send(
4142
self,
4243
message: "TaskiqMessage",
43-
labels: Dict[str, Any],
4444
) -> "Union[None, Coroutine[Any, Any, None]]":
4545
"""
4646
This hook is executed right after the task is sent.
@@ -49,13 +49,11 @@ def post_send(
4949
after the messages is kicked in broker.
5050
5151
:param message: kicked message.
52-
:param labels: labels for a message.
5352
"""
5453

5554
def pre_execute(
5655
self,
5756
message: "TaskiqMessage",
58-
labels: Dict[str, Any],
5957
) -> "Union[TaskiqMessage, Coroutine[Any, Any, TaskiqMessage]]":
6058
"""
6159
This hook is called before executing task.
@@ -64,24 +62,23 @@ def pre_execute(
6462
executes in the worker process.
6563
6664
:param message: incoming parsed taskiq message.
67-
:param labels: task's labels without user-supplied lables.
6865
:return: modified message.
6966
"""
7067
return message
7168

7269
def post_execute(
7370
self,
71+
message: "TaskiqMessage",
7472
result: "TaskiqResult[Any]",
75-
labels: Dict[str, Any],
7673
) -> "Union[None, Coroutine[Any, Any, None]]":
7774
"""
7875
This hook executes after task is complete.
7976
8077
This is a worker-side hook. It's called
8178
in worker process.
8279
80+
:param message: incoming message.
8381
:param result: result of execution for current task.
84-
:param labels: task's labels. Without user-supplied labels.
8582
"""
8683

8784
def on_error(

taskiq/cli/async_task_runner.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
311311
for middleware in broker.middlewares:
312312
pre_ex_res = middleware.pre_execute(
313313
taskiq_msg,
314-
broker.available_tasks[message.task_name].labels,
315314
)
316315
if inspect.isawaitable(pre_ex_res):
317316
taskiq_msg = await pre_ex_res
@@ -326,10 +325,7 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
326325
middlewares=broker.middlewares,
327326
)
328327
for middleware in broker.middlewares:
329-
post_ex_res = middleware.post_execute(
330-
result,
331-
broker.available_tasks[message.task_name].labels,
332-
)
328+
post_ex_res = middleware.post_execute(taskiq_msg, result)
333329
if inspect.isawaitable(post_ex_res):
334330
await post_ex_res
335331
try:

taskiq/formatters/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Taskiq formatters."""

taskiq/plugins/json_formatter.py renamed to taskiq/formatters/json_formatter.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,22 @@
1-
from typing import Any, Dict
2-
31
from taskiq.abc.formatter import TaskiqFormatter
42
from taskiq.message import BrokerMessage, TaskiqMessage
53

64

75
class JSONFormatter(TaskiqFormatter):
86
"""Default taskiq formatter."""
97

10-
def dumps(self, message: TaskiqMessage, labels: Dict[str, Any]) -> BrokerMessage:
8+
def dumps(self, message: TaskiqMessage) -> BrokerMessage:
119
"""
1210
Dumps taskiq message to some broker message format.
1311
1412
:param message: message to send.
15-
:param labels: message's labels.
1613
:return: Dumped message.
1714
"""
1815
return BrokerMessage(
1916
task_id=message.task_id,
2017
task_name=message.task_name,
2118
message=message.json(),
22-
headers={
19+
labels={
2320
"content_type": "application/json",
2421
},
2522
)

taskiq/kicker.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
Union,
1212
overload,
1313
)
14-
from uuid import uuid4
1514

1615
from pydantic import BaseModel
1716
from typing_extensions import ParamSpec
@@ -37,14 +36,7 @@ def __init__(
3736
self,
3837
task_name: str,
3938
broker: "AsyncBroker",
40-
labels: Dict[
41-
str,
42-
Union[
43-
str,
44-
int,
45-
float,
46-
],
47-
],
39+
labels: Dict[str, Any],
4840
) -> None:
4941
self.task_name = task_name
5042
self.broker = broker
@@ -118,17 +110,17 @@ async def kiq( # noqa: C901
118110
)
119111
message = self._prepare_message(*args, **kwargs)
120112
for middleware in self.broker.middlewares:
121-
pre_send_res = middleware.pre_send(message, self.labels)
113+
pre_send_res = middleware.pre_send(message)
122114
if isawaitable(pre_send_res):
123115
message = await pre_send_res
124116
else:
125117
message = pre_send_res # type: ignore
126118
try:
127-
await self.broker.kick(self.broker.formatter.dumps(message, self.labels))
119+
await self.broker.kick(self.broker.formatter.dumps(message))
128120
except Exception as exc:
129121
raise SendTaskError() from exc
130122
for middleware in self.broker.middlewares:
131-
post_send_res = middleware.post_send(message, self.labels)
123+
post_send_res = middleware.post_send(message)
132124
if isawaitable(post_send_res):
133125
await post_send_res
134126
return AsyncTaskiqTask(
@@ -167,17 +159,18 @@ def _prepare_message( # noqa: WPS210
167159
"""
168160
formatted_args = []
169161
formatted_kwargs = {}
162+
labels = {}
170163
for arg in args:
171164
formatted_args.append(self._prepare_arg(arg))
172165
for kwarg_name, kwarg_val in kwargs.items():
173166
formatted_kwargs[kwarg_name] = self._prepare_arg(kwarg_val)
174-
175-
task_id = uuid4().hex
167+
for label, label_val in self.labels.items():
168+
labels[label] = str(label_val)
176169

177170
return TaskiqMessage(
178-
task_id=task_id,
171+
task_id=self.broker.id_generator,
179172
task_name=self.task_name,
180-
meta=self.labels,
173+
labels=labels,
181174
args=formatted_args,
182175
kwargs=formatted_kwargs,
183176
)

taskiq/message.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class TaskiqMessage(BaseModel):
1414

1515
task_id: str
1616
task_name: str
17-
meta: Dict[str, str]
17+
labels: Dict[str, str]
1818
args: List[Any]
1919
kwargs: Dict[str, Any]
2020

@@ -25,4 +25,4 @@ class BrokerMessage(BaseModel):
2525
task_id: str
2626
task_name: str
2727
message: str
28-
headers: Dict[str, Any]
28+
labels: Dict[str, str]

taskiq/middlewares/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Taskiq middlewares."""

0 commit comments

Comments
 (0)