Skip to content

Commit 4ae2e65

Browse files
committed
move span closuse to post_save,
middleware post_send, post_execute, post_save, on_error are now called in reverse order
1 parent e1e3bf1 commit 4ae2e65

File tree

3 files changed

+5
-5
lines changed

3 files changed

+5
-5
lines changed

taskiq/kicker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ async def kiq(
168168
except Exception as exc:
169169
raise SendTaskError from exc
170170

171-
for middleware in self.broker.middlewares:
171+
for middleware in reversed(self.broker.middlewares):
172172
if middleware.__class__.post_send != TaskiqMiddleware.post_send:
173173
await maybe_awaitable(middleware.post_send(message))
174174

taskiq/middlewares/opentelemetry_middleware.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
222222
attach_context(message, span, activation, token)
223223
return message
224224

225-
def post_execute( # pylint: disable=R6301
225+
def post_save( # pylint: disable=R6301
226226
self,
227227
message: TaskiqMessage,
228228
result: TaskiqResult[T],

taskiq/receiver/receiver.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,15 +139,15 @@ async def callback( # noqa: C901, PLR0912
139139
):
140140
await maybe_awaitable(message.ack())
141141

142-
for middleware in self.broker.middlewares:
142+
for middleware in reversed(self.broker.middlewares):
143143
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
144144
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))
145145

146146
try:
147147
if not isinstance(result.error, NoResultError):
148148
await self.broker.result_backend.set_result(taskiq_msg.task_id, result)
149149

150-
for middleware in self.broker.middlewares:
150+
for middleware in reversed(self.broker.middlewares):
151151
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
152152
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
153153

@@ -289,7 +289,7 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
289289
)
290290
# If exception is found we execute middlewares.
291291
if found_exception is not None:
292-
for middleware in self.broker.middlewares:
292+
for middleware in reversed(self.broker.middlewares):
293293
if middleware.__class__.on_error != TaskiqMiddleware.on_error:
294294
await maybe_awaitable(
295295
middleware.on_error(

0 commit comments

Comments
 (0)