Skip to content

Commit 5f4887e

Browse files
so-safs3rius
andauthored
Add intervals for scheduled task (#498)
* add scheduler intervals * add some tests, interval value validation * tests fixes * resolve conflicts * tests fixes * extract interval validation to separate module * change schedules storage to list of tuples --------- Co-authored-by: Pavel Kirilin <[email protected]>
1 parent 82e63c2 commit 5f4887e

22 files changed

+711
-326
lines changed

docs/available-components/schedule-sources.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ The format of the schedule label is the following:
3737
@broker.task(
3838
schedule=[
3939
{
40-
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
41-
"cron_offset": None # type: str | timedelta | None, can be omitted.
42-
"time": None # type: datetime | None, either cron or time should be specified.
40+
"cron": "* * * * *", # type: str, either cron, interval or time should be specified.
41+
"cron_offset": None, # type: str | timedelta | None, can be omitted.
42+
"interval": None, # type: int | timedelta, either cron, interval or time should be specified.
43+
"time": None, # type: datetime | None, either cron, interval or time should be specified.
4344
"args": [], # type List[Any] | None, can be omitted.
4445
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
4546
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
@@ -55,6 +56,7 @@ Parameters:
5556

5657
- `cron` - crontab string when to run the task.
5758
- `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones)
59+
- `interval` - Interval to run periodic tasks. Must be at least 1 second and cannot have fractional seconds.
5860
- `time` - specific time when send the task.
5961
- `args` - args to use, when invoking the task.
6062
- `kwargs` - key-word arguments to use when invoking the task.

docs/guide/scheduling-tasks.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
124124
)
125125
```
126126

127-
Or if you want to use cron schedules instead, just use `schedule_by_cron` method.
127+
You can also use cron or interval scheduling, just use the `schedule_by_cron` or `schedule_by_interval` methods
128128

129129
```python
130130
await my_task.schedule_by_cron(
@@ -135,6 +135,15 @@ Or if you want to use cron schedules instead, just use `schedule_by_cron` method
135135
)
136136
```
137137

138+
```python
139+
await my_task.schedule_by_interval(
140+
redis_source,
141+
datetime.timedelta(seconds=5),
142+
11,
143+
arg2="arg2",
144+
)
145+
```
146+
138147
If you want to pass additional labels, you can call these methods on the `Kicker` instance.
139148

140149
```python

taskiq/api/scheduler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from datetime import timedelta
22
from typing import Optional
33

4-
from taskiq.cli.scheduler.run import run_scheduler_loop
4+
from taskiq.cli.scheduler.run import SchedulerLoop
55
from taskiq.scheduler.scheduler import TaskiqScheduler
66

77

88
async def run_scheduler_task(
99
scheduler: TaskiqScheduler,
1010
run_startup: bool = False,
1111
interval: Optional[timedelta] = None,
12+
loop_interval: Optional[timedelta] = None,
1213
) -> None:
1314
"""
1415
Run scheduler task.
@@ -18,10 +19,16 @@ async def run_scheduler_task(
1819
1920
:param scheduler: scheduler instance.
2021
:param run_startup: whether to run startup function or not.
22+
:param interval: interval to check for schedule updates.
23+
:param loop_interval: interval to check tasks to send.
2124
"""
2225
for source in scheduler.sources:
2326
await source.startup()
2427
if run_startup:
2528
await scheduler.startup()
2629
while True:
27-
await run_scheduler_loop(scheduler, interval)
30+
scheduler_loop = SchedulerLoop(scheduler)
31+
await scheduler_loop.run(
32+
update_interval=interval,
33+
loop_interval=loop_interval,
34+
)

taskiq/cli/scheduler/args.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SchedulerArgs:
1919
tasks_pattern: Sequence[str] = ("**/tasks.py",)
2020
skip_first_run: bool = False
2121
update_interval: Optional[int] = None
22+
loop_interval: Optional[int] = None
2223

2324
@classmethod
2425
def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
@@ -101,6 +102,15 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
101102
"If not specified, scheduler will run once a minute."
102103
),
103104
)
105+
parser.add_argument(
106+
"--loop-interval",
107+
type=int,
108+
default=None,
109+
help=(
110+
"Interval in seconds to check tasks to send. "
111+
"If not specified, scheduler will run once a second."
112+
),
113+
)
104114

105115
namespace = parser.parse_args(args)
106116
# If there are any patterns specified, remove default.

0 commit comments

Comments
 (0)