Skip to content

Commit 8df1aab

Browse files
committed
Merge branch 'release/0.3.0'
2 parents 160dd38 + 786e773 commit 8df1aab

File tree

8 files changed

+143
-66
lines changed

8 files changed

+143
-66
lines changed

docs/available-components/brokers.md

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,22 +79,34 @@ await my_task.kicker().with_broker(broker).kiq()
7979

8080
:::
8181

82-
## AioPikaBroker (for RabbitMQ)
8382

84-
This broker is not part of the core taskiq lib. You can install it as a separate package [taskiq-aio-pika](https://pypi.org/project/taskiq-aio-pika/).
83+
## Custom brokers
84+
85+
These brokers are not parts of the core taskiq lib. You can install them as a separate packages.
86+
87+
You can read more about parameters and abilities of these brokers in README.md of each repo.
88+
89+
90+
### AioPikaBroker (for RabbitMQ)
91+
92+
Project link: [taskiq-aio-pika](https://github.com/taskiq-python/taskiq-aio-pika).
8593

8694
```bash
8795
pip install taskiq-aio-pika
8896
```
8997

90-
You can read more about parameters and abilities of this broker in [README.md](https://github.com/taskiq-python/taskiq-aio-pika).
98+
### Redis broker
9199

92-
## Redis broker
93-
94-
This broker is not part of the core taskiq lib. You can install it as a separate package [taskiq-redis](https://pypi.org/project/taskiq-redis/).
100+
Project link: [taskiq-redis](https://github.com/taskiq-python/taskiq-redis).
95101

96102
```bash
97103
pip install taskiq-redis
98104
```
99105

100-
You can read more about parameters and abilities of this broker in [README.md](https://github.com/taskiq-python/taskiq-redis).
106+
### NATS broker
107+
108+
Project link: [taskiq-nats](https://github.com/taskiq-python/taskiq-nats).
109+
110+
```bash
111+
pip install taskiq-nats
112+
```

docs/guide/state-and-deps.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,3 +171,13 @@ By default taskiq has only two deendencies:
171171

172172
- Context from `taskiq.context.Context`
173173
- TaskiqState from `taskiq.state.TaskiqState`
174+
175+
176+
### Adding first-level dependencies
177+
178+
You can expand default list of available dependencies for you application.
179+
Taskiq have an ability to add new first-level dependencies using brokers.
180+
181+
The AsyncBroker interface has a function called `add_dependency_context` and you can add
182+
more default dependencies to the taskiq. This may be useful for libraries if you want to
183+
add new dependencies to users.

poetry.lock

Lines changed: 55 additions & 55 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.2.2"
3+
version = "0.3.0"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]
@@ -35,7 +35,7 @@ watchdog = "^2.1.9"
3535
gitignore-parser = "^0.1.0"
3636
importlib-metadata = "<4.3"
3737
pycron = "^3.0.0"
38-
taskiq_dependencies = "~1.0.0"
38+
taskiq_dependencies = "^1.0.0"
3939

4040
[tool.poetry.dev-dependencies]
4141
pytest = "^7.1.2"

taskiq/abc/broker.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,23 @@ def __init__(
8686
# Every event has a list of handlers.
8787
# Every handler is a function which takes state as a first argument.
8888
# And handler can be either sync or async.
89-
self.event_handlers: DefaultDict[ # noqa: WPS234
89+
self.event_handlers: DefaultDict[
9090
TaskiqEvents,
9191
List[Callable[[TaskiqState], Optional[Awaitable[None]]]],
9292
] = defaultdict(list)
9393
self.state = TaskiqState()
94+
self.custom_dependency_context: Dict[Any, Any] = {}
95+
96+
def add_dependency_context(self, new_ctx: Dict[Any, Any]) -> None:
97+
"""
98+
Add first-level dependencies.
99+
100+
Provided dict will be used to inject new dependencies
101+
in all dependency graph contexts.
102+
103+
:param new_ctx: Additional context values for dependnecy injection.
104+
"""
105+
self.custom_dependency_context.update(new_ctx)
94106

95107
def add_middlewares(self, *middlewares: "TaskiqMiddleware") -> None:
96108
"""

taskiq/cli/worker/receiver.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,14 @@ async def run_task( # noqa: C901, WPS210
159159
dep_ctx = None
160160
if dependency_graph:
161161
# Create a context for dependency resolving.
162-
dep_ctx = dependency_graph.async_ctx(
162+
broker_ctx = self.broker.custom_dependency_context
163+
broker_ctx.update(
163164
{
164165
Context: Context(message, self.broker),
165166
TaskiqState: self.broker.state,
166167
},
167168
)
169+
dep_ctx = dependency_graph.async_ctx(broker_ctx)
168170
# Resolve all function's dependencies.
169171
dep_kwargs = await dep_ctx.resolve_kwargs()
170172
for key, val in dep_kwargs.items():

tests/cli/worker/test_custom_contexts.py

Whitespace-only changes.

tests/cli/worker/test_receiver.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Any, Optional
22

33
import pytest
4+
from taskiq_dependencies import Depends
45

56
from taskiq.abc.broker import AsyncBroker
67
from taskiq.abc.middleware import TaskiqMiddleware
@@ -200,3 +201,43 @@ async def test_callback_unknown_task() -> None:
200201
)
201202

202203
await receiver.callback(broker_message)
204+
205+
206+
@pytest.mark.anyio
207+
async def test_custom_ctx() -> None:
208+
"""Tests that run_task can run sync tasks."""
209+
210+
class MyTestClass:
211+
"""Class to test injection."""
212+
213+
def __init__(self, val: int) -> None:
214+
self.val = val
215+
216+
broker = InMemoryBroker()
217+
218+
# We register a task into broker,
219+
# to build dependency graph on startup.
220+
@broker.task
221+
def test_func(tes_val: MyTestClass = Depends()) -> int:
222+
return tes_val.val
223+
224+
# We add custom first-level dependency.
225+
broker.add_dependency_context({MyTestClass: MyTestClass(11)})
226+
# Create a receiver.
227+
receiver = get_receiver(broker)
228+
229+
result = await receiver.run_task(
230+
test_func,
231+
TaskiqMessage(
232+
task_id="",
233+
task_name=test_func.task_name,
234+
labels={},
235+
args=[],
236+
kwargs={},
237+
),
238+
)
239+
240+
# Check that the value is equal
241+
# to the one we supplied.
242+
assert result.return_value == 11
243+
assert not result.is_err

0 commit comments

Comments
 (0)