Skip to content

Commit 3e31b41

Browse files
committed
Merge branch 'release/0.0.6'
2 parents 59b1eaa + a859626 commit 3e31b41

File tree

3 files changed

+69
-13
lines changed

3 files changed

+69
-13
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.0.5"
3+
version = "0.0.6"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]

taskiq/cli/async_task_runner.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from taskiq.abc.middleware import TaskiqMiddleware
1313
from taskiq.cli.args import TaskiqArgs
1414
from taskiq.cli.log_collector import log_collector
15+
from taskiq.context import Context, context_updater
1516
from taskiq.message import TaskiqMessage
1617
from taskiq.result import TaskiqResult
1718
from taskiq.utils import maybe_awaitable
@@ -224,11 +225,6 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
224225
exc_info=True,
225226
)
226227
continue
227-
logger.info(
228-
"Executing task %s with ID: %s",
229-
taskiq_msg.task_name,
230-
taskiq_msg.task_id,
231-
)
232228
for middleware in broker.middlewares:
233229
if middleware.__class__.pre_execute != TaskiqMiddleware.pre_execute:
234230
taskiq_msg = await maybe_awaitable(
@@ -237,14 +233,20 @@ async def async_listen_messages( # noqa: C901, WPS210, WPS213
237233
),
238234
)
239235

240-
result = await run_task(
241-
target=broker.available_tasks[message.task_name].original_func,
242-
signature=task_signatures.get(message.task_name),
243-
message=taskiq_msg,
244-
log_collector_format=cli_args.log_collector_format,
245-
executor=executor,
246-
middlewares=broker.middlewares,
236+
logger.info(
237+
"Executing task %s with ID: %s",
238+
taskiq_msg.task_name,
239+
taskiq_msg.task_id,
247240
)
241+
with context_updater(Context(taskiq_msg, broker)):
242+
result = await run_task(
243+
target=broker.available_tasks[message.task_name].original_func,
244+
signature=task_signatures.get(message.task_name),
245+
message=taskiq_msg,
246+
log_collector_format=cli_args.log_collector_format,
247+
executor=executor,
248+
middlewares=broker.middlewares,
249+
)
248250
for middleware in broker.middlewares:
249251
if middleware.__class__.post_execute != TaskiqMiddleware.post_execute:
250252
await maybe_awaitable(middleware.post_execute(taskiq_msg, result))

taskiq/context.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from contextlib import contextmanager
2+
from typing import Generator
3+
4+
from taskiq.abc.broker import AsyncBroker
5+
from taskiq.message import TaskiqMessage
6+
7+
8+
class Context:
9+
"""Context class."""
10+
11+
def __init__(self, message: TaskiqMessage, broker: AsyncBroker) -> None:
12+
self.message = message
13+
self.broker = broker
14+
15+
16+
default_context = Context(None, None) # type: ignore
17+
current_context = None
18+
19+
20+
@contextmanager
21+
def context_updater(new_context: Context) -> Generator[None, None, None]:
22+
"""
23+
Update context for some time.
24+
25+
:param new_context: new context to set.
26+
:yield: nothing.
27+
"""
28+
global current_context # noqa: WPS420
29+
current_context = new_context # noqa: WPS442
30+
31+
yield
32+
33+
current_context = None # noqa: WPS442
34+
35+
36+
def get_context() -> Context:
37+
"""
38+
Get current context.
39+
40+
This function always return contexts,
41+
but if you call this function inside tests,
42+
or somewhere you have to be careful,
43+
since if current_context is None it will
44+
return default_context.
45+
46+
To override context please use context_updater
47+
context manager.
48+
49+
:return: context.
50+
"""
51+
global current_context # noqa: WPS420
52+
if current_context is None:
53+
return default_context
54+
return current_context

0 commit comments

Comments
 (0)