Skip to content

Commit 12ca432

Browse files
authored
First attempt to add dependency injection. (#53)
* First attempt to add dependency injection. Signed-off-by: Pavel Kirilin <[email protected]>
1 parent d451520 commit 12ca432

19 files changed

+1040
-241
lines changed

.flake8

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ ignore =
8080
WPS229,
8181
; Found function with too much cognitive complexity
8282
WPS231,
83+
; Found too deep nesting
84+
WPS220,
85+
; Found line with high Jones Complexity
86+
WPS221,
87+
; function name should be lowercase
88+
N802,
89+
; Do not perform function calls in argument defaults.
90+
B008,
8391

8492
; all init files
8593
__init__.py:
@@ -99,6 +107,10 @@ per-file-ignores =
99107
WPS432,
100108
; Missing parameter(s) in Docstring
101109
DAR101,
110+
; Found too short name
111+
WPS111,
112+
; Found complex default value
113+
WPS404,
102114

103115
exclude =
104116
./.git,
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import asyncio
2+
from typing import AsyncGenerator
3+
4+
from taskiq import TaskiqDepends
5+
6+
7+
async def dependency() -> AsyncGenerator[str, None]:
8+
print("Startup")
9+
await asyncio.sleep(0.1)
10+
11+
yield "value"
12+
13+
await asyncio.sleep(0.1)
14+
print("Shutdown")
15+
16+
17+
async def my_task(dep: str = TaskiqDepends(dependency)) -> None:
18+
print(dep.upper())
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from taskiq import TaskiqDepends
2+
3+
4+
async def db_connection() -> str:
5+
return "let's pretend as this is a connection"
6+
7+
8+
class MyDAO:
9+
def __init__(self, db_conn: str = TaskiqDepends(db_connection)) -> None:
10+
self.db_conn = db_conn
11+
12+
def get_users(self) -> str:
13+
return self.db_conn.upper()
14+
15+
16+
def my_task(dao: MyDAO = TaskiqDepends()) -> None:
17+
print(dao.get_users())
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import random
2+
3+
from taskiq import TaskiqDepends
4+
5+
6+
def common_dep() -> int:
7+
# For example it returns 8
8+
return random.randint(1, 10)
9+
10+
11+
def dep1(cd: int = TaskiqDepends(common_dep)) -> int:
12+
# This function will return 9
13+
return cd + 1
14+
15+
16+
def dep2(cd: int = TaskiqDepends(common_dep)) -> int:
17+
# This function will return 10
18+
return cd + 2
19+
20+
21+
def my_task(
22+
d1: int = TaskiqDepends(dep1),
23+
d2: int = TaskiqDepends(dep2),
24+
) -> int:
25+
# This function will return 19
26+
return d1 + d2

docs/examples/state/events_example.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
from taskiq_aio_pika import AioPikaBroker
66
from taskiq_redis import RedisAsyncResultBackend
77

8-
from taskiq import Context, TaskiqEvents, TaskiqState
9-
from taskiq.context import default_context
8+
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState
109

1110
# To run this example, please install:
1211
# * taskiq
@@ -34,14 +33,14 @@ async def shutdown(state: TaskiqState) -> None:
3433

3534

3635
@broker.task
37-
async def get_val(key: str, context: Context = default_context) -> Optional[str]:
36+
async def get_val(key: str, context: Context = TaskiqDepends()) -> Optional[str]:
3837
# Now we can use our pool.
3938
redis = Redis(connection_pool=context.state.redis, decode_responses=True)
4039
return await redis.get(key)
4140

4241

4342
@broker.task
44-
async def set_val(key: str, value: str, context: Context = default_context) -> None:
43+
async def set_val(key: str, value: str, context: Context = TaskiqDepends()) -> None:
4544
# Now we can use our pool to set value.
4645
await Redis(connection_pool=context.state.redis).set(key, value)
4746

docs/examples/state/generator_deps.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Generator
2+
3+
from taskiq import TaskiqDepends
4+
5+
6+
def dependency() -> Generator[str, None, None]:
7+
print("Startup")
8+
9+
yield "value"
10+
11+
print("Shutdown")
12+
13+
14+
async def my_task(dep: str = TaskiqDepends(dependency)) -> None:
15+
print(dep.upper())

docs/examples/state/no_cache.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import random
2+
3+
from taskiq import TaskiqDepends
4+
5+
6+
def common_dep() -> int:
7+
return random.randint(1, 10)
8+
9+
10+
def dep1(cd: int = TaskiqDepends(common_dep)) -> int:
11+
return cd + 1
12+
13+
14+
def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int:
15+
return cd + 2
16+
17+
18+
def my_task(
19+
d1: int = TaskiqDepends(dep1),
20+
d2: int = TaskiqDepends(dep2),
21+
) -> int:
22+
return d1 + d2

docs/guide/scheduling-tasks.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ it may execute one task N times, where N is the number of running scheduler inst
4545

4646
This command will import the scheduler you defined and start sending tasks to your broker.
4747

48+
::: tip Cool tip!
49+
50+
The scheduler doesn't execute tasks. It only sends them.
51+
52+
:::
53+
4854
You can check list of available schedule sources in the [Available schedule sources](../available-components/schedule-sources.md) section.
4955

5056

docs/guide/state-and-deps.md

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
---
2+
order: 7
3+
---
4+
5+
# State and Dependencies
6+
7+
8+
## State
9+
10+
The `TaskiqState` is a global variable where you can keep the variables you want to use later.
11+
For example, you want to open a database connection pool at a broker's startup.
12+
13+
This can be acieved by adding event handlers.
14+
15+
You can use one of these events:
16+
* `WORKER_STARTUP`
17+
* `CLIENT_STARTUP`
18+
* `WORKER_SHUTDOWN`
19+
* `CLIENT_SHUTDOWN`
20+
21+
Worker events are called when you start listening to the broker messages using taskiq.
22+
Client events are called when you call the `startup` method of your broker from your code.
23+
24+
This is an example of code using event handlers:
25+
26+
@[code python](../examples/state/events_example.py)
27+
28+
::: tip Cool tip!
29+
30+
If you want to add handlers programmatically, you can use the `broker.add_event_handler` function.
31+
32+
:::
33+
34+
As you can see in this example, this worker will initialize the Redis pool at the startup.
35+
You can access the state from the context.
36+
37+
38+
## Dependencies
39+
40+
Using context directly is nice, but this way won't get completion.
41+
42+
That's why we suggest you try TaskiqDependencies. The implementation is very similar to FastApi's dependencies. You can use classes, functions, and generators as dependencies.
43+
44+
::: danger Cool alarm!
45+
46+
FastAPI's `Depends` is not compatible with `TaskiqDepends`.
47+
48+
:::
49+
50+
### How dependencies are useful
51+
52+
You can use dependencies for better autocompletion and reduce the amount of code you write.
53+
Since the state is generic, we cannot guess the types of the state fields.
54+
Dependencies can be annotated with type hints and therfore provide better auto-completion.
55+
56+
Let's assume that you've stored a Redis connection pool in the state as in the example above.
57+
```python
58+
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
59+
async def startup(state: TaskiqState) -> None:
60+
# Here we store connection pool on startup for later use.
61+
state.redis = ConnectionPool.from_url("redis://localhost/1")
62+
63+
```
64+
65+
You can access this variable by using the current execution context directly, like this:
66+
67+
```python
68+
@broker.task
69+
async def my_task(context: Context = TaskiqDepends()) -> None:
70+
async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
71+
await redis.set('key', 'value')
72+
```
73+
74+
If you hit the `TAB` button after the `context.state.` expression, your IDE won't give you any auto-completion.
75+
But we can create a dependency function to add auto-completion.
76+
77+
```python
78+
79+
def redis_dep(context: Context = TaskiqDepends()) -> Redis:
80+
return Redis(connection_pool=context.state.redis, decode_responses=True)
81+
82+
@broker.task
83+
async def my_task(redis: Redis = TaskiqDepends(redis_dep)) -> None:
84+
await redis.set('key', 'value')
85+
86+
```
87+
88+
Now, this dependency injection will be autocompleted. But, of course, state fields cannot be autocompleted,
89+
even in dependencies. But this way, you won't make any typos while writing tasks.
90+
91+
92+
### How do dependencies work
93+
94+
We build a graph of dependencies on startup. If the parameter of the function has
95+
the default value of `TaskiqDepends` this parameter will be treated as a dependency.
96+
97+
Dependencies can also depend on something. Also dependencies are optimized to **not** evaluate things many times.
98+
99+
For example:
100+
101+
@[code python](../examples/state/dependencies_tree.py)
102+
103+
In this code, the dependency `common_dep` is going to be evaluated only once and the `dep1` and the `dep2` are going to recevie the same value. You can control this behaviour by using the `use_cache=False` parameter to you dependency. This parameter will force the
104+
dependency to reevaluate all it's subdependencies.
105+
106+
107+
In this example we cannot predict the result. Since the `dep2` doesn't use cache for the `common_dep` function.
108+
@[code python](../examples/state/no_cache.py)
109+
110+
The graph for cached dependencies looks like this:
111+
112+
```mermaid
113+
graph TD
114+
A[common_dep]
115+
B[dep1]
116+
C[dep2]
117+
D[my_task]
118+
A --> B
119+
A --> C
120+
B --> D
121+
C --> D
122+
```
123+
124+
The dependencies graph for `my_task` where `dep2` doesn't use cached value for `common_dep` looks like this:
125+
126+
```mermaid
127+
graph TD
128+
A[common_dep]
129+
B[dep1]
130+
D[my_task]
131+
C[dep2]
132+
subgraph without cache
133+
A1[common_dep]
134+
end
135+
A --> B
136+
A1 --> C
137+
B --> D
138+
C --> D
139+
```
140+
141+
### Class as a dependency
142+
143+
You can use classes as dependencies, and they can also use other dependencies too.
144+
145+
Let's see an example:
146+
147+
@[code python](../examples/state/class_dependency.py)
148+
149+
As you can see, the dependency for `my_task` function is declared with `TaskiqDependency()`.
150+
It's because you can omit the class if it's declared in typehint for the parameter. This feature doesn't
151+
work with dependency functions, it's only for classes.
152+
153+
You can pass dependencies for classes in the constructor.
154+
155+
### Generator dependencies
156+
157+
Generator dependencies are used to perform startup before task execution and teardown after the task execution.
158+
159+
@[code python](../examples/state/generator_deps.py)
160+
161+
In this example, we can do something at startup before the execution and at shutdown after the task is completed.
162+
163+
If you want to do something asynchronously, convert this function to an asynchronous generator. Like this:
164+
165+
@[code python](../examples/state/async_generator_deps.py)
166+
167+
168+
### Default dependencies
169+
170+
By default taskiq has only two deendencies:
171+
* Context from `taskiq.context.Context`
172+
* TaskiqState from `taskiq.state.TaskiqState`

docs/guide/state-and-events.md

Lines changed: 0 additions & 32 deletions
This file was deleted.

0 commit comments

Comments
 (0)