44[ ![ PyPI] ( https://img.shields.io/pypi/v/taskiq-ydb?style=for-the-badge&logo=pypi )] ( https://pypi.org/project/taskiq-ydb/ )
55[ ![ Checks] ( https://img.shields.io/github/actions/workflow/status/danfimov/taskiq-ydb/code_check.yml?style=for-the-badge&logo=pytest&label=checks )] ( https://github.com/danfimov/taskiq-ydb )
66
7-
8-
9- Plugin for taskiq that adds a new result backend and broker based on YDB.
7+ Plugin for taskiq that adds a new result backend, broker and schedule source based on YDB.
108
119## Installation
1210
@@ -16,75 +14,119 @@ This project can be installed using pip/poetry/uv (choose your preferred package
1614pip install taskiq-ydb
1715```
1816
19- ## Usage
17+ ## Quick start
2018
21- Let's see the example with YDB broker and result backend:
19+ ### Basic task processing
2220
23- ``` Python
24- # example.py
25- import asyncio
21+ 1 . Define your broker with [ asyncpg] ( https://github.com/MagicStack/asyncpg ) :
2622
27- from ydb.aio.driver import DriverConfig
23+ ``` python
24+ # broker_example.py
25+ import asyncio
26+ from ydb.aio.driver import DriverConfig
27+ from taskiq_ydb import YdbBroker, YdbResultBackend
2828
29- from taskiq_ydb import YdbBroker, YdbResultBackend
3029
30+ driver_config = DriverConfig(
31+ endpoint = ' grpc://localhost:2136' ,
32+ database = ' /local' ,
33+ )
34+ broker = YdbBroker(
35+ driver_config = driver_config,
36+ ).with_result_backend(
37+ YdbResultBackend(driver_config = driver_config),
38+ )
3139
32- driver_config = DriverConfig(
33- endpoint = ' grpc://localhost:2136' ,
34- database = ' /local' ,
35- )
3640
37- broker = YdbBroker(
38- driver_config = driver_config,
39- ).with_result_backend(YdbResultBackend(driver_config = driver_config))
41+ @broker.task (' solve_all_problems' )
42+ async def best_task_ever () -> None :
43+ """ Solve all problems in the world."""
44+ await asyncio.sleep(2 )
45+ print (' All problems are solved!' )
4046
4147
42- @broker.task (task_name = ' best_task_ever' )
43- async def best_task_ever () -> str :
44- """ Solve all problems in the world."""
45- return ' Problems solved!'
48+ async def main () -> None :
49+ await broker.startup()
50+ task = await best_task_ever.kiq()
51+ print (await task.wait_result())
52+ await broker.shutdown()
4653
4754
48- async def main () -> None :
49- """ Start the application with broker."""
50- await broker.startup()
51- task = await best_task_ever.kiq()
52- result = await task.wait_result()
53- print (f ' Task result: { result.return_value} ' )
54- await broker.shutdown()
55+ if __name__ == ' __main__' :
56+ asyncio.run(main())
57+ ```
5558
59+ 2 . Start a worker to process tasks (by default taskiq runs two instances of worker):
5660
57- if __name__ == ' __main__' :
58- loop = asyncio.get_event_loop()
59- loop.run_until_complete(main())
60- ```
61+ ``` bash
62+ taskiq worker broker_example:broker
63+ ```
6164
62- Example can be run using the following command :
65+ 3 . Run ` broker_example.py ` file to send a task to the worker :
6366
64- ``` bash
65- # Start broker
66- python3 -m example
67- ```
67+ ``` bash
68+ python broker_example.py
69+ ```
6870
69- ``` bash
70- # Start worker for executing command
71- taskiq worker example:broker
72- ```
71+ Your experience with other drivers will be pretty similar. Just change the import statement and that's it.
72+
73+ ### Task scheduling
74+
75+ 1 . Define your broker and schedule source:
76+
77+ ``` python
78+ # scheduler_example.py
79+ import asyncio
80+
81+ from taskiq import TaskiqScheduler
82+ from ydb.aio.driver import DriverConfig
83+
84+ from taskiq_ydb import YdbBroker, YdbScheduleSource
7385
74- ## Configuration
7586
76- ** Broker:**
87+ driver_config = DriverConfig(
88+ endpoint = ' grpc://localhost:2136' ,
89+ database = ' /local' ,
90+ )
91+ broker = YdbBroker(driver_config = driver_config)
92+ scheduler = TaskiqScheduler(
93+ broker = broker,
94+ sources = [
95+ YdbScheduleSource(
96+ driver_config = driver_config,
97+ broker = broker,
98+ ),
99+ ],
100+ )
77101
78- - ` driver_config ` : connection config for YDB client, you can read more about it in [ YDB documentation] ( https://ydb.tech/docs/en/concepts/connect ) ;
79- - ` topic_path ` : path to the topic where tasks will be stored, default is ` /taskiq-tasks ` ;
80- - ` connection_timeout ` : timeout for connection to database during startup, default is 5 seconds.
81- - ` read_timeout ` : timeout for read topic operation, default is 5 seconds.
82102
83- ** Result backend:**
103+ @broker.task (
104+ task_name = ' solve_all_problems' ,
105+ schedule = [
106+ {
107+ ' cron' : ' */1 * * * *' , # type: str , either cron or time should be specified .
108+ ' cron_offset' : None , # type: str | timedelta | None , can be omitted .
109+ ' time' : None , # type: datetime | None , either cron or time should be specified .
110+ ' args' : [], # type list[Any] | None, can be omitted.
111+ ' kwargs' : {}, # type: dict [ str , Any ] | None , can be omitted .
112+ ' labels' : {}, # type: dict [ str , Any ] | None , can be omitted .
113+ },
114+ ],
115+ )
116+ async def best_task_ever () -> None :
117+ """ Solve all problems in the world."""
118+ await asyncio.sleep(2 )
119+ print (' All problems are solved!' )
120+ ```
121+
122+ 2 . Start worker processes:
123+
124+ ``` bash
125+ taskiq worker scheduler_example:broker
126+ ```
84127
85- - ` driver_config ` : connection config for YDB client, you can read more about it in [ YDB documentation] ( https://ydb.tech/docs/en/concepts/connect ) ;
86- - ` table_name ` : name of the table to store task results;
87- - ` table_primary_key_type ` : type of primary key in task results table, default is ` uuid ` ;
88- - ` serializer ` : type of ` TaskiqSerializer ` default is ` PickleSerializer ` ;
89- - ` pool_size ` : size of the connection pool for YDB client, default is ` 5 ` ;
90- - ` connection_timeout ` : timeout for connection to database during startup, default is 5 seconds.
128+ 3 . Run scheduler process:
129+
130+ ``` bash
131+ taskiq scheduler scheduler_example:scheduler
132+ ```
0 commit comments