Skip to content

Commit 27bddb7

Browse files
authored
Fixed delay for specific time values. (#200)
1 parent 915df3e commit 27bddb7

File tree

2 files changed

+76
-20
lines changed

2 files changed

+76
-20
lines changed

taskiq/cli/scheduler/run.py

Lines changed: 44 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from datetime import datetime, timedelta
33
from logging import basicConfig, getLevelName, getLogger
4-
from typing import List
4+
from typing import List, Optional
55

66
import pytz
77
from pycron import is_now
@@ -64,15 +64,15 @@ async def schedules_updater(
6464
await asyncio.sleep(scheduler.refresh_delay)
6565

6666

67-
def should_run(task: ScheduledTask) -> bool:
67+
def get_task_delay(task: ScheduledTask) -> Optional[int]: # noqa: C901
6868
"""
69-
Checks if it's time to run a task.
69+
Get delay of the task in seconds.
7070
7171
:param task: task to check.
7272
:return: True if task must be sent.
7373
"""
74+
now = datetime.now(tz=pytz.UTC)
7475
if task.cron is not None:
75-
now = datetime.now(tz=pytz.UTC)
7676
# If user specified cron offset we apply it.
7777
# If it's timedelta, we simply add the delta to current time.
7878
if task.cron_offset and isinstance(task.cron_offset, timedelta):
@@ -81,10 +81,43 @@ def should_run(task: ScheduledTask) -> bool:
8181
# offset and then apply.
8282
elif task.cron_offset and isinstance(task.cron_offset, str):
8383
now = now.astimezone(pytz.timezone(task.cron_offset))
84-
return is_now(task.cron, now)
85-
if task.time is not None:
86-
return to_tz_aware(task.time) <= datetime.now(tz=pytz.UTC)
87-
return False
84+
if is_now(task.cron, now):
85+
return 0
86+
return None
87+
elif task.time is not None:
88+
task_time = to_tz_aware(task.time).replace(microsecond=0)
89+
if task_time <= now:
90+
return 0
91+
one_min_ahead = (now + timedelta(minutes=1)).replace(second=1, microsecond=0)
92+
if task_time <= one_min_ahead:
93+
return int((task_time - now).total_seconds())
94+
return None
95+
96+
97+
async def delayed_send(
98+
scheduler: TaskiqScheduler,
99+
task: ScheduledTask,
100+
delay: int,
101+
) -> None:
102+
"""
103+
Send a task with a delay.
104+
105+
This function waits for some time and then
106+
sends a task.
107+
108+
The main idea is that scheduler gathers
109+
tasks every minute and some of them have
110+
specfic time. To respect the time, we calculate
111+
the delay and send the task after some delay.
112+
113+
:param scheduler: current scheduler.
114+
:param task: task to send.
115+
:param delay: how long to wait.
116+
"""
117+
if delay > 0:
118+
await asyncio.sleep(delay)
119+
logger.info("Sending task %s.", task.task_name)
120+
await scheduler.on_ready(task)
88121

89122

90123
async def _run_loop(scheduler: TaskiqScheduler) -> None:
@@ -105,17 +138,16 @@ async def _run_loop(scheduler: TaskiqScheduler) -> None:
105138
while True: # noqa: WPS457
106139
for task in tasks:
107140
try:
108-
ready = should_run(task)
141+
task_delay = get_task_delay(task)
109142
except ValueError:
110143
logger.warning(
111144
"Cannot parse cron: %s for task: %s",
112145
task.cron,
113146
task.task_name,
114147
)
115148
continue
116-
if ready:
117-
logger.info("Sending task %s.", task.task_name)
118-
loop.create_task(scheduler.on_ready(task))
149+
if task_delay is not None:
150+
loop.create_task(delayed_send(scheduler, task, task_delay))
119151

120152
delay = (
121153
datetime.now().replace(second=1, microsecond=0)

tests/cli/scheduler/test_is_now.py renamed to tests/cli/scheduler/test_task_delays.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import datetime
22

33
import pytz
4+
from freezegun import freeze_time
45
from tzlocal import get_localzone
56

6-
from taskiq.cli.scheduler.run import should_run
7+
from taskiq.cli.scheduler.run import get_task_delay
78
from taskiq.schedule_sources.label_based import LabelScheduleSource
89
from taskiq.scheduler.scheduler import ScheduledTask
910

@@ -12,7 +13,7 @@
1213

1314
def test_should_run_success() -> None:
1415
hour = datetime.datetime.utcnow().hour
15-
assert should_run(
16+
delay = get_task_delay(
1617
ScheduledTask(
1718
task_name="",
1819
labels={},
@@ -22,12 +23,13 @@ def test_should_run_success() -> None:
2223
cron=f"* {hour} * * *",
2324
),
2425
)
26+
assert delay is not None and delay >= 0
2527

2628

2729
def test_should_run_cron_str_offset() -> None:
2830
hour = datetime.datetime.now().hour
2931
zone = get_localzone()
30-
assert should_run(
32+
delay = get_task_delay(
3133
ScheduledTask(
3234
task_name="",
3335
labels={},
@@ -38,12 +40,13 @@ def test_should_run_cron_str_offset() -> None:
3840
cron_offset=str(zone),
3941
),
4042
)
43+
assert delay is not None and delay >= 0
4144

4245

4346
def test_should_run_cron_td_offset() -> None:
4447
offset = 2
4548
hour = datetime.datetime.utcnow().hour + offset
46-
assert should_run(
49+
delay = get_task_delay(
4750
ScheduledTask(
4851
task_name="",
4952
labels={},
@@ -54,11 +57,12 @@ def test_should_run_cron_td_offset() -> None:
5457
cron_offset=datetime.timedelta(hours=offset),
5558
),
5659
)
60+
assert delay is not None and delay >= 0
5761

5862

5963
def test_time_utc_without_zone() -> None:
6064
time = datetime.datetime.utcnow()
61-
assert should_run(
65+
delay = get_task_delay(
6266
ScheduledTask(
6367
task_name="",
6468
labels={},
@@ -68,11 +72,12 @@ def test_time_utc_without_zone() -> None:
6872
time=time - datetime.timedelta(seconds=1),
6973
),
7074
)
75+
assert delay is not None and delay >= 0
7176

7277

7378
def test_time_utc_with_zone() -> None:
7479
time = datetime.datetime.now(tz=pytz.UTC)
75-
assert should_run(
80+
delay = get_task_delay(
7681
ScheduledTask(
7782
task_name="",
7883
labels={},
@@ -82,12 +87,13 @@ def test_time_utc_with_zone() -> None:
8287
time=time - datetime.timedelta(seconds=1),
8388
),
8489
)
90+
assert delay is not None and delay >= 0
8591

8692

8793
def test_time_utc_with_local_zone() -> None:
8894
localtz = get_localzone()
8995
time = datetime.datetime.now(tz=localtz)
90-
assert should_run(
96+
delay = get_task_delay(
9197
ScheduledTask(
9298
task_name="",
9399
labels={},
@@ -97,11 +103,12 @@ def test_time_utc_with_local_zone() -> None:
97103
time=time - datetime.timedelta(seconds=1),
98104
),
99105
)
106+
assert delay is not None and delay >= 0
100107

101108

102109
def test_time_localtime_without_zone() -> None:
103110
time = datetime.datetime.now(tz=pytz.FixedOffset(240)).replace(tzinfo=None)
104-
assert not should_run(
111+
delay = get_task_delay(
105112
ScheduledTask(
106113
task_name="",
107114
labels={},
@@ -111,3 +118,20 @@ def test_time_localtime_without_zone() -> None:
111118
time=time - datetime.timedelta(seconds=1),
112119
),
113120
)
121+
assert delay is None
122+
123+
124+
@freeze_time("2023-01-14 12:00:00")
125+
def test_time_delay() -> None:
126+
time = datetime.datetime.now(tz=pytz.UTC) + datetime.timedelta(seconds=15)
127+
delay = get_task_delay(
128+
ScheduledTask(
129+
task_name="",
130+
labels={},
131+
args=[],
132+
kwargs={},
133+
source=DUMMY_SOURCE,
134+
time=time,
135+
),
136+
)
137+
assert delay is not None and 15 == delay

0 commit comments

Comments
 (0)