Skip to content

Commit 6abc28d

Browse files
committed
Merge tag '0.0.5' into develop
0.0.5
2 parents e07a531 + 59b1eaa commit 6abc28d

File tree

4 files changed

+57
-27
lines changed

4 files changed

+57
-27
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.0.4"
3+
version = "0.0.5"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]

taskiq/cli/async_task_runner.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,14 @@ async def run_task( # noqa: C901, WPS210, WPS211
167167
)
168168
if found_exception is not None:
169169
for middleware in middlewares:
170-
await maybe_awaitable(
171-
middleware.on_error(
172-
message,
173-
result,
174-
found_exception,
175-
),
176-
)
170+
if middleware.__class__.on_error != TaskiqMiddleware.on_error:
171+
await maybe_awaitable(
172+
middleware.on_error(
173+
message,
174+
result,
175+
found_exception,
176+
),
177+
)
177178

178179
return result
179180

@@ -223,12 +224,18 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
223224
exc_info=True,
224225
)
225226
continue
227+
logger.info(
228+
"Executing task %s with ID: %s",
229+
taskiq_msg.task_name,
230+
taskiq_msg.task_id,
231+
)
226232
for middleware in broker.middlewares:
227-
taskiq_msg = await maybe_awaitable(
228-
middleware.pre_execute(
229-
taskiq_msg,
230-
),
231-
)
233+
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
234+
taskiq_msg = await maybe_awaitable(
235+
middleware.pre_execute(
236+
taskiq_msg,
237+
),
238+
)
232239

233240
result = await run_task(
234241
target=broker.available_tasks[message.task_name].original_func,
@@ -239,7 +246,8 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
239246
middlewares=broker.middlewares,
240247
)
241248
for middleware in broker.middlewares:
242-
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
249+
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
250+
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
243251
try:
244252
await broker.result_backend.set_result(message.task_id, result)
245253
except Exception as exc:

taskiq/kicker.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from dataclasses import asdict, is_dataclass
22
from logging import getLogger
3-
from typing import (
3+
from typing import ( # noqa: WPS235
44
TYPE_CHECKING,
55
Any,
66
Coroutine,
77
Dict,
88
Generic,
9+
Optional,
910
TypeVar,
1011
Union,
1112
overload,
@@ -14,6 +15,7 @@
1415
from pydantic import BaseModel
1516
from typing_extensions import ParamSpec
1617

18+
from taskiq.abc.middleware import TaskiqMiddleware
1719
from taskiq.exceptions import SendTaskError
1820
from taskiq.message import TaskiqMessage
1921
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
@@ -41,6 +43,7 @@ def __init__(
4143
self.task_name = task_name
4244
self.broker = broker
4345
self.labels = labels
46+
self.custom_task_id: Optional[str] = None
4447

4548
def with_labels(
4649
self,
@@ -55,6 +58,19 @@ def with_labels(
5558
self.labels.update(labels)
5659
return self
5760

61+
def with_task_id(self, task_id: str) -> "AsyncKicker[_FuncParams, _ReturnType]":
62+
"""
63+
Set task_id for current execution.
64+
65+
Please use this method with caution,
66+
because it may brake the logic of getting results.
67+
68+
:param task_id: custom task id.
69+
:return: kicker with custom task id.
70+
"""
71+
self.custom_task_id = task_id
72+
return self
73+
5874
def with_broker(
5975
self,
6076
broker: "AsyncBroker",
@@ -87,7 +103,7 @@ async def kiq( # noqa: D102
87103
) -> AsyncTaskiqTask[_ReturnType]:
88104
...
89105

90-
async def kiq(
106+
async def kiq( # noqa: C901
91107
self,
92108
*args: _FuncParams.args,
93109
**kwargs: _FuncParams.kwargs,
@@ -110,15 +126,16 @@ async def kiq(
110126
)
111127
message = self._prepare_message(*args, **kwargs)
112128
for middleware in self.broker.middlewares:
113-
message = await maybe_awaitable(middleware.pre_send(message))
114-
129+
if middleware.__class__.pre_send != TaskiqMiddleware.pre_send:
130+
message = await maybe_awaitable(middleware.pre_send(message))
115131
try:
116132
await self.broker.kick(self.broker.formatter.dumps(message))
117133
except Exception as exc:
118134
raise SendTaskError() from exc
119135

120136
for middleware in self.broker.middlewares:
121-
await maybe_awaitable(middleware.post_send(message))
137+
if middleware.__class__.post_send != TaskiqMiddleware.post_send:
138+
await maybe_awaitable(middleware.post_send(message))
122139

123140
return AsyncTaskiqTask(
124141
task_id=message.task_id,
@@ -198,8 +215,12 @@ def _prepare_message( # noqa: WPS210
198215
for label, label_val in self.labels.items():
199216
labels[label] = str(label_val)
200217

218+
task_id = self.custom_task_id
219+
if task_id is None:
220+
task_id = self.broker.id_generator()
221+
201222
return TaskiqMessage(
202-
task_id=self.broker.id_generator(),
223+
task_id=task_id,
203224
task_name=self.task_name,
204225
labels=labels,
205226
args=formatted_args,

taskiq/middlewares/retry_middleware.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from copy import deepcopy
12
from logging import getLogger
23
from typing import Any
34

@@ -40,19 +41,19 @@ async def on_error(
4041
# Check if retrying is enabled for the task.
4142
if retry_on_error != "True":
4243
return
43-
44+
new_msg = deepcopy(message)
4445
# Getting number of previous retries.
45-
retries = int(message.labels.get("_retries", 0)) + 1
46-
message.labels["_retries"] = str(retries)
47-
max_retries = int(message.labels.get("max_retries", self.default_retry_count))
46+
retries = int(new_msg.labels.get("_retries", 0)) + 1
47+
new_msg.labels["_retries"] = str(retries)
48+
max_retries = int(new_msg.labels.get("max_retries", self.default_retry_count))
4849
if retries < max_retries:
4950
logger.info(
5051
"Task '%s' invocation failed. Retrying.",
5152
message.task_name,
5253
)
53-
message.labels["_parent"] = message.task_id
54-
message.task_id = self.broker.id_generator()
55-
broker_message = self.broker.formatter.dumps(message=message)
54+
new_msg.labels["_parent"] = message.task_id
55+
new_msg.task_id = self.broker.id_generator()
56+
broker_message = self.broker.formatter.dumps(message=new_msg)
5657
await self.broker.kick(broker_message)
5758
else:
5859
logger.warning(

0 commit comments

Comments
 (0)