Skip to content

Conversation

@so-saf
Copy link

@so-saf so-saf commented Aug 18, 2025

Hi!

Following up on PR #399, which I left unfinished half a year ago:

In this PR, I want to add intervals for scheduled tasks.

List of changes:

  • Added a new interval field to ScheduledTask.
  • Implemented the SchedulerLoop class—an abstraction over the scheduler loop.
  • Added a new loop_interval argument to SchedulerArgs.
  • Moved the skip_first_run mechanism into SchedulerLoop.run().
  • The teskiq.api.scheduler.run_scheduler_tasks function now uses SchedulerLoop.
  • Implemented the schedule_by_interval method in the AsyncTaskiqDecoratedTask and AsyncKicker classes.
  • Wrote tests for the new logic.
  • Updated the documentation.

The main idea is to iterate through all tasks every loop_interval and check if they need to be executed.

Example:

@broker.task(schedule=[{"interval": 5, "args": [1]}])  
async def add_one(value: int) -> int:  
    await asyncio.sleep(0.5)  
    return value + 1  

Logs:

[2025-08-18 11:07:50,547][INFO   ][run:run_scheduler:396] Starting scheduler.
[2025-08-18 11:07:50,551][INFO   ][run:run_scheduler:398] Startup completed.
[2025-08-18 11:07:51,001][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:07:56,002][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:01,002][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:06,002][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:11,001][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:16,001][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.
[2025-08-18 11:08:21,001][INFO   ][run:send:160] Sending task test:add_one with schedule_id 5b32c74a07d447d68f506b089616d526.

@so-saf
Copy link
Author

so-saf commented Oct 25, 2025

I saw that several tests failed on older versions of Pydantic and Python, so I pushed fixes.

from tests.utils import AsyncQueueBroker


@pytest.mark.anyio
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I enabled anyio_mode = "auto" flag for pytest. So tests doesn't need to explicitly say that they are async with pytest.mark.anyio marker.

You can just remove it and tests should work as they were)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

…er_intervals

# Conflicts:
#	docs/guide/cli.md
#	taskiq/cli/scheduler/run.py
#	tests/cli/scheduler/test_task_delays.py
@so-saf
Copy link
Author

so-saf commented Nov 14, 2025

I have resolved the merge conflicts and fixed the failing tests.

raise ValueError("Either cron, interval, or datetime must be present.")

# Validate interval constraints
if self.interval is not None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this logic should be in a separate validator for interval field. Also we can use strict=True and ge=1 arguments for pydantic.Field to check that it's really a positive integer.

I think it should look like that and work the same as your implementation:

class ScheduledTask(BaseModel):
    ....  # other field here
    interval: Union[int, timedelta, None] = Field(ge=1, strict=True)
    
    @pydantic.field_validator("interval", mode="before")
    @classmethod
    def __check_interval(cls, value: Any) -> int | None:
        if isinstance(value, timedelta):
           return value.total_seconds()
        return value

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this approach won't work because the strict=True parameter doesn't function with Union annotations, and le=1 is not designed for timedelta classes.

However, I've reviewed the validation for the interval parameter and decided to extract it into a separate function to avoid code duplication between the v1 and v2 schemas.

"If not specified, scheduler will run once a minute."
),
)
parser.add_argument(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need --loop-inteval if we alerady have --update-interval argument in line 96. Can you explain the difference between this two?

Copy link
Author

@so-saf so-saf Nov 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The --loop-interval parameter specifies how often to check tasks for execution needs, while --update-interval specifies how often to synchronize the list of scheduled tasks.

I separated them for two reasons:

  • Polling sources every second might be too resource-intensive for some types due to network overhead (Redis, PostgreSQL, etc.). This is why the --update-interval parameter cannot regulate the loop interval.
  • One second is quite frequent; some users might want to increase this interval. This is why the --loop-interval parameter is needed.

@so-saf so-saf requested a review from danfimov November 16, 2025 19:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants