Skip to content

Commit eaf704e

Browse files
authored
Add scheduler pre_send exception handling (#254)
1 parent 708d60c commit eaf704e

File tree

5 files changed

+62
-6
lines changed

5 files changed

+62
-6
lines changed

docs/examples/extending/schedule_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ async def pre_send(self, task: "ScheduledTask") -> None:
3838
"""
3939
Actions to execute before task will be sent to broker.
4040
41+
This method may raise ScheduledTaskCancelledError.
42+
This cancels the task execution.
43+
4144
:param task: task that will be sent
4245
"""
4346

taskiq/abc/schedule_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ def pre_send( # noqa: B027
6060
"""
6161
Actions to execute before task will be sent to broker.
6262
63+
This method may raise ScheduledTaskCancelledError.
64+
This cancels the task execution.
65+
6366
:param task: task that will be sent
6467
"""
6568

taskiq/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,7 @@ class NoResultError(TaskiqError):
4040

4141
class TaskRejectedError(TaskiqError):
4242
"""Task was rejected."""
43+
44+
45+
class ScheduledTaskCancelledError(TaskiqError):
46+
"""Scheduled task was cancelled and not sent to the queue."""

taskiq/scheduler/scheduler.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from logging import getLogger
12
from typing import TYPE_CHECKING, List
23

4+
from taskiq.exceptions import ScheduledTaskCancelledError
35
from taskiq.kicker import AsyncKicker
46
from taskiq.scheduler.scheduled_task import ScheduledTask
57
from taskiq.utils import maybe_awaitable
@@ -8,6 +10,8 @@
810
from taskiq.abc.broker import AsyncBroker
911
from taskiq.abc.schedule_source import ScheduleSource
1012

13+
logger = getLogger(__name__)
14+
1115

1216
class TaskiqScheduler:
1317
"""Scheduler class."""
@@ -36,12 +40,16 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
3640
It's triggered on proper time depending on `task.cron` or `task.time` attribute.
3741
:param task: task to send
3842
"""
39-
await maybe_awaitable(source.pre_send(task))
40-
await AsyncKicker(task.task_name, self.broker, task.labels).kiq(
41-
*task.args,
42-
**task.kwargs,
43-
)
44-
await maybe_awaitable(source.post_send(task))
43+
try:
44+
await maybe_awaitable(source.pre_send(task))
45+
except ScheduledTaskCancelledError:
46+
logger.info("Scheduled task %s has been cancelled.", task.task_name)
47+
else:
48+
await AsyncKicker(task.task_name, self.broker, task.labels).kiq(
49+
*task.args,
50+
**task.kwargs,
51+
)
52+
await maybe_awaitable(source.post_send(task))
4553

4654
async def shutdown(self) -> None:
4755
"""Shutdown the scheduler process."""

tests/scheduler/test_scheduler.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import Any, Coroutine, List, Union
2+
3+
import pytest
4+
5+
from taskiq.abc.schedule_source import ScheduleSource
6+
from taskiq.brokers.inmemory_broker import InMemoryBroker
7+
from taskiq.exceptions import ScheduledTaskCancelledError
8+
from taskiq.scheduler.scheduled_task import ScheduledTask
9+
from taskiq.scheduler.scheduler import TaskiqScheduler
10+
11+
12+
class CancellingScheduleSource(ScheduleSource):
13+
async def get_schedules(self) -> List["ScheduledTask"]:
14+
"""Return schedules list."""
15+
return []
16+
17+
def pre_send(
18+
self,
19+
task: "ScheduledTask",
20+
) -> Union[None, Coroutine[Any, Any, None]]:
21+
"""Raise cancelled error."""
22+
raise ScheduledTaskCancelledError
23+
24+
25+
@pytest.mark.anyio
26+
async def test_scheduled_task_cancelled() -> None:
27+
broker = InMemoryBroker()
28+
source = CancellingScheduleSource()
29+
scheduler = TaskiqScheduler(broker=broker, sources=[source])
30+
task = ScheduledTask(
31+
task_name="ping:pong",
32+
labels={},
33+
args=[],
34+
kwargs={},
35+
cron="* * * * *",
36+
)
37+
38+
await scheduler.on_ready(source, task) # error is caught

0 commit comments

Comments
 (0)