Skip to content

Commit e22e7d1

Browse files
committed
Updated middlewares. Now we only call method if it were overriden.
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent ed3c789 commit e22e7d1

File tree

2 files changed

+27
-17
lines changed

2 files changed

+27
-17
lines changed

taskiq/cli/async_task_runner.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -167,13 +167,14 @@ async def run_task( # noqa: C901, WPS210, WPS211
167167
)
168168
if found_exception is not None:
169169
for middleware in middlewares:
170-
await maybe_awaitable(
171-
middleware.on_error(
172-
message,
173-
result,
174-
found_exception,
175-
),
176-
)
170+
if middleware.__class__.on_error != TaskiqMiddleware.on_error:
171+
await maybe_awaitable(
172+
middleware.on_error(
173+
message,
174+
result,
175+
found_exception,
176+
),
177+
)
177178

178179
return result
179180

@@ -223,12 +224,18 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
223224
exc_info=True,
224225
)
225226
continue
227+
logger.info(
228+
"Executing task %s with ID: %s",
229+
taskiq_msg.task_name,
230+
taskiq_msg.task_id,
231+
)
226232
for middleware in broker.middlewares:
227-
taskiq_msg = await maybe_awaitable(
228-
middleware.pre_execute(
229-
taskiq_msg,
230-
),
231-
)
233+
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
234+
taskiq_msg = await maybe_awaitable(
235+
middleware.pre_execute(
236+
taskiq_msg,
237+
),
238+
)
232239

233240
result = await run_task(
234241
target=broker.available_tasks[message.task_name].original_func,
@@ -239,7 +246,8 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
239246
middlewares=broker.middlewares,
240247
)
241248
for middleware in broker.middlewares:
242-
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
249+
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
250+
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
243251
try:
244252
await broker.result_backend.set_result(message.task_id, result)
245253
except Exception as exc:

taskiq/kicker.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from pydantic import BaseModel
1515
from typing_extensions import ParamSpec
1616

17+
from taskiq.abc.middleware import TaskiqMiddleware
1718
from taskiq.exceptions import SendTaskError
1819
from taskiq.message import TaskiqMessage
1920
from taskiq.task import AsyncTaskiqTask, SyncTaskiqTask
@@ -87,7 +88,7 @@ async def kiq( # noqa: D102
8788
) -> AsyncTaskiqTask[_ReturnType]:
8889
...
8990

90-
async def kiq(
91+
async def kiq( # noqa: C901
9192
self,
9293
*args: _FuncParams.args,
9394
**kwargs: _FuncParams.kwargs,
@@ -110,15 +111,16 @@ async def kiq(
110111
)
111112
message = self._prepare_message(*args, **kwargs)
112113
for middleware in self.broker.middlewares:
113-
message = await maybe_awaitable(middleware.pre_send(message))
114-
114+
if middleware.__class__.pre_send != TaskiqMiddleware.pre_send:
115+
message = await maybe_awaitable(middleware.pre_send(message))
115116
try:
116117
await self.broker.kick(self.broker.formatter.dumps(message))
117118
except Exception as exc:
118119
raise SendTaskError() from exc
119120

120121
for middleware in self.broker.middlewares:
121-
await maybe_awaitable(middleware.post_send(message))
122+
if middleware.__class__.post_send != TaskiqMiddleware.post_send:
123+
await maybe_awaitable(middleware.post_send(message))
122124

123125
return AsyncTaskiqTask(
124126
task_id=message.task_id,

0 commit comments

Comments
 (0)