Skip to content

Commit 8c9f8ec

Browse files
authored
Added API for easy-scheduling. (#219)
1 parent 71408f6 commit 8c9f8ec

File tree

16 files changed

+189
-50
lines changed

16 files changed

+189
-50
lines changed

docs/examples/extending/schedule_source.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ async def get_schedules(self) -> List["ScheduledTask"]:
2424

2525
# This method is optional. You may not implement this.
2626
# It's just a helper to people to be able to interact with your source.
27-
async def add_schedule(self, schedule: "ScheduledTask") -> None:
28-
return await super().add_schedule(schedule)
27+
# This method can be either sync or async.
28+
def add_schedule(self, schedule: "ScheduledTask") -> None:
29+
print("New schedule added:", schedule)
2930

3031
# This method is optional. You may not implement this.
3132
# It's just a helper to people to be able to interact with your source.

docs/examples/schedule/intro.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from taskiq_aio_pika import AioPikaBroker
22

33
from taskiq.schedule_sources import LabelScheduleSource
4-
from taskiq.scheduler import TaskiqScheduler
4+
from taskiq import TaskiqScheduler
55

66
broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")
77

taskiq/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
from taskiq.middlewares.prometheus_middleware import PrometheusMiddleware
2929
from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware
3030
from taskiq.result import TaskiqResult
31-
from taskiq.scheduler import ScheduledTask, TaskiqScheduler
31+
from taskiq.scheduler.scheduled_task import ScheduledTask
32+
from taskiq.scheduler.scheduler import TaskiqScheduler
3233
from taskiq.state import TaskiqState
3334
from taskiq.task import AsyncTaskiqTask
3435

taskiq/abc/schedule_source.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import TYPE_CHECKING, Any, Coroutine, List, Union
33

44
if TYPE_CHECKING: # pragma: no cover
5-
from taskiq.scheduler.scheduler import ScheduledTask
5+
from taskiq.scheduler.scheduled_task import ScheduledTask
66

77

88
class ScheduleSource(ABC):
@@ -18,7 +18,10 @@ async def shutdown(self) -> None: # noqa: B027
1818
async def get_schedules(self) -> List["ScheduledTask"]:
1919
"""Get list of taskiq schedules."""
2020

21-
async def add_schedule(self, schedule: "ScheduledTask") -> None:
21+
def add_schedule(
22+
self,
23+
schedule: "ScheduledTask",
24+
) -> Union[None, Coroutine[Any, Any, None]]:
2225
"""
2326
Add a new schedule.
2427

taskiq/api/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from taskiq.cli.scheduler.run import run_scheduler_loop
2-
from taskiq.scheduler import TaskiqScheduler
2+
from taskiq.scheduler.scheduler import TaskiqScheduler
33

44

55
async def run_scheduler_task(

taskiq/cli/scheduler/run.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from taskiq.abc.schedule_source import ScheduleSource
1111
from taskiq.cli.scheduler.args import SchedulerArgs
1212
from taskiq.cli.utils import import_object, import_tasks
13-
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
13+
from taskiq.scheduler.scheduled_task import ScheduledTask
14+
from taskiq.scheduler.scheduler import TaskiqScheduler
1415

1516
logger = getLogger(__name__)
1617

taskiq/decor.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from typing import (
23
TYPE_CHECKING,
34
Any,
@@ -6,6 +7,7 @@
67
Dict,
78
Generic,
89
TypeVar,
10+
Union,
911
overload,
1012
)
1113

@@ -16,6 +18,8 @@
1618

1719
if TYPE_CHECKING: # pragma: no cover
1820
from taskiq.abc.broker import AsyncBroker
21+
from taskiq.abc.schedule_source import ScheduleSource
22+
from taskiq.scheduler.scheduled_task import CronSpec
1923

2024
_T = TypeVar("_T")
2125
_FuncParams = ParamSpec("_FuncParams")
@@ -93,6 +97,56 @@ async def kiq(
9397
"""
9498
return await self.kicker().kiq(*args, **kwargs)
9599

100+
async def schedule_by_cron(
101+
self,
102+
source: "ScheduleSource",
103+
cron: Union[str, "CronSpec"],
104+
*args: _FuncParams.args,
105+
**kwargs: _FuncParams.kwargs,
106+
) -> None:
107+
"""
108+
Schedule task to run on cron.
109+
110+
This method requires a schedule source,
111+
which is capable of dynamically adding new schedules.
112+
113+
:param source: schedule source.
114+
:param cron: cron string or a CronSpec instance.
115+
:param args: function's arguments.
116+
:param kwargs: function's key word arguments.
117+
"""
118+
await self.kicker().schedule_cron(
119+
source,
120+
cron,
121+
*args,
122+
**kwargs,
123+
)
124+
125+
async def schedule_by_time(
126+
self,
127+
source: "ScheduleSource",
128+
time: datetime,
129+
*args: _FuncParams.args,
130+
**kwargs: _FuncParams.kwargs,
131+
) -> None:
132+
"""
133+
Schedule task to run on specific time.
134+
135+
This method requires a schedule source,
136+
which is capable of dynamically adding new schedules.
137+
138+
:param source: schedule source.
139+
:param time: time to run task.
140+
:param args: function's arguments.
141+
:param kwargs: function's key word arguments.
142+
"""
143+
await self.kicker().schedule_time(
144+
source,
145+
time,
146+
*args,
147+
**kwargs,
148+
)
149+
96150
def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
97151
"""
98152
This function returns kicker object.

taskiq/kicker.py

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from dataclasses import asdict, is_dataclass
2+
from datetime import datetime
23
from logging import getLogger
34
from typing import (
45
TYPE_CHECKING,
@@ -16,13 +17,16 @@
1617
from typing_extensions import ParamSpec
1718

1819
from taskiq.abc.middleware import TaskiqMiddleware
20+
from taskiq.compat import model_dump
1921
from taskiq.exceptions import SendTaskError
2022
from taskiq.message import TaskiqMessage
23+
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
2124
from taskiq.task import AsyncTaskiqTask
2225
from taskiq.utils import maybe_awaitable
2326

2427
if TYPE_CHECKING: # pragma: no cover
2528
from taskiq.abc.broker import AsyncBroker
29+
from taskiq.abc.schedule_source import ScheduleSource
2630

2731
_T = TypeVar("_T")
2832
_FuncParams = ParamSpec("_FuncParams")
@@ -142,6 +146,70 @@ async def kiq(
142146
result_backend=self.broker.result_backend,
143147
)
144148

149+
async def schedule_cron(
150+
self,
151+
source: "ScheduleSource",
152+
cron: Union[str, "CronSpec"],
153+
*args: _FuncParams.args,
154+
**kwargs: _FuncParams.kwargs,
155+
) -> None:
156+
"""
157+
Function to schedule task with cron.
158+
159+
:param source: schedule source.
160+
:param cron: cron expression.
161+
:param args: function's args.
162+
:param cron_offset: cron offset.
163+
:param kwargs: function's kwargs.
164+
"""
165+
message = self._prepare_message(*args, **kwargs)
166+
cron_offset = None
167+
if isinstance(cron, CronSpec):
168+
cron_str = cron.to_cron()
169+
cron_offset = cron.offset
170+
else:
171+
cron_str = cron
172+
await maybe_awaitable(
173+
source.add_schedule(
174+
ScheduledTask(
175+
task_name=message.task_name,
176+
labels=message.labels,
177+
args=message.args,
178+
kwargs=message.kwargs,
179+
cron=cron_str,
180+
cron_offset=cron_offset,
181+
),
182+
),
183+
)
184+
185+
async def schedule_time(
186+
self,
187+
source: "ScheduleSource",
188+
time: datetime,
189+
*args: _FuncParams.args,
190+
**kwargs: _FuncParams.kwargs,
191+
) -> None:
192+
"""
193+
Function to schedule task to run at specific time.
194+
195+
:param source: schedule source.
196+
:param time: time to run task at.
197+
:param args: function's args.
198+
:param kwargs: function's kwargs.
199+
"""
200+
message = self._prepare_message(*args, **kwargs)
201+
await maybe_awaitable(
202+
source.add_schedule(
203+
ScheduledTask(
204+
task_name=message.task_name,
205+
labels=message.labels,
206+
args=message.args,
207+
kwargs=message.kwargs,
208+
time=time,
209+
),
210+
),
211+
)
212+
145213
@classmethod
146214
def _prepare_arg(cls, arg: Any) -> Any:
147215
"""
@@ -154,7 +222,7 @@ def _prepare_arg(cls, arg: Any) -> Any:
154222
:return: Formatted argument.
155223
"""
156224
if isinstance(arg, BaseModel):
157-
arg = arg.dict()
225+
arg = model_dump(arg)
158226
if is_dataclass(arg):
159227
arg = asdict(arg)
160228
return arg

taskiq/schedule_sources/label_based.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from taskiq.abc.broker import AsyncBroker
55
from taskiq.abc.schedule_source import ScheduleSource
6-
from taskiq.scheduler.scheduler import ScheduledTask
6+
from taskiq.scheduler.scheduled_task import ScheduledTask
77

88
logger = getLogger(__name__)
99

taskiq/scheduler/__init__.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1 @@
11
"""Scheduler package."""
2-
from taskiq.scheduler.merge_functions import only_unique, preserve_all
3-
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
4-
5-
__all__ = [
6-
"only_unique",
7-
"preserve_all",
8-
"ScheduledTask",
9-
"TaskiqScheduler",
10-
]

0 commit comments

Comments
 (0)