Skip to content

Commit 915df3e

Browse files
authored
Docs updated. (#199)
1 parent 1d16c3e commit 915df3e

17 files changed

+1816
-1260
lines changed

docs/available-components/schedule-sources.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ The format of the schedule label is the following:
1717
@broker.task(
1818
schedule=[
1919
{
20-
"cron": "* * * * *", # type: str, required argument.
20+
"cron": "* * * * *", # type: str, either cron or time shoule be specified.
21+
"cron_offset": None # type: str | timedelta | None, can be ommited.
22+
"time": None # type: datetime | None, either cron or time shoule be specified.
2123
"args": [], # type List[Any] | None, can be omitted.
2224
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
2325
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
@@ -31,11 +33,13 @@ async def my_task():
3133
Parameters:
3234

3335
- `cron` - crontab string when to run the task.
36+
- `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones)
37+
- `time` - specific time when send the task.
3438
- `args` - args to use, when invoking the task.
3539
- `kwargs` - key-word arguments to use when invoking the task.
3640
- `labels` - additional labels to use when invoking the task.
3741

38-
Usage:
42+
To enable this source, just add it to the list of sources:
3943

4044
```python
4145
from taskiq.scheduler import TaskiqScheduler
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import asyncio
2+
from typing import Annotated, AsyncGenerator
3+
4+
from taskiq import TaskiqDepends
5+
6+
7+
async def dependency() -> AsyncGenerator[str, None]:
8+
print("Startup")
9+
await asyncio.sleep(0.1)
10+
11+
yield "value"
12+
13+
await asyncio.sleep(0.1)
14+
print("Shutdown")
15+
16+
17+
async def my_task(dep: Annotated[str, TaskiqDepends(dependency)]) -> None:
18+
print(dep.upper())
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from typing import Annotated
2+
3+
from taskiq import TaskiqDepends
4+
5+
6+
async def db_connection() -> str:
7+
return "let's pretend as this is a connection"
8+
9+
10+
class MyDAO:
11+
def __init__(self, db_conn: Annotated[str, TaskiqDepends(db_connection)]) -> None:
12+
self.db_conn = db_conn
13+
14+
def get_users(self) -> str:
15+
return self.db_conn.upper()
16+
17+
18+
def my_task(dao: Annotated[MyDAO, TaskiqDepends()]) -> None:
19+
print(dao.get_users())
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import random
2+
from typing import Annotated
3+
4+
from taskiq import TaskiqDepends
5+
6+
7+
def common_dep() -> int:
8+
# For example it returns 8
9+
return random.randint(1, 10)
10+
11+
12+
def dep1(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
13+
# This function will return 9
14+
return cd + 1
15+
16+
17+
def dep2(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
18+
# This function will return 10
19+
return cd + 2
20+
21+
22+
def my_task(
23+
d1: Annotated[int, TaskiqDepends(dep1)],
24+
d2: Annotated[int, TaskiqDepends(dep2)],
25+
) -> int:
26+
# This function will return 19
27+
return d1 + d2
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
from typing import Annotated, Optional
3+
4+
from redis.asyncio import ConnectionPool, Redis # type: ignore
5+
from taskiq_aio_pika import AioPikaBroker
6+
from taskiq_redis import RedisAsyncResultBackend
7+
8+
from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState
9+
10+
# To run this example, please install:
11+
# * taskiq
12+
# * taskiq-redis
13+
# * taskiq-aio-pika
14+
15+
broker = AioPikaBroker(
16+
"amqp://localhost",
17+
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))
18+
19+
20+
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
21+
async def startup(state: TaskiqState) -> None:
22+
# Here we store connection pool on startup for later use.
23+
state.redis = ConnectionPool.from_url("redis://localhost/1")
24+
25+
26+
@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
27+
async def shutdown(state: TaskiqState) -> None:
28+
# Here we close our pool on shutdown event.
29+
await state.redis.disconnect()
30+
31+
32+
@broker.task
33+
async def get_val(
34+
key: str,
35+
context: Annotated[Context, TaskiqDepends()],
36+
) -> Optional[str]:
37+
# Now we can use our pool.
38+
redis = Redis(connection_pool=context.state.redis, decode_responses=True)
39+
return await redis.get(key)
40+
41+
42+
@broker.task
43+
async def set_val(
44+
key: str,
45+
value: str,
46+
context: Annotated[Context, TaskiqDepends()],
47+
) -> None:
48+
# Now we can use our pool to set value.
49+
await Redis(connection_pool=context.state.redis).set(key, value)
50+
51+
52+
async def main() -> None:
53+
await broker.startup()
54+
55+
set_task = await set_val.kiq("key", "value")
56+
set_result = await set_task.wait_result(with_logs=True)
57+
if set_result.is_err:
58+
print(set_result.log)
59+
raise ValueError("Cannot set value in redis. See logs.")
60+
61+
get_task = await get_val.kiq("key")
62+
get_res = await get_task.wait_result()
63+
print(f"Got redis value: {get_res.return_value}")
64+
65+
await broker.shutdown()
66+
67+
68+
if __name__ == "__main__":
69+
asyncio.run(main())
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from typing import Annotated, Generator
2+
3+
from taskiq import TaskiqDepends
4+
5+
6+
def dependency() -> Generator[str, None, None]:
7+
print("Startup")
8+
9+
yield "value"
10+
11+
print("Shutdown")
12+
13+
14+
async def my_task(dep: Annotated[str, TaskiqDepends(dependency)]) -> None:
15+
print(dep.upper())

docs/examples/state/generics.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from typing import Generic, TypeVar
2+
3+
from taskiq import TaskiqDepends
4+
5+
T = TypeVar("T")
6+
7+
8+
def default_dependency() -> int:
9+
return 1
10+
11+
12+
class GenericDep(Generic[T]):
13+
def __init__(self, dep: T = TaskiqDepends()) -> None:
14+
self.dep = dep
15+
16+
def print_dep(self) -> None:
17+
print(self.dep)
18+
19+
20+
class MyClassDep:
21+
def __init__(self, i: int = TaskiqDepends(default_dependency)) -> None:
22+
self.i = i
23+
24+
25+
async def my_task(dep: GenericDep[MyClassDep] = TaskiqDepends()) -> None:
26+
assert isinstance(dep.dep, MyClassDep)
27+
dep.print_dep()

docs/examples/state/generics_annot.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from typing import Annotated, Generic, TypeVar
2+
3+
from taskiq import TaskiqDepends
4+
5+
T = TypeVar("T")
6+
7+
8+
def default_dependency() -> int:
9+
return 1
10+
11+
12+
class GenericDep(Generic[T]):
13+
def __init__(self, dep: Annotated[T, TaskiqDepends()]) -> None:
14+
self.dep = dep
15+
16+
def print_dep(self) -> None:
17+
print(self.dep)
18+
19+
20+
class MyClassDep:
21+
def __init__(self, i: Annotated[int, TaskiqDepends(default_dependency)]) -> None:
22+
self.i = i
23+
24+
25+
async def my_task(dep: Annotated[GenericDep[MyClassDep], TaskiqDepends()]) -> None:
26+
assert isinstance(dep.dep, MyClassDep)
27+
dep.print_dep()

docs/examples/state/no_cache_annot.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import random
2+
from typing import Annotated
3+
4+
from taskiq import TaskiqDepends
5+
6+
7+
def common_dep() -> int:
8+
return random.randint(1, 10)
9+
10+
11+
def dep1(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
12+
return cd + 1
13+
14+
15+
def dep2(cd: Annotated[int, TaskiqDepends(common_dep, use_cache=False)]) -> int:
16+
return cd + 2
17+
18+
19+
def my_task(
20+
d1: Annotated[int, TaskiqDepends(dep1)],
21+
d2: Annotated[int, TaskiqDepends(dep2)],
22+
) -> int:
23+
return d1 + d2

docs/framework_integrations/taskiq-with-aiohttp.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,24 @@ taskiq_aiohttp.init(broker, "my_project.main:app")
6060
From this point, you'll be able to reuse the same dependencies as with `aiohttp-deps`.
6161
Let's take a look at this function:
6262

63+
::: tabs
64+
65+
@tab Annotated 3.10+
66+
67+
```python
68+
from aiohttp import web
69+
from typing import Annotated
70+
from taskiq import TaskiqDepends
71+
from my_project.tkq import broker
72+
73+
@broker.task
74+
async def my_task(app: Annotated[web.Application, TaskiqDepends()]):
75+
...
76+
77+
```
78+
79+
@tab default values
80+
6381
```python
6482
from aiohttp import web
6583
from taskiq import TaskiqDepends
@@ -71,6 +89,8 @@ async def my_task(app: web.Application = TaskiqDepends()):
7189

7290
```
7391

92+
:::
93+
7494
In this example, we depend on the current application. We can use its state in a current task or any other dependency. We can take db_pool from your application's state, which is the same pool, as the one you've created on AiohTTP's startup.
7595
But this application is only a mock of your application. It has correct types and all your variables that you filled on startup, but it doesn't handle any request.
7696
This integration adds two main dependencies:

0 commit comments

Comments
 (0)