Skip to content

Commit 6cde427

Browse files
authored
Added scheduler. (#47)
* Initial scheduler implementation. Signed-off-by: Pavel Kirilin <[email protected]>
1 parent 11906d0 commit 6cde427

31 files changed

+703
-69
lines changed

docs/available-components/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
order: 1
3+
dir:
4+
order: 3
5+
---
6+
7+
# Available components
8+
9+
In this section, you can find a list of officially supported plugins for the taskiq.
10+
11+
* [Available brokers](./brokers.md)
12+
* [Available result backends](./result-backends.md)
13+
* [Available schedule sources](./scheduler-sources.md)

docs/guide/available-brokers.md renamed to docs/available-components/brokers.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
order: 5
2+
order: 2
33
---
44

55
# Available brokers

docs/guide/result_backends.md renamed to docs/available-components/result-backends.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
---
2-
order: 6
2+
order: 3
33
---
44

5-
# Result backends
5+
# Available result backends
66

77
Result backends are used to store execution results.
88
This includes:
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
---
2+
order: 4
3+
---
4+
5+
6+
# Available schedule sources
7+
8+
These objects are used to fetch current schedule for tasks.
9+
Currently we have only one schedule source.
10+
11+
12+
## LabelScheduleSource
13+
14+
This source parses labels of tasks, and if it finds a `schedule` label, it considers this task as scheduled.
15+
16+
The format of the schedule label is the following:
17+
18+
```python
19+
@broker.task(
20+
schedule=[
21+
{
22+
"cron": "* * * * *", # type: str, required argument.
23+
"args": [], # type List[Any] | None, can be omitted.
24+
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
25+
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
26+
}
27+
]
28+
)
29+
async def my_task():
30+
...
31+
```
32+
33+
Parameters:
34+
* `cron` - crontab string when to run the task.
35+
* `args` - args to use, when invoking the task.
36+
* `kwargs` - key-word arguments to use when invoking the task.
37+
* `labels` - additional labels to use wehn invoking the task.
38+
39+
Usage:
40+
41+
```python
42+
from taskiq.scheduler import TaskiqScheduler
43+
from taskiq.schedule_sources import LabelScheduleSource
44+
45+
broker = ...
46+
47+
scheduler = TaskiqScheduler(
48+
broker=broker,
49+
sources=[LabelScheduleSource(broker)],
50+
)
51+
```
52+
53+
54+
::: warning Cool notice!
55+
56+
In order to resolve all labels correctly, don't forget to import
57+
all task modules using CLI interface.
58+
59+
:::
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from typing import TypeVar
2+
3+
from taskiq import TaskiqResult
4+
from taskiq.abc.result_backend import AsyncResultBackend
5+
6+
_ReturnType = TypeVar("_ReturnType")
7+
8+
9+
class MyResultBackend(AsyncResultBackend[_ReturnType]):
10+
async def startup(self) -> None:
11+
"""Do something when starting broker."""
12+
13+
async def shutdown(self) -> None:
14+
"""Do something on shutdown."""
15+
16+
async def set_result(
17+
self,
18+
task_id: str,
19+
result: TaskiqResult[_ReturnType],
20+
) -> None:
21+
# Here you must set result somewhere.
22+
pass
23+
24+
async def get_result(
25+
self,
26+
task_id: str,
27+
with_logs: bool = False,
28+
) -> TaskiqResult[_ReturnType]:
29+
# Here you must retrieve result by id.
30+
31+
# Logs is a part of a result.
32+
# Here we have a parameter whether you want to
33+
# fetch result with logs or not, because logs
34+
# can have a lot of info and sometimes it's critical
35+
# to get only needed information.
36+
pass
37+
38+
async def is_result_ready(
39+
self,
40+
task_id: str,
41+
) -> bool:
42+
# This function checks if result of a task exists,
43+
# without actual fetching the result.
44+
pass
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from typing import List
2+
3+
from taskiq import ScheduledTask, ScheduleSource
4+
5+
6+
class MyScheduleSource(ScheduleSource):
7+
async def startup(self) -> None:
8+
"""Do something when starting broker."""
9+
10+
async def shutdown(self) -> None:
11+
"""Do something on shutdown."""
12+
13+
async def get_schedules(self) -> List["ScheduledTask"]:
14+
# Here you must return list of scheduled tasks from your source.
15+
return [
16+
ScheduledTask(
17+
task_name="",
18+
labels={},
19+
args=[],
20+
kwargs={},
21+
cron="* * * * *",
22+
),
23+
]
24+
25+
# This method is optional. You may not implement this.
26+
# It's just a helper to people to be able to interact with your source.
27+
async def add_schedule(self, schedule: "ScheduledTask") -> None:
28+
return await super().add_schedule(schedule)

docs/examples/schedule/intro.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from taskiq_aio_pika import AioPikaBroker
2+
3+
from taskiq.schedule_sources import LabelScheduleSource
4+
from taskiq.scheduler import TaskiqScheduler
5+
6+
broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")
7+
8+
scheduler = TaskiqScheduler(
9+
broker=broker,
10+
sources=[LabelScheduleSource(broker)],
11+
)
12+
13+
14+
@broker.task(schedule=[{"cron": "*/5 * * * *", "args": [1]}])
15+
async def heavy_task(value: int) -> int:
16+
return value + 1
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from taskiq_aio_pika import AioPikaBroker
2+
3+
broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")
4+
5+
6+
@broker.task
7+
async def heavy_task(value: int) -> int:
8+
return value + 1

docs/extending-taskiq/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,12 @@ Taskiq is super extendable. The core library comes with different abstract class
1010
You can implement these abstract classes to extend functionality.
1111

1212
All abstract classes can be found in `taskiq.abc` package.
13+
14+
15+
## Contents:
16+
17+
* [Brokers](./broker.md)
18+
* [Brokers](./broker.md)
19+
* [Result backends](./resutl-backend.md)
20+
* [CLI](./cli.md)
21+
* [Schedule sources](./schedule-sources.md)

docs/extending-taskiq/cli.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ from setuptools import setup
2727
setup(
2828
# ...,
2929
entry_points={
30-
'taskiq-cli': [
30+
'taskiq_cli': [
3131
'demo = my_project.cmd:MyCommand',
3232
]
3333
}
@@ -37,14 +37,14 @@ setup(
3737
@tab setuptools pyproject.toml
3838

3939
```toml
40-
[project.entry-points.taskiq-cli]
40+
[project.entry-points.taskiq_cli]
4141
demo = "my_project.cmd:MyCommand"
4242
```
4343

4444
@tab poetry
4545

4646
```toml
47-
[tool.poetry.plugins.taskiq-cli]
47+
[tool.poetry.plugins.taskiq_cli]
4848
demo = "my_project.cmd:MyCommand"
4949
```
5050

0 commit comments

Comments
 (0)