Skip to content

Commit 5dffdee

Browse files
authored
Bug/task-scheduled-by-time-runs-multiple-times (#164)
1 parent 75b8be6 commit 5dffdee

File tree

13 files changed

+253
-24
lines changed

13 files changed

+253
-24
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,3 +165,6 @@ cython_debug/
165165
node_modules
166166
.temp
167167
.cache
168+
169+
# macOS
170+
.DS_Store

docs/examples/extending/schedule_source.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,32 @@ async def get_schedules(self) -> List["ScheduledTask"]:
1919
args=[],
2020
kwargs={},
2121
cron="* * * * *",
22+
#
23+
# We need point on self source for calling pre_send / post_send when
24+
# task is ready to be enqueued.
25+
source=self,
2226
),
2327
]
2428

2529
# This method is optional. You may not implement this.
2630
# It's just a helper to people to be able to interact with your source.
2731
async def add_schedule(self, schedule: "ScheduledTask") -> None:
2832
return await super().add_schedule(schedule)
33+
34+
# This method is optional. You may not implement this.
35+
# It's just a helper to people to be able to interact with your source.
36+
async def pre_send(self, task: "ScheduledTask") -> None:
37+
"""
38+
Actions to execute before task will be sent to broker.
39+
40+
:param task: task that will be sent
41+
"""
42+
43+
# This method is optional. You may not implement this.
44+
# It's just a helper to people to be able to interact with your source.
45+
async def post_send(self, task: "ScheduledTask") -> None:
46+
"""
47+
Actions to execute after task was sent to broker.
48+
49+
:param task: task that just have sent
50+
"""

poetry.lock

Lines changed: 57 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
5959
types-mock = "^4.0.15"
6060
wemake-python-styleguide = "^0.18.0"
6161
tox = "^4.6.4"
62+
freezegun = "^1.2.2"
63+
pytest-mock = "^3.11.1"
6264

6365
[tool.poetry.extras]
6466
zmq = ["pyzmq"]
@@ -89,6 +91,9 @@ warn_unused_ignores = false
8991
profile = "black"
9092
multi_line_output = 3
9193

