Skip to content

Commit aff781d

Browse files
authored
Added post-save hook in middlewares. (#41)
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 0e8b7c6 commit aff781d

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

taskiq/abc/middleware.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,21 @@ def post_execute(
8181
:param result: result of execution for current task.
8282
"""
8383

84+
def post_save(
85+
self,
86+
message: "TaskiqMessage",
87+
result: "TaskiqResult[Any]",
88+
) -> "Union[None, Coroutine[Any, Any, None]]":
89+
"""
90+
Post save hook.
91+
92+
This function is called after result of
93+
the executions is saved in the result_backend.
94+
95+
:param message: processed message.
96+
:param result: returned value.
97+
"""
98+
8499
def on_error(
85100
self,
86101
message: "TaskiqMessage",

taskiq/cli/receiver.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(self, broker: AsyncBroker, cli_args: TaskiqArgs) -> None:
7373
max_workers=cli_args.max_threadpool_threads,
7474
)
7575

76-
async def callback( # noqa: C901
76+
async def callback( # noqa: C901, WPS213
7777
self,
7878
message: BrokerMessage,
7979
raise_err: bool = False,
@@ -142,6 +142,10 @@ async def callback( # noqa: C901
142142
if raise_err:
143143
raise exc
144144

145+
for middleware in self.broker.middlewares:
146+
if middleware.__class__.post_save != TaskiqMiddleware.post_save:
147+
await maybe_awaitable(middleware.post_save(taskiq_msg, result))
148+
145149
async def run_task( # noqa: C901, WPS210
146150
self,
147151
target: Callable[..., Any],

0 commit comments

Comments
 (0)