Skip to content

Add intervals for scheduled task #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ The format of the schedule label is the following:
@broker.task(
schedule=[
{
"cron": "* * * * *", # type: str, either cron or time should be specified.
"cron_offset": None # type: str | timedelta | None, can be omitted.
"time": None # type: datetime | None, either cron or time should be specified.
"cron": "* * * * *", # type: str, either cron, interval or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"interval": None, # type: int | timedelta, either cron, interval or time should be specified.
"time": None, # type: datetime | None, either cron, interval or time should be specified.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
Expand All @@ -51,6 +52,7 @@ Parameters:

- `cron` - crontab string when to run the task.
- `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones)
- `interval` - Interval to run periodic tasks.
- `time` - specific time when send the task.
- `args` - args to use, when invoking the task.
- `kwargs` - key-word arguments to use when invoking the task.
Expand Down
2 changes: 2 additions & 0 deletions docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,5 @@ Path to scheduler is the only required argument.
- `--no-configure-logging` - use this parameter if your application configures custom logging.
- `--log-level` is used to set a log level (default `INFO`).
- `--skip-first-run` - skip first run of scheduler. This option skips running tasks immediately after scheduler start.
- `--update-interval` - interval in seconds to check for new tasks from sources (default `60`).
- `--loop-interval` - interval in seconds to check tasks to send (default `1`).
11 changes: 10 additions & 1 deletion docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
)
```

Or if you want to use cron schedules instead, just use `schedule_by_cron` method.
You can also use cron or interval scheduling, just use the `schedule_by_cron` or `schedule_by_interval` methods

```python
await my_task.schedule_by_cron(
Expand All @@ -129,6 +129,15 @@ Or if you want to use cron schedules instead, just use `schedule_by_cron` method
)
```

```python
await my_task.schedule_by_interval(
redis_source,
datetime.timedelta(seconds=5),
11,
arg2="arg2",
)
```

If you want to pass additional labels, you can call these methods on the `Kicker` instance.

```python
Expand Down
11 changes: 9 additions & 2 deletions taskiq/api/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import timedelta
from typing import Optional

from taskiq.cli.scheduler.run import run_scheduler_loop
from taskiq.cli.scheduler.run import SchedulerLoop
from taskiq.scheduler.scheduler import TaskiqScheduler


async def run_scheduler_task(
scheduler: TaskiqScheduler,
run_startup: bool = False,
interval: Optional[timedelta] = None,
loop_interval: Optional[timedelta] = None,
) -> None:
"""
Run scheduler task.
Expand All @@ -18,10 +19,16 @@ async def run_scheduler_task(

:param scheduler: scheduler instance.
:param run_startup: whether to run startup function or not.
:param interval: interval to check for schedule updates.
:param loop_interval: interval to check tasks to send.
"""
for source in scheduler.sources:
await source.startup()
if run_startup:
await scheduler.startup()
while True:
await run_scheduler_loop(scheduler, interval)
scheduler_loop = SchedulerLoop(scheduler)
await scheduler_loop.run(
update_interval=interval,
loop_interval=loop_interval,
)
10 changes: 10 additions & 0 deletions taskiq/cli/scheduler/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class SchedulerArgs:
tasks_pattern: Sequence[str] = ("**/tasks.py",)
skip_first_run: bool = False
update_interval: Optional[int] = None
loop_interval: Optional[int] = None

@classmethod
def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
Expand Down Expand Up @@ -90,6 +91,15 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
"If not specified, scheduler will run once a minute."
),
)
parser.add_argument(
"--loop-interval",
type=int,
default=None,
help=(
"Interval in seconds to check tasks to send. "
"If not specified, scheduler will run once a second."
),
)

namespace = parser.parse_args(args)
# If there are any patterns specified, remove default.
Expand Down
Loading