Skip to content

Commit e66d3e2

Browse files
authored
Added datetime schedule. (#60)
Signed-off-by: Pavel Kirilin <[email protected]>
1 parent e713043 commit e66d3e2

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

taskiq/cli/scheduler/run.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,20 @@ async def schedules_updater(
4747
await asyncio.sleep(scheduler.refresh_delay)
4848

4949

50+
def should_run(task: ScheduledTask) -> bool:
51+
"""
52+
Checks if it's time to run a task.
53+
54+
:param task: task to check.
55+
:return: True if task must be sent.
56+
"""
57+
if task.cron is not None:
58+
return is_now(task.cron)
59+
if task.time is not None:
60+
return task.time <= datetime.utcnow()
61+
return False
62+
63+
5064
async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS213
5165
"""
5266
Runs scheduler loop.
@@ -83,7 +97,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
8397
not_fired_tasks = []
8498
for task in tasks:
8599
try:
86-
ready = is_now(task.cron)
100+
ready = should_run(task)
87101
except ValueError:
88102
logger.warning(
89103
"Cannot parse cron: %s for task: %s",

taskiq/cli/utils.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from importlib import import_module
55
from logging import getLogger
66
from pathlib import Path
7-
from typing import Any, Generator
7+
from typing import Any, Generator, List
88

99
logger = getLogger("taskiq.worker")
1010

@@ -51,7 +51,7 @@ def import_object(object_spec: str) -> Any:
5151
return getattr(module, import_spec[1])
5252

5353

54-
def import_from_modules(modules: list[str]) -> None:
54+
def import_from_modules(modules: List[str]) -> None:
5555
"""
5656
Import all modules from modules variable.
5757
@@ -66,7 +66,7 @@ def import_from_modules(modules: list[str]) -> None:
6666
logger.warning(f"Cannot import {module}")
6767

6868

69-
def import_tasks(modules: list[str], pattern: str, fs_discover: bool) -> None:
69+
def import_tasks(modules: List[str], pattern: str, fs_discover: bool) -> None:
7070
"""
7171
Import tasks modules.
7272

taskiq/scheduler/scheduler.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
from dataclasses import dataclass
2-
from typing import TYPE_CHECKING, Any, Callable, Dict, List
1+
from dataclasses import dataclass, field
2+
from datetime import datetime
3+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
34

45
from taskiq.abc.broker import AsyncBroker
56
from taskiq.scheduler.merge_functions import preserve_all
@@ -16,7 +17,17 @@ class ScheduledTask:
1617
labels: Dict[str, Any]
1718
args: List[Any]
1819
kwargs: Dict[str, Any]
19-
cron: str
20+
cron: Optional[str] = field(default=None)
21+
time: Optional[datetime] = field(default=None)
22+
23+
def __post_init__(self) -> None:
24+
"""
25+
This method validates, that either `cron` or `time` field is present.
26+
27+
:raises ValueError: if cron and time are none.
28+
"""
29+
if self.cron is None and self.time is None:
30+
raise ValueError("Either cron or datetime must be present.")
2031

2132

2233
class TaskiqScheduler:
@@ -44,4 +55,3 @@ async def startup(self) -> None: # pragma: no cover
4455
Here you can do stuff, like creating
4556
connections or anything you'd like.
4657
"""
47-
await self.broker.startup()

0 commit comments

Comments
 (0)