diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md index 4502ac0b..5bb6d627 100644 --- a/docs/available-components/schedule-sources.md +++ b/docs/available-components/schedule-sources.md @@ -34,9 +34,10 @@ The format of the schedule label is the following: @broker.task( schedule=[ { - "cron": "* * * * *", # type: str, either cron or time should be specified. - "cron_offset": None # type: str | timedelta | None, can be omitted. - "time": None # type: datetime | None, either cron or time should be specified. + "cron": "* * * * *", # type: str, either cron, interval or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. + "interval": None, # type: int | timedelta, either cron, interval or time should be specified. + "time": None, # type: datetime | None, either cron, interval or time should be specified. "args": [], # type List[Any] | None, can be omitted. "kwargs": {}, # type: Dict[str, Any] | None, can be omitted. "labels": {}, # type: Dict[str, Any] | None, can be omitted. @@ -51,6 +52,7 @@ Parameters: - `cron` - crontab string when to run the task. - `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones) +- `interval` - Interval to run periodic tasks. - `time` - specific time when send the task. - `args` - args to use, when invoking the task. - `kwargs` - key-word arguments to use when invoking the task. diff --git a/docs/guide/cli.md b/docs/guide/cli.md index 83c85cbc..ab78d1ea 100644 --- a/docs/guide/cli.md +++ b/docs/guide/cli.md @@ -164,3 +164,5 @@ Path to scheduler is the only required argument. - `--no-configure-logging` - use this parameter if your application configures custom logging. - `--log-level` is used to set a log level (default `INFO`). - `--skip-first-run` - skip first run of scheduler. This option skips running tasks immediately after scheduler start. +- `--update-interval` - interval in seconds to check for new tasks from sources (default `60`). +- `--loop-interval` - interval in seconds to check tasks to send (default `1`). diff --git a/docs/guide/scheduling-tasks.md b/docs/guide/scheduling-tasks.md index 1d9b305a..d0c0ecb2 100644 --- a/docs/guide/scheduling-tasks.md +++ b/docs/guide/scheduling-tasks.md @@ -118,7 +118,7 @@ Now we can use this source to add new schedules in runtime. Here's an example: ) ``` -Or if you want to use cron schedules instead, just use `schedule_by_cron` method. +You can also use cron or interval scheduling, just use the `schedule_by_cron` or `schedule_by_interval` methods ```python await my_task.schedule_by_cron( @@ -129,6 +129,15 @@ Or if you want to use cron schedules instead, just use `schedule_by_cron` method ) ``` +```python + await my_task.schedule_by_interval( + redis_source, + datetime.timedelta(seconds=5), + 11, + arg2="arg2", + ) +``` + If you want to pass additional labels, you can call these methods on the `Kicker` instance. ```python diff --git a/taskiq/api/scheduler.py b/taskiq/api/scheduler.py index 6928b128..aa80b4d8 100644 --- a/taskiq/api/scheduler.py +++ b/taskiq/api/scheduler.py @@ -1,7 +1,7 @@ from datetime import timedelta from typing import Optional -from taskiq.cli.scheduler.run import run_scheduler_loop +from taskiq.cli.scheduler.run import SchedulerLoop from taskiq.scheduler.scheduler import TaskiqScheduler @@ -9,6 +9,7 @@ async def run_scheduler_task( scheduler: TaskiqScheduler, run_startup: bool = False, interval: Optional[timedelta] = None, + loop_interval: Optional[timedelta] = None, ) -> None: """ Run scheduler task. @@ -18,10 +19,16 @@ async def run_scheduler_task( :param scheduler: scheduler instance. :param run_startup: whether to run startup function or not. + :param interval: interval to check for schedule updates. + :param loop_interval: interval to check tasks to send. """ for source in scheduler.sources: await source.startup() if run_startup: await scheduler.startup() while True: - await run_scheduler_loop(scheduler, interval) + scheduler_loop = SchedulerLoop(scheduler) + await scheduler_loop.run( + update_interval=interval, + loop_interval=loop_interval, + ) diff --git a/taskiq/cli/scheduler/args.py b/taskiq/cli/scheduler/args.py index d1f6d821..3fdbaf00 100644 --- a/taskiq/cli/scheduler/args.py +++ b/taskiq/cli/scheduler/args.py @@ -18,6 +18,7 @@ class SchedulerArgs: tasks_pattern: Sequence[str] = ("**/tasks.py",) skip_first_run: bool = False update_interval: Optional[int] = None + loop_interval: Optional[int] = None @classmethod def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs": @@ -90,6 +91,15 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs": "If not specified, scheduler will run once a minute." ), ) + parser.add_argument( + "--loop-interval", + type=int, + default=None, + help=( + "Interval in seconds to check tasks to send. " + "If not specified, scheduler will run once a second." + ), + ) namespace = parser.parse_args(args) # If there are any patterns specified, remove default. diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 7a7d9f53..a3d0b14e 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -1,12 +1,13 @@ import asyncio import inspect import sys -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from logging import basicConfig, getLevelName, getLogger -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Union +import pycron import pytz -from pycron import is_now +from typing_extensions import TypeAlias from taskiq.abc.schedule_source import ScheduleSource from taskiq.cli.scheduler.args import SchedulerArgs @@ -17,6 +18,9 @@ logger = getLogger(__name__) +ScheduleId: TypeAlias = str + + def to_tz_aware(time: datetime) -> datetime: """ Convert datetime to timezone aware. @@ -55,7 +59,7 @@ async def get_schedules(source: ScheduleSource) -> List[ScheduledTask]: async def get_all_schedules( scheduler: TaskiqScheduler, -) -> List[Tuple[ScheduleSource, List[ScheduledTask]]]: +) -> Dict[ScheduleSource, List[ScheduledTask]]: """ Task to update all schedules. @@ -68,73 +72,91 @@ async def get_all_schedules( :return: dict with source as a key and list of scheduled tasks as a value. """ logger.debug("Started schedule update.") - schedules = await asyncio.gather( + schedules: List[List[ScheduledTask]] = await asyncio.gather( *[get_schedules(source) for source in scheduler.sources], ) - return list(zip(scheduler.sources, schedules)) + return dict(zip(scheduler.sources, schedules)) + +class CronValueError(Exception): + """Raised on invalid cron value.""" -def get_task_delay(task: ScheduledTask) -> Optional[int]: + +def is_cron_task_now( + cron_value: str, + now: datetime, + offset: Union[str, timedelta, None] = None, + last_run: Optional[datetime] = None, +) -> bool: """ - Get delay of the task in seconds. + Checks whether the cron task should start now. - :param task: task to check. - :return: True if task must be sent. + :raises CronValueError: On invalid cron value. """ - now = datetime.now(tz=pytz.UTC) - if task.cron is not None: - # If user specified cron offset we apply it. - # If it's timedelta, we simply add the delta to current time. - if task.cron_offset and isinstance(task.cron_offset, timedelta): - now += task.cron_offset - # If timezone was specified as string we convert it timzone - # offset and then apply. - elif task.cron_offset and isinstance(task.cron_offset, str): - now = now.astimezone(pytz.timezone(task.cron_offset)) - if is_now(task.cron, now): - return 0 - return None - if task.time is not None: - task_time = to_tz_aware(task.time) - if task_time <= now: - return 0 - delay = task_time - now - if delay.microseconds: - return int(delay.total_seconds()) + 1 - return int(delay.total_seconds()) - return None - - -async def delayed_send( + if last_run is not None: + seconds_spend = (now - last_run).total_seconds() + seconds_should_sped = timedelta(minutes=1).total_seconds() + if round(seconds_spend) < round(seconds_should_sped): + return False + # If user specified cron offset we apply it. + # If it's timedelta, we simply add the delta to current time. + if offset and isinstance(offset, timedelta): + now += offset + # If timezone was specified as string we convert it timezone + # offset and then apply. + elif offset and isinstance(offset, str): + now = now.astimezone(pytz.timezone(offset)) + + try: + return pycron.is_now(cron_value, now) + except ValueError as e: + raise CronValueError(e) from e + + +def is_time_task_now( + time_value: datetime, + now: datetime, + last_run: Optional[datetime] = None, +) -> bool: + """Checks whether the time task should start now.""" + if last_run is not None: + return False + + time_value = to_tz_aware(time_value) + now = to_tz_aware(now) + return time_value <= now + + +def is_interval_task_now( + interval_value: Union[int, timedelta], + now: datetime, + last_run: Optional[datetime] = None, +) -> bool: + """Checks whether the interval task should start now.""" + if last_run is None: + return True + + if isinstance(interval_value, int): + interval_value = timedelta(seconds=interval_value) + + seconds_passed: float = (now - last_run).total_seconds() + interval_seconds: float = interval_value.total_seconds() + + return round(seconds_passed) >= round(interval_seconds) + + +async def send( scheduler: TaskiqScheduler, source: ScheduleSource, task: ScheduledTask, - delay: int, ) -> None: """ - Send a task with a delay. - - This function waits for some time and then - sends a task. - - The main idea is that scheduler gathers - tasks every minute and some of them have - specific time. To respect the time, we calculate - the delay and send the task after some delay. + Send a task. :param scheduler: current scheduler. :param source: source of the task. :param task: task to send. - :param delay: how long to wait. """ - logger.debug( - "Waiting %d seconds before sending task %s with schedule_id %s.", - delay, - task.task_name, - task.schedule_id, - ) - if delay > 0: - await asyncio.sleep(delay) logger.info( "Sending task %s with schedule_id %s.", task.task_name, @@ -143,89 +165,187 @@ async def delayed_send( await scheduler.on_ready(source, task) -async def run_scheduler_loop( # noqa: C901 - scheduler: TaskiqScheduler, - interval: Optional[timedelta] = None, -) -> None: - """ - Runs scheduler loop. +async def _sleep_until_next_second() -> None: + now = datetime.now(tz=pytz.UTC) + await asyncio.sleep(1 - now.microsecond / 1_000_000) - This function imports taskiq scheduler - and runs tasks when needed. - :param scheduler: current scheduler. - :param interval: interval to check for schedule updates. - """ - loop = asyncio.get_event_loop() - running_schedules: Dict[str, asyncio.Task[Any]] = {} - ran_cron_jobs: Set[str] = set() - current_minute = datetime.now(tz=pytz.UTC).minute - while True: - now = datetime.now(tz=pytz.UTC) - # If minute changed, we need to clear - # ran_cron_jobs set and update current minute. - if now.minute != current_minute: - current_minute = now.minute - ran_cron_jobs.clear() - # If interval is not None, we need to - # calculate next run time using it. - if interval is not None: - next_run = now + interval - # otherwise we need assume that - # we will run it at the start of the next minute. - # as crontab does. - else: - next_run = (now + timedelta(minutes=1)).replace(second=1, microsecond=0) - scheduled_tasks = await get_all_schedules(scheduler) - for source, task_list in scheduled_tasks: +class SchedulerLoop: + """Abstraction over scheduler loop.""" + + def __init__( + self, + scheduler: TaskiqScheduler, + *, + event_loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self.scheduler = scheduler + self._event_loop = event_loop or asyncio.get_event_loop() + + # Variables for control the last run of schedules. + self.cron_tasks_last_run: dict[ScheduleId, datetime] = {} + self.interval_tasks_last_run: dict[ScheduleId, datetime] = {} + self.time_tasks_last_run: dict[ScheduleId, datetime] = {} + + self.scheduled_tasks: Dict[ScheduleSource, List[ScheduledTask]] = {} + self.scheduled_tasks_updated_at: Optional[datetime] = None + self._update_schedules_task_future: Optional[asyncio.Task[Any]] = None + + def _update_schedules_task_future_callback(self, task_: asyncio.Task[Any]) -> None: + self.scheduled_tasks = task_.result() + + new_schedules_ids: set[str] = set() + for source, task_list in self.scheduled_tasks.items(): logger.debug("Got %d schedules from source %s.", len(task_list), source) + new_schedules_ids.update({t.schedule_id for t in task_list}) + + # Deleting irrelevant scheduled tasks so they don't take up memory. + for id_ in self.cron_tasks_last_run.keys() - new_schedules_ids: + del self.cron_tasks_last_run[id_] + for id_ in self.interval_tasks_last_run.keys() - new_schedules_ids: + del self.interval_tasks_last_run[id_] + for id_ in self.time_tasks_last_run.keys() - new_schedules_ids: + del self.time_tasks_last_run[id_] + + async def _update_scheduled_tasks(self) -> None: + if ( + self._update_schedules_task_future is not None + and not self._update_schedules_task_future.done() + ): + logger.warning( + "Schedules getting task started " + "before the previous one finished. " + "Consider increasing the update_interval.", + ) + else: + self._update_schedules_task_future = self._event_loop.create_task( + get_all_schedules(self.scheduler), + ) + self._update_schedules_task_future.add_done_callback( + self._update_schedules_task_future_callback, + ) + + def _mark_cron_tasks_as_already_run(self) -> None: + current_minute = datetime.now(tz=pytz.UTC).replace(second=0, microsecond=0) + for _, task_list in self.scheduled_tasks.items(): for task in task_list: - try: - task_delay = get_task_delay(task) - except ValueError: - logger.warning( - "Cannot parse cron: %s for task: %s, schedule_id: %s.", - task.cron, - task.task_name, - task.schedule_id, - ) - continue - # If task delay is None, we don't need to run it. - if task_delay is None: - continue - # If task is delayed for more than next_run, - # we don't need to run it, because we will - # run it in the next iteration. - if now + timedelta(seconds=task_delay) >= next_run: - continue - # If task is already running, we don't need to run it again. - if task.schedule_id in running_schedules and task_delay < 1: - continue - # If task is cron job, we need to check if - # we already ran it this minute. if task.cron is not None: - if task.schedule_id in ran_cron_jobs: - continue - ran_cron_jobs.add(task.schedule_id) - send_task = loop.create_task( - delayed_send(scheduler, source, task, task_delay), - # We need to set the name of the task - # to be able to discard its reference - # after it is done. - name=f"schedule_{task.schedule_id}", + self.cron_tasks_last_run[task.schedule_id] = current_minute + + def _is_schedule_ready_to_send( + self, + task: ScheduledTask, + now: datetime, + ) -> bool: + is_ready_to_send: bool = False + + if not is_ready_to_send and task.cron is not None: + try: + is_ready_to_send = is_cron_task_now( + cron_value=task.cron, + now=now, + offset=task.cron_offset, + last_run=self.cron_tasks_last_run.get(task.schedule_id), ) - running_schedules[task.schedule_id] = send_task - send_task.add_done_callback( - lambda task_future: running_schedules.pop( - task_future.get_name().removeprefix("schedule_"), - ), + except CronValueError: + logger.warning( + "Cannot parse cron: %s for task: %s, schedule_id: %s.", + task.cron, + task.task_name, + task.schedule_id, ) - delay = next_run - datetime.now(tz=pytz.UTC) - logger.debug( - "Sleeping for %.2f seconds before getting schedules.", - delay.total_seconds(), - ) - await asyncio.sleep(delay.total_seconds()) + if is_ready_to_send: + self.cron_tasks_last_run[task.schedule_id] = now + + if not is_ready_to_send and task.interval is not None: + is_ready_to_send = is_interval_task_now( + interval_value=task.interval, + now=now, + last_run=self.interval_tasks_last_run.get(task.schedule_id), + ) + if is_ready_to_send: + self.interval_tasks_last_run[task.schedule_id] = now + + if not is_ready_to_send and task.time is not None: + is_ready_to_send = is_time_task_now( + time_value=task.time, + now=now, + last_run=self.time_tasks_last_run.get(task.schedule_id), + ) + if is_ready_to_send: + self.time_tasks_last_run[task.schedule_id] = now + + return is_ready_to_send + + async def run( + self, + *, + update_interval: Optional[timedelta] = None, + loop_interval: Optional[timedelta] = None, + skip_first_run: bool = False, + ) -> None: + """ + Runs scheduler loop. + + This function imports taskiq scheduler + and runs tasks when needed. + + :param update_interval: interval to check for schedule updates. + :param loop_interval: interval to check tasks to send. + :param skip_first_run: Wait for the beginning of the next minute + to skip the first run. + """ + if update_interval is None: + update_interval = timedelta(minutes=1) + if loop_interval is None: + loop_interval = timedelta(seconds=1) + + running_schedules: Dict[str, asyncio.Task[Any]] = {} + + self.scheduled_tasks = await get_all_schedules(self.scheduler) + self.scheduled_tasks_updated_at = datetime.now(tz=pytz.UTC) + + if skip_first_run: + self._mark_cron_tasks_as_already_run() + + await _sleep_until_next_second() + + while True: + now = datetime.now(tz=pytz.UTC) + next_run = (now + loop_interval).replace(microsecond=0) + + if now - self.scheduled_tasks_updated_at >= update_interval: + await self._update_scheduled_tasks() + self.scheduled_tasks_updated_at = now + + for source, task_list in self.scheduled_tasks.items(): + for task in task_list: + is_ready_to_send: bool = self._is_schedule_ready_to_send( + task=task, + now=now, + ) + + if is_ready_to_send and task.schedule_id not in running_schedules: + send_task = self._event_loop.create_task( + send(self.scheduler, source, task), + # We need to set the name of the task + # to be able to discard its reference + # after it is done. + name=f"schedule_{task.schedule_id}", + ) + running_schedules[task.schedule_id] = send_task + send_task.add_done_callback( + lambda task_future: running_schedules.pop( + task_future.get_name().removeprefix("schedule_"), + ), + ) + + delay = next_run - datetime.now(tz=pytz.UTC) + logger.debug( + "Sleeping for %.3f seconds before getting schedules.", + delay.total_seconds(), + ) + await asyncio.sleep(delay.total_seconds()) async def run_scheduler(args: SchedulerArgs) -> None: @@ -265,27 +385,25 @@ async def run_scheduler(args: SchedulerArgs) -> None: for source in scheduler.sources: await source.startup() - interval = None - if args.update_interval: - interval = timedelta(seconds=args.update_interval) + update_interval = timedelta(seconds=60) + if args.update_interval is not None: + update_interval = timedelta(seconds=args.update_interval) + + loop_interval = timedelta(seconds=1) + if args.loop_interval is not None: + loop_interval = timedelta(seconds=args.loop_interval) logger.info("Starting scheduler.") await scheduler.startup() logger.info("Startup completed.") - if args.skip_first_run: - next_minute = datetime.now(timezone.utc).replace( - second=0, - microsecond=0, - ) + timedelta( - minutes=1, - ) - delay = next_minute - datetime.now(timezone.utc) - delay_secs = int(delay.total_seconds()) - logger.info(f"Skipping first run. Waiting {delay_secs} seconds.") - await asyncio.sleep(delay.total_seconds()) - logger.info("First run skipped. The scheduler is now running.") + + scheduler_loop = SchedulerLoop(scheduler) try: - await run_scheduler_loop(scheduler, interval) + await scheduler_loop.run( + update_interval=update_interval, + loop_interval=loop_interval, + skip_first_run=args.skip_first_run, + ) except asyncio.CancelledError: logger.warning("Shutting down scheduler.") await scheduler.shutdown() diff --git a/taskiq/decor.py b/taskiq/decor.py index 6222ac77..6f0fdb69 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -1,6 +1,6 @@ import sys from collections.abc import Coroutine -from datetime import datetime +from datetime import datetime, timedelta from types import CoroutineType from typing import ( TYPE_CHECKING, @@ -169,6 +169,32 @@ async def schedule_by_cron( **kwargs, ) + async def schedule_by_interval( + self, + source: "ScheduleSource", + interval: Union[int, timedelta], + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> CreatedSchedule[_ReturnType]: + """ + Schedule the task to start using an interval. + + This method requires a schedule source, + which is capable of dynamically adding new schedules. + + :param source: schedule source. + :param interval: interval in seconds or timedelta instance. + :param args: function's arguments. + :param kwargs: function's key word arguments. + :return: schedule id. + """ + return await self.kicker().schedule_by_interval( + source, + interval, + *args, + **kwargs, + ) + async def schedule_by_time( self, source: "ScheduleSource", diff --git a/taskiq/kicker.py b/taskiq/kicker.py index bdc62dae..b36cd448 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -1,6 +1,6 @@ from collections.abc import Coroutine from dataclasses import asdict, is_dataclass -from datetime import datetime +from datetime import datetime, timedelta from logging import getLogger from types import CoroutineType from typing import ( @@ -214,6 +214,38 @@ async def schedule_by_cron( await source.add_schedule(scheduled) return CreatedSchedule(self, source, scheduled) + async def schedule_by_interval( + self, + source: "ScheduleSource", + interval: Union[int, timedelta], + *args: _FuncParams.args, + **kwargs: _FuncParams.kwargs, + ) -> CreatedSchedule[_ReturnType]: + """ + Function to schedule task using an interval. + + :param source: schedule source. + :param interval: interval in seconds or timedelta instance. + :param args: function's args. + :param kwargs: function's kwargs. + + :return: schedule id. + """ + schedule_id = self.custom_schedule_id + if schedule_id is None: + schedule_id = self.broker.id_generator() + message = self._prepare_message(*args, **kwargs) + scheduled = ScheduledTask( + schedule_id=schedule_id, + task_name=message.task_name, + labels=message.labels, + args=message.args, + kwargs=message.kwargs, + interval=interval, + ) + await source.add_schedule(scheduled) + return CreatedSchedule(self, source, scheduled) + async def schedule_by_time( self, source: "ScheduleSource", diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index 94fd42de..83470bf4 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -43,7 +43,7 @@ async def startup(self) -> None: ) continue for schedule in task.labels.get("schedule", []): - if "cron" not in schedule and "time" not in schedule: + if not {"cron", "interval", "time"} & schedule.keys(): continue labels = schedule.get("labels", {}) labels.update(task.labels) @@ -58,6 +58,7 @@ async def startup(self) -> None: cron=schedule.get("cron"), time=schedule.get("time"), cron_offset=schedule.get("cron_offset"), + interval=schedule.get("interval"), ) return await super().startup() diff --git a/taskiq/scheduler/created_schedule.py b/taskiq/scheduler/created_schedule.py index 2c03c3f6..e9d4480c 100644 --- a/taskiq/scheduler/created_schedule.py +++ b/taskiq/scheduler/created_schedule.py @@ -61,6 +61,7 @@ def __str__(self) -> str: "CreatedSchedule<" f"id={self.schedule_id}, " f"time={self.task.time}, " + f"interval={self.task.interval}, " f"cron={self.task.cron}, " f"cron_offset={self.task.cron_offset or 'UTC'}, " f"task_name={self.task.task_name}, " diff --git a/taskiq/scheduler/scheduled_task/v1.py b/taskiq/scheduler/scheduled_task/v1.py index 5209f61e..09d0efab 100644 --- a/taskiq/scheduler/scheduled_task/v1.py +++ b/taskiq/scheduler/scheduled_task/v1.py @@ -16,15 +16,18 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Union[int, timedelta, None] = None @root_validator(pre=False) # type: ignore @classmethod def __check(cls, values: Dict[str, Any]) -> Dict[str, Any]: - """ - This method validates, that either `cron` or `time` field is present. + """Validate. + + This method validates, + that either `cron`, `interval` or `time` field is present. - :raises ValueError: if cron and time are none. + :raises ValueError: if cron, interval and time are none. """ - if values.get("cron") is None and values.get("time") is None: - raise ValueError("Either cron or datetime must be present.") + if not {"cron", "interval", "time"} & values.keys(): + raise ValueError("Either cron, interval, or datetime must be present.") return values diff --git a/taskiq/scheduler/scheduled_task/v2.py b/taskiq/scheduler/scheduled_task/v2.py index 332dce5d..0efe559e 100644 --- a/taskiq/scheduler/scheduled_task/v2.py +++ b/taskiq/scheduler/scheduled_task/v2.py @@ -17,14 +17,17 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Union[int, timedelta, None] = None @model_validator(mode="after") def __check(self) -> Self: - """ - This method validates, that either `cron` or `time` field is present. + """Validate. + + This method validates, + that either `cron`, `interval` or `time` field is present. - :raises ValueError: if cron and time are none. + :raises ValueError: if cron, interval and time are none. """ - if self.cron is None and self.time is None: - raise ValueError("Either cron or datetime must be present.") + if self.cron is None and self.time is None and self.interval is None: + raise ValueError("Either cron, interval, or datetime must be present.") return self diff --git a/taskiq/scheduler/scheduler.py b/taskiq/scheduler/scheduler.py index b2484243..e17a37d3 100644 --- a/taskiq/scheduler/scheduler.py +++ b/taskiq/scheduler/scheduler.py @@ -37,7 +37,8 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None: """ This method is called when task is ready to be enqueued. - It's triggered on proper time depending on `task.cron` or `task.time` attribute. + It's triggered on proper time depending on `task.cron`, `task.interval` + or `task.time` attribute. :param source: source that triggered this event. :param task: task to send """ diff --git a/tests/api/test_scheduler.py b/tests/api/test_scheduler.py index 96546d38..1ab5e71a 100644 --- a/tests/api/test_scheduler.py +++ b/tests/api/test_scheduler.py @@ -20,7 +20,7 @@ async def test_successful() -> None: def _() -> None: ... - msg = await asyncio.wait_for(broker.queue.get(), 0.3) + msg = await asyncio.wait_for(broker.queue.get(), 1) assert msg scheduler_task.cancel() @@ -37,7 +37,7 @@ def _() -> None: scheduler_task = asyncio.create_task(run_scheduler_task(scheduler)) - msg = await asyncio.wait_for(broker.queue.get(), 0.3) + msg = await asyncio.wait_for(broker.queue.get(), 1) assert msg scheduler_task.cancel() diff --git a/tests/cli/scheduler/test_is_cron_task_now.py b/tests/cli/scheduler/test_is_cron_task_now.py new file mode 100644 index 00000000..ac7e4106 --- /dev/null +++ b/tests/cli/scheduler/test_is_cron_task_now.py @@ -0,0 +1,83 @@ +from datetime import datetime, timedelta, timezone +from typing import Optional, Union + +import pytest +import pytz +from tzlocal import get_localzone + +from taskiq.cli.scheduler.run import CronValueError, is_cron_task_now + + +def test_should_run_success() -> None: + now = datetime.now(timezone.utc) + is_now = is_cron_task_now( + cron_value=f"* {now.hour} * * *", + now=now, + ) + assert is_now + + +def test_should_run_cron_str_offset() -> None: + now = datetime.now(timezone.utc) + hour = datetime.now().hour + zone = get_localzone() + is_now = is_cron_task_now( + cron_value=f"* {hour} * * *", + offset=str(zone), + now=now, + ) + assert is_now + + +def test_should_run_cron_td_offset() -> None: + offset = 2 + now = datetime.now(timezone.utc) + hour = (now.hour + offset) % 24 + is_now = is_cron_task_now( + cron_value=f"* {hour} * * *", + offset=timedelta(hours=offset), + now=now, + ) + assert is_now + + +@pytest.mark.parametrize( + "cron_value,now,offset,last_run,expected", + [ + ("* * * * *", datetime(2023, 1, 1, 0, 0, 30), None, None, True), + ("* * * * *", datetime(2023, 1, 1, 0, 0, 30), timedelta(hours=1), None, True), + ("* * * * *", datetime(2023, 1, 1, 0, 0, 30), "US/Eastern", None, True), + ( + "* * * * *", + datetime(2023, 1, 1, 0, 0, 30), + None, + datetime(2023, 1, 1, 0, 0, 0), + False, + ), + ( + "* * * * *", + datetime(2023, 1, 1, 0, 0, 30), + None, + datetime(2023, 1, 1, 0, 0, 29), + False, + ), + ("0 * * * *", datetime(2023, 1, 1, 0, 30, 0), None, None, False), + ], +) +def test_is_cron_task_now( + cron_value: str, + now: datetime, + offset: Union[str, timedelta, None], + last_run: Optional[datetime], + expected: bool, +) -> None: + now = pytz.UTC.localize(now) + if last_run: + last_run = pytz.UTC.localize(last_run) + + assert is_cron_task_now(cron_value, now, offset, last_run) == expected + + +def test_is_cron_task_now_invalid_cron() -> None: + with pytest.raises(CronValueError): + is_cron_task_now("invalid cron", datetime.now()) diff --git a/tests/cli/scheduler/test_is_interval_task_now.py b/tests/cli/scheduler/test_is_interval_task_now.py new file mode 100644 index 00000000..39b7f31c --- /dev/null +++ b/tests/cli/scheduler/test_is_interval_task_now.py @@ -0,0 +1,34 @@ +from datetime import datetime, timedelta +from typing import Optional, Union + +import pytest + +from taskiq.cli.scheduler.run import is_interval_task_now + + +@pytest.mark.parametrize( + "interval_value,now,last_run,expected", + [ + ( + timedelta(seconds=10), + datetime(2023, 1, 1, 0, 0, 15), + datetime(2023, 1, 1, 0, 0, 0), + True, + ), + (10, datetime(2023, 1, 1, 0, 0, 15), datetime(2023, 1, 1, 0, 0, 0), True), + ( + timedelta(seconds=10), + datetime(2023, 1, 1, 0, 0, 5), + datetime(2023, 1, 1, 0, 0, 0), + False, + ), + (timedelta(seconds=10), datetime(2023, 1, 1, 0, 0, 0), None, True), + ], +) +def test_is_interval_task_now( + interval_value: Union[int, timedelta], + now: datetime, + last_run: Optional[datetime], + expected: bool, +) -> None: + assert is_interval_task_now(interval_value, now, last_run) == expected diff --git a/tests/cli/scheduler/test_is_time_task_now.py b/tests/cli/scheduler/test_is_time_task_now.py new file mode 100644 index 00000000..cb30471d --- /dev/null +++ b/tests/cli/scheduler/test_is_time_task_now.py @@ -0,0 +1,54 @@ +from datetime import datetime, timedelta +from typing import Optional + +import pytest +import pytz +from tzlocal import get_localzone + +from taskiq.cli.scheduler.run import is_time_task_now + + +def test_time_utc_without_zone() -> None: + now = datetime.now() + is_now = is_time_task_now( + time_value=now - timedelta(seconds=1), + now=now, + ) + assert is_now + + +def test_time_utc_with_zone() -> None: + now = datetime.now(tz=pytz.UTC) + is_now = is_time_task_now( + time_value=now - timedelta(seconds=1), + now=now, + ) + assert is_now + + +def test_time_utc_with_local_zone() -> None: + localtz = get_localzone() + now = datetime.now(tz=localtz) + is_now = is_time_task_now( + time_value=now - timedelta(seconds=1), + now=now, + ) + assert is_now + + +@pytest.mark.parametrize( + "time_value,now,last_run,expected", + [ + (datetime(2023, 1, 1), datetime(2023, 1, 2), None, True), + (datetime(2023, 1, 1), datetime(2023, 1, 1), None, True), + (datetime(2023, 1, 2), datetime(2023, 1, 1), None, False), + (datetime(2023, 1, 1), datetime(2023, 1, 2), datetime(2023, 1, 1), False), + ], +) +def test_is_time_task_now( + time_value: datetime, + now: datetime, + last_run: Optional[datetime], + expected: bool, +) -> None: + assert is_time_task_now(time_value, now, last_run) == expected diff --git a/tests/cli/scheduler/test_task_delays.py b/tests/cli/scheduler/test_task_delays.py deleted file mode 100644 index 1087fe52..00000000 --- a/tests/cli/scheduler/test_task_delays.py +++ /dev/null @@ -1,152 +0,0 @@ -import datetime - -import pytz -from freezegun import freeze_time -from tzlocal import get_localzone - -from taskiq.cli.scheduler.run import get_task_delay -from taskiq.scheduler.scheduled_task import ScheduledTask - - -def test_should_run_success() -> None: - hour = datetime.datetime.now(datetime.timezone.utc).hour - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - cron=f"* {hour} * * *", - ), - ) - assert delay is not None and delay >= 0 - - -def test_should_run_cron_str_offset() -> None: - hour = datetime.datetime.now().hour - zone = get_localzone() - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - cron=f"* {hour} * * *", - cron_offset=str(zone), - ), - ) - assert delay is not None and delay >= 0 - - -def test_should_run_cron_td_offset() -> None: - offset = 2 - hour = (datetime.datetime.now(datetime.timezone.utc).hour + offset) % 24 - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - cron=f"* {hour} * * *", - cron_offset=datetime.timedelta(hours=offset), - ), - ) - assert delay is not None and delay >= 0 - - -def test_time_utc_without_zone() -> None: - time = datetime.datetime.now() - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time - datetime.timedelta(seconds=1), - ), - ) - assert delay is not None and delay >= 0 - - -def test_time_utc_with_zone() -> None: - time = datetime.datetime.now(tz=pytz.UTC) - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time - datetime.timedelta(seconds=1), - ), - ) - assert delay is not None and delay >= 0 - - -def test_time_utc_with_local_zone() -> None: - localtz = get_localzone() - time = datetime.datetime.now(tz=localtz) - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time - datetime.timedelta(seconds=1), - ), - ) - assert delay is not None and delay >= 0 - - -@freeze_time("2023-01-14 12:00:00") -def test_time_localtime_without_zone() -> None: - time = datetime.datetime.now(tz=pytz.FixedOffset(240)).replace(tzinfo=None) - time_to_run = time - datetime.timedelta(seconds=1) - - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time_to_run, - ), - ) - - expected_delay = time_to_run.replace(tzinfo=pytz.UTC) - datetime.datetime.now( - pytz.UTC, - ) - - assert delay == int(expected_delay.total_seconds()) - - -@freeze_time("2023-01-14 12:00:00") -def test_time_delay() -> None: - time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(seconds=15) - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time, - ), - ) - assert delay is not None and delay == 15 - - -@freeze_time("2023-01-14 12:00:00.05") -def test_time_delay_with_milliseconds() -> None: - time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta( - seconds=15, - milliseconds=150, - ) - delay = get_task_delay( - ScheduledTask( - task_name="", - labels={}, - args=[], - kwargs={}, - time=time, - ), - ) - assert delay is not None and delay == 16 diff --git a/tests/cli/scheduler/test_to_tz_aware.py b/tests/cli/scheduler/test_to_tz_aware.py new file mode 100644 index 00000000..f408f27b --- /dev/null +++ b/tests/cli/scheduler/test_to_tz_aware.py @@ -0,0 +1,24 @@ +from datetime import datetime + +import pytest +import pytz + +from taskiq.cli.scheduler.run import to_tz_aware + + +@pytest.mark.parametrize( + "input_time,expected_tz", + [ + (datetime(2023, 1, 1), pytz.UTC), + ( + datetime(2023, 1, 1, tzinfo=pytz.timezone("US/Eastern")), + pytz.timezone("US/Eastern"), + ), + ], +) +def test_to_tz_aware( + input_time: datetime, + expected_tz: pytz.BaseTzInfo, +) -> None: + result = to_tz_aware(input_time) + assert result.tzinfo == expected_tz diff --git a/tests/cli/scheduler/test_updater.py b/tests/cli/scheduler/test_updater.py index 2ac9ef8d..c2a7b9e5 100644 --- a/tests/cli/scheduler/test_updater.py +++ b/tests/cli/scheduler/test_updater.py @@ -56,10 +56,10 @@ async def test_get_schedules_success() -> None: schedules = await get_all_schedules( TaskiqScheduler(InMemoryBroker(), sources), ) - assert schedules == [ - (sources[0], schedules1), - (sources[1], schedules2), - ] + assert schedules == { + sources[0]: schedules1, + sources[1]: schedules2, + } @pytest.mark.anyio @@ -81,7 +81,7 @@ async def test_get_schedules_error() -> None: schedules = await get_all_schedules( TaskiqScheduler(InMemoryBroker(), [source1, source2]), ) - assert schedules == [ - (source1, source1.schedules), - (source2, []), - ] + assert schedules == { + source1: source1.schedules, + source2: [], + }