Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/available-components/schedule-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ The format of the schedule label is the following:
@broker.task(
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None # type: str | timedelta | None, can be omitted.
"time": None # type: datetime | None, either cron or time should be specified.
"cron": "* * * * *", # type: str, either cron, interval or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"interval": None, # type: int | timedelta, either cron, interval or time should be specified.
"time": None, # type: datetime | None, either cron, interval or time should be specified.
"args": [], # type List[Any] | None, can be omitted.
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
Expand All @@ -55,6 +56,7 @@ Parameters:

- `cron` - crontab string when to run the task.
- `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones)
- `interval` - Interval to run periodic tasks. Must be at least 1 second and cannot have fractional seconds.
- `time` - specific time when send the task.
- `args` - args to use, when invoking the task.
- `kwargs` - key-word arguments to use when invoking the task.
Expand Down
11 changes: 10 additions & 1 deletion docs/guide/scheduling-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Now we can use this source to add new schedules in runtime. Here's an example:
)
```

Or if you want to use cron schedules instead, just use `schedule_by_cron` method.
You can also use cron or interval scheduling, just use the `schedule_by_cron` or `schedule_by_interval` methods

```python
await my_task.schedule_by_cron(
Expand All @@ -135,6 +135,15 @@ Or if you want to use cron schedules instead, just use `schedule_by_cron` method
)
```

```python
await my_task.schedule_by_interval(
redis_source,
datetime.timedelta(seconds=5),
11,
arg2="arg2",
)
```

If you want to pass additional labels, you can call these methods on the `Kicker` instance.

```python
Expand Down
11 changes: 9 additions & 2 deletions taskiq/api/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import timedelta
from typing import Optional

from taskiq.cli.scheduler.run import run_scheduler_loop
from taskiq.cli.scheduler.run import SchedulerLoop
from taskiq.scheduler.scheduler import TaskiqScheduler


async def run_scheduler_task(
scheduler: TaskiqScheduler,
run_startup: bool = False,
interval: Optional[timedelta] = None,
loop_interval: Optional[timedelta] = None,
) -> None:
"""
Run scheduler task.
Expand All @@ -18,10 +19,16 @@ async def run_scheduler_task(

:param scheduler: scheduler instance.
:param run_startup: whether to run startup function or not.
:param interval: interval to check for schedule updates.
:param loop_interval: interval to check tasks to send.
"""
for source in scheduler.sources:
await source.startup()
if run_startup:
await scheduler.startup()
while True:
await run_scheduler_loop(scheduler, interval)
scheduler_loop = SchedulerLoop(scheduler)
await scheduler_loop.run(
update_interval=interval,
loop_interval=loop_interval,
)
10 changes: 10 additions & 0 deletions taskiq/cli/scheduler/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SchedulerArgs:
tasks_pattern: Sequence[str] = ("**/tasks.py",)
skip_first_run: bool = False
update_interval: Optional[int] = None
loop_interval: Optional[int] = None

@classmethod
def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
Expand Down Expand Up @@ -101,6 +102,15 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
"If not specified, scheduler will run once a minute."
),
)
parser.add_argument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need --loop-inteval if we alerady have --update-interval argument in line 96. Can you explain the difference between this two?

Copy link
Author

@so-saf so-saf Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The --loop-interval parameter specifies how often to check tasks for execution needs, while --update-interval specifies how often to synchronize the list of scheduled tasks.

I separated them for two reasons:

  • Polling sources every second might be too resource-intensive for some types due to network overhead (Redis, PostgreSQL, etc.). This is why the --update-interval parameter cannot regulate the loop interval.
  • One second is quite frequent; some users might want to increase this interval. This is why the --loop-interval parameter is needed.

"--loop-interval",
type=int,
default=None,
help=(
"Interval in seconds to check tasks to send. "
"If not specified, scheduler will run once a second."
),
)

namespace = parser.parse_args(args)
# If there are any patterns specified, remove default.
Expand Down
Loading