12
12
from taskiq .abc .middleware import TaskiqMiddleware
13
13
from taskiq .cli .args import TaskiqArgs
14
14
from taskiq .cli .log_collector import log_collector
15
+ from taskiq .context import Context , context_updater
15
16
from taskiq .message import TaskiqMessage
16
17
from taskiq .result import TaskiqResult
17
18
from taskiq .utils import maybe_awaitable
@@ -224,11 +225,6 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
224
225
exc_info = True ,
225
226
)
226
227
continue
227
- logger .info (
228
- "Executing task %s with ID: %s" ,
229
- taskiq_msg .task_name ,
230
- taskiq_msg .task_id ,
231
- )
232
228
for middleware in broker .middlewares :
233
229
if middleware .__class__ .pre_execute != TaskiqMiddleware .pre_execute :
234
230
taskiq_msg = await maybe_awaitable (
@@ -237,14 +233,20 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
237
233
),
238
234
)
239
235
240
- result = await run_task (
241
- target = broker .available_tasks [message .task_name ].original_func ,
242
- signature = task_signatures .get (message .task_name ),
243
- message = taskiq_msg ,
244
- log_collector_format = cli_args .log_collector_format ,
245
- executor = executor ,
246
- middlewares = broker .middlewares ,
236
+ logger .info (
237
+ "Executing task %s with ID: %s" ,
238
+ taskiq_msg .task_name ,
239
+ taskiq_msg .task_id ,
247
240
)
241
+ with context_updater (Context (taskiq_msg , broker )):
242
+ result = await run_task (
243
+ target = broker .available_tasks [message .task_name ].original_func ,
244
+ signature = task_signatures .get (message .task_name ),
245
+ message = taskiq_msg ,
246
+ log_collector_format = cli_args .log_collector_format ,
247
+ executor = executor ,
248
+ middlewares = broker .middlewares ,
249
+ )
248
250
for middleware in broker .middlewares :
249
251
if middleware .__class__ .post_execute != TaskiqMiddleware .post_execute :
250
252
await maybe_awaitable (middleware .post_execute (taskiq_msg , result ))
0 commit comments