94+
[tool.pytest.ini_options]
95+
log_level='INFO'
96+
9297
[tool.coverage.run]
9398
omit = [
9499
"taskiq/__main__.py",

taskiq/abc/schedule_source.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import TYPE_CHECKING, List
2+
from typing import TYPE_CHECKING, Any, Coroutine, List, Union
33

44
if TYPE_CHECKING: # pragma: no cover
55
from taskiq.scheduler.scheduler import ScheduledTask
@@ -33,3 +33,23 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None: # noqa: B027
3333
3434
:param schedule: schedule to add.
3535
"""
36+
37+
def pre_send( # noqa: B027
38+
self,
39+
task: "ScheduledTask",
40+
) -> Union[None, Coroutine[Any, Any, None]]:
41+
"""
42+
Actions to execute before task will be sent to broker.
43+
44+
:param task: task that will be sent
45+
"""
46+
47+
def post_send( # noqa: B027
48+
self,
49+
task: "ScheduledTask",
50+
) -> Union[None, Coroutine[Any, Any, None]]:
51+
"""
52+
Actions to execute after task was sent to broker.
53+
54+
:param task: task that just have sent
55+
"""

taskiq/cli/scheduler/args.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
from argparse import ZERO_OR_MORE, ArgumentDefaultsHelpFormatter, ArgumentParser
22
from dataclasses import dataclass
3-
from typing import List, Optional, Sequence
3+
from typing import List, Optional, Sequence, Union
44

55
from taskiq.cli.common_args import LogLevel
6+
from taskiq.scheduler.scheduler import TaskiqScheduler
67

78

89
@dataclass
910
class SchedulerArgs:
1011
"""Arguments for scheduler."""
1112

12-
scheduler: str
13+
scheduler: Union[str, TaskiqScheduler]
1314
modules: List[str]
1415
log_level: str = LogLevel.INFO.name
1516
fs_discover: bool = False

taskiq/cli/scheduler/run.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55

66
from pycron import is_now
77

8+
from taskiq.abc.broker import AsyncBroker
89
from taskiq.cli.scheduler.args import SchedulerArgs
910
from taskiq.cli.utils import import_object, import_tasks
10-
from taskiq.kicker import AsyncKicker
1111
from taskiq.scheduler.scheduler import ScheduledTask, TaskiqScheduler
1212

1313
logger = getLogger(__name__)
@@ -40,8 +40,10 @@ async def schedules_updater(
4040
)
4141
logger.debug(exc, exc_info=True)
4242
continue
43+
4344
for schedule in scheduler.merge_func(new_schedules, schedules):
4445
new_schedules.append(schedule)
46+
4547
current_schedules.clear()
4648
current_schedules.extend(new_schedules)
4749
await asyncio.sleep(scheduler.refresh_delay)
@@ -55,7 +57,7 @@ def should_run(task: ScheduledTask) -> bool:
5557
:return: True if task must be sent.
5658
"""
5759
if task.cron is not None:
58-
return is_now(task.cron)
60+
return is_now(task.cron, datetime.utcnow())
5961
if task.time is not None:
6062
return task.time <= datetime.utcnow()
6163
return False
@@ -70,6 +72,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
7072
7173
:param args: parsed CLI args.
7274
"""
75+
AsyncBroker.is_scheduler_process = True
7376
if isinstance(args.scheduler, str):
7477
scheduler = import_object(args.scheduler)
7578
else:
@@ -98,7 +101,6 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
98101
await scheduler.startup()
99102
logger.info("Startup completed.")
100103
while True: # noqa: WPS457
101-
not_fired_tasks = []
102104
for task in tasks:
103105
try:
104106
ready = should_run(task)
@@ -111,14 +113,8 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
111113
continue
112114
if ready:
113115
logger.info("Sending task %s.", task.task_name)
114-
loop.create_task(
115-
AsyncKicker(task.task_name, scheduler.broker, task.labels).kiq(
116-
*task.args,
117-
**task.kwargs,
118-
),
119-
)
120-
else:
121-
not_fired_tasks.append(task)
116+
loop.create_task(scheduler.on_ready(task))
117+
122118
delay = (
123119
datetime.now().replace(second=1, microsecond=0)
124120
+ timedelta(minutes=1)

taskiq/schedule_sources/label_based.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from logging import getLogger
12
from typing import List
23

34
from taskiq.abc.broker import AsyncBroker
45
from taskiq.abc.schedule_source import ScheduleSource
56
from taskiq.scheduler.scheduler import ScheduledTask
67

8+
logger = getLogger(__name__)
9+
710

811
class LabelScheduleSource(ScheduleSource):
912
"""Schedule source based on labels."""
@@ -40,6 +43,29 @@ async def get_schedules(self) -> List["ScheduledTask"]:
4043
kwargs=schedule.get("kwargs", {}),
4144
cron=schedule.get("cron"),
4245
time=schedule.get("time"),
46+
source=self,
4347
),
4448
)
4549
return schedules
50+
51+
def post_send(self, scheduled_task: ScheduledTask) -> None:
52+
"""
53+
Remove `time` schedule from task's scheduler list.
54+
55+
Task just have sent and won't be sent by that trigger anymore. Other triggers in
56+
scheduler list left unchanged.
57+
58+
:param scheduled_task: task that just have sent
59+
"""
60+
if scheduled_task.cron or not scheduled_task.time:
61+
return # it's scheduled task with cron label, do not remove this trigger.
62+
63+
for task_name, task in self.broker.available_tasks.items():
64+
if task.broker != self.broker or scheduled_task.task_name != task_name:
65+
continue
66+
67+
schedule_list = task.labels.get("schedule", []).copy()
68+
for idx, schedule in enumerate(schedule_list):
69+
if schedule.get("time") == scheduled_task.time:
70+
task.labels.get("schedule", []).pop(idx)
71+
return

taskiq/scheduler/scheduler.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
44

55
from taskiq.abc.broker import AsyncBroker
6+
from taskiq.kicker import AsyncKicker
67
from taskiq.scheduler.merge_functions import preserve_all
8+
from taskiq.utils import maybe_awaitable
79

810
if TYPE_CHECKING: # pragma: no cover
911
from taskiq.abc.schedule_source import ScheduleSource
@@ -17,6 +19,7 @@ class ScheduledTask:
1719
labels: Dict[str, Any]
1820
args: List[Any]
1921
kwargs: Dict[str, Any]
22+
source: "ScheduleSource" # Backward point to source which this task belongs to
2023
cron: Optional[str] = field(default=None)
2124
time: Optional[datetime] = field(default=None)
2225

@@ -56,3 +59,17 @@ async def startup(self) -> None: # pragma: no cover
5659
connections or anything you'd like.
5760
"""
5861
await self.broker.startup()
62+
63+
async def on_ready(self, task: ScheduledTask) -> None:
64+
"""
65+
This method is called when task is ready to be enqueued.
66+
67+
It's triggered on proper time depending on `task.cron` or `task.time` attribute.
68+
:param task: task to send
69+
"""
70+
await maybe_awaitable(task.source.pre_send(task))
71+
await AsyncKicker(task.task_name, self.broker, task.labels).kiq(
72+
*task.args,
73+
**task.kwargs,
74+
)
75+
await maybe_awaitable(task.source.post_send(task))

tests/conftest.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
import asyncio
12
from typing import Generator
23

34
import pytest
5+
from pytest_mock import MockerFixture
46

57
from taskiq.abc.broker import AsyncBroker
68

@@ -26,3 +28,14 @@ def reset_broker() -> Generator[None, None, None]:
2628
"""
2729
yield
2830
AsyncBroker.available_tasks = {}
31+
AsyncBroker.is_worker_process = False
32+
AsyncBroker.is_scheduler_process = False
33+
34+
35+
@pytest.fixture
36+
def mock_sleep(mocker: MockerFixture) -> None:
37+
async def _fast_sleep(delay: float) -> None:
38+
await asyncio_sleep(delay / 10000)
39+
40+
asyncio_sleep = asyncio.sleep
41+
mocker.patch("asyncio.sleep", _fast_sleep)

0 commit comments

Comments
 (0)