Skip to content

Commit 1d16c3e

Browse files
authored
Added timezone-aware schedule support. (#198)
1 parent ce1f056 commit 1d16c3e

File tree

6 files changed

+442
-191
lines changed

6 files changed

+442
-191
lines changed

poetry.lock

Lines changed: 294 additions & 187 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ uvloop = { version = ">=0.16.0,<1", optional = true }
4343
# For hot-reload.
4444
watchdog = { version = "^2.1.9", optional = true }
4545
gitignore-parser = { version = "^0", optional = true }
46+
pytz = "*"
4647

4748
[tool.poetry.dev-dependencies]
4849
pytest = "^7.1.2"
@@ -62,6 +63,8 @@ wemake-python-styleguide = "^0.18.0"
6263
tox = "^4.6.4"
6364
freezegun = "^1.2.2"
6465
pytest-mock = "^3.11.1"
66+
tzlocal = "^5.0.1"
67+
types-tzlocal = "^5.0.1.1"
6568

6669
[tool.poetry.extras]
6770
zmq = ["pyzmq"]

taskiq/cli/scheduler/run.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from logging import basicConfig, getLevelName, getLogger
44
from typing import List
55

6+
import pytz
67
from pycron import is_now
78

89
from taskiq.cli.scheduler.args import SchedulerArgs
@@ -12,6 +13,22 @@
1213
logger = getLogger(__name__)
1314

1415

16+
def to_tz_aware(time: datetime) -> datetime:
17+
"""
18+
Convert datetime to timezone aware.
19+
20+
This function takes a datetime and if
21+
timezone was not yet specified, it will
22+
be set to UTC.
23+
24+
:param time: time to make timezone aware.
25+
:return: timezone aware time.
26+
"""
27+
if time.tzinfo is None:
28+
return time.replace(tzinfo=pytz.UTC)
29+
return time
30+
31+
1532
async def schedules_updater(
1633
scheduler: TaskiqScheduler,
1734
current_schedules: List[ScheduledTask],
@@ -55,9 +72,18 @@ def should_run(task: ScheduledTask) -> bool:
5572
:return: True if task must be sent.
5673
"""
5774
if task.cron is not None:
58-
return is_now(task.cron, datetime.utcnow())
75+
now = datetime.now(tz=pytz.UTC)
76+
# If user specified cron offset we apply it.
77+
# If it's timedelta, we simply add the delta to current time.
78+
if task.cron_offset and isinstance(task.cron_offset, timedelta):
79+
now += task.cron_offset
80+
# If timezone was specified as string we convert it timzone
81+
# offset and then apply.
82+
elif task.cron_offset and isinstance(task.cron_offset, str):
83+
now = now.astimezone(pytz.timezone(task.cron_offset))
84+
return is_now(task.cron, now)
5985
if task.time is not None:
60-
return task.time <= datetime.utcnow()
86+
return to_tz_aware(task.time) <= datetime.now(tz=pytz.UTC)
6187
return False
6288

6389

taskiq/schedule_sources/label_based.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
4343
kwargs=schedule.get("kwargs", {}),
4444
cron=schedule.get("cron"),
4545
time=schedule.get("time"),
46+
cron_offset=schedule.get("cron_offset"),
4647
source=self,
4748
),
4849
)

taskiq/scheduler/scheduler.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from dataclasses import dataclass, field
2-
from datetime import datetime
3-
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
2+
from datetime import datetime, timedelta
3+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
44

55
from taskiq.abc.broker import AsyncBroker
66
from taskiq.kicker import AsyncKicker
@@ -21,6 +21,7 @@ class ScheduledTask:
2121
kwargs: Dict[str, Any]
2222
source: "ScheduleSource" # Backward point to source which this task belongs to
2323
cron: Optional[str] = field(default=None)
24+
cron_offset: Optional[Union[str, timedelta]] = field(default=None)
2425
time: Optional[datetime] = field(default=None)
2526

2627
def __post_init__(self) -> None:

tests/cli/scheduler/test_is_now.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import datetime
2+
3+
import pytz
4+
from tzlocal import get_localzone
5+
6+
from taskiq.cli.scheduler.run import should_run
7+
from taskiq.schedule_sources.label_based import LabelScheduleSource
8+
from taskiq.scheduler.scheduler import ScheduledTask
9+
10+
DUMMY_SOURCE = LabelScheduleSource(broker=None) # type: ignore
11+
12+
13+
def test_should_run_success() -> None:
14+
hour = datetime.datetime.utcnow().hour
15+
assert should_run(
16+
ScheduledTask(
17+
task_name="",
18+
labels={},
19+
args=[],
20+
kwargs={},
21+
source=DUMMY_SOURCE,
22+
cron=f"* {hour} * * *",
23+
),
24+
)
25+
26+
27+
def test_should_run_cron_str_offset() -> None:
28+
hour = datetime.datetime.now().hour
29+
zone = get_localzone()
30+
assert should_run(
31+
ScheduledTask(
32+
task_name="",
33+
labels={},
34+
args=[],
35+
kwargs={},
36+
source=DUMMY_SOURCE,
37+
cron=f"* {hour} * * *",
38+
cron_offset=str(zone),
39+
),
40+
)
41+
42+
43+
def test_should_run_cron_td_offset() -> None:
44+
offset = 2
45+
hour = datetime.datetime.utcnow().hour + offset
46+
assert should_run(
47+
ScheduledTask(
48+
task_name="",
49+
labels={},
50+
args=[],
51+
kwargs={},
52+
source=DUMMY_SOURCE,
53+
cron=f"* {hour} * * *",
54+
cron_offset=datetime.timedelta(hours=offset),
55+
),
56+
)
57+
58+
59+
def test_time_utc_without_zone() -> None:
60+
time = datetime.datetime.utcnow()
61+
assert should_run(
62+
ScheduledTask(
63+
task_name="",
64+
labels={},
65+
args=[],
66+
kwargs={},
67+
source=DUMMY_SOURCE,
68+
time=time - datetime.timedelta(seconds=1),
69+
),
70+
)
71+
72+
73+
def test_time_utc_with_zone() -> None:
74+
time = datetime.datetime.now(tz=pytz.UTC)
75+
assert should_run(
76+
ScheduledTask(
77+
task_name="",
78+
labels={},
79+
args=[],
80+
kwargs={},
81+
source=DUMMY_SOURCE,
82+
time=time - datetime.timedelta(seconds=1),
83+
),
84+
)
85+
86+
87+
def test_time_utc_with_local_zone() -> None:
88+
localtz = get_localzone()
89+
time = datetime.datetime.now(tz=localtz)
90+
assert should_run(
91+
ScheduledTask(
92+
task_name="",
93+
labels={},
94+
args=[],
95+
kwargs={},
96+
source=DUMMY_SOURCE,
97+
time=time - datetime.timedelta(seconds=1),
98+
),
99+
)
100+
101+
102+
def test_time_localtime_without_zone() -> None:
103+
time = datetime.datetime.now(tz=pytz.FixedOffset(240)).replace(tzinfo=None)
104+
assert not should_run(
105+
ScheduledTask(
106+
task_name="",
107+
labels={},
108+
args=[],
109+
kwargs={},
110+
source=DUMMY_SOURCE,
111+
time=time - datetime.timedelta(seconds=1),
112+
),
113+
)

0 commit comments

Comments
 (0)