Skip to content

Commit 6df4308

Browse files
committed
Merge branch 'release/0.10.1'
2 parents b137807 + 5fa5558 commit 6df4308

File tree

13 files changed

+158
-66
lines changed

13 files changed

+158
-66
lines changed

docs/examples/extending/schedule_source.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import List
1+
from typing import Any, Coroutine, List
22

33
from taskiq import ScheduledTask, ScheduleSource
44

@@ -24,10 +24,14 @@ async def get_schedules(self) -> List["ScheduledTask"]:
2424

2525
# This method is optional. You may not implement this.
2626
# It's just a helper to people to be able to interact with your source.
27-
# This method can be either sync or async.
28-
def add_schedule(self, schedule: "ScheduledTask") -> None:
27+
async def add_schedule(self, schedule: "ScheduledTask") -> None:
2928
print("New schedule added:", schedule)
3029

30+
# This method is completely optional, but if you want to support
31+
# schedule cancelation, you must implement it.
32+
async def delete_schedule(self, schedule_id: str) -> None:
33+
print("Deleting schedule:", schedule_id)
34+
3135
# This method is optional. You may not implement this.
3236
# It's just a helper to people to be able to interact with your source.
3337
async def pre_send(self, task: "ScheduledTask") -> None:

docs/extending-taskiq/schedule-sources.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ To create new `schedule source` you have to implement the `taskiq.abc.schedule_s
1010
Here's a minimal example of a schedule source:
1111

1212
@[code python](../examples/extending/schedule_source.py)
13+
14+
You can implement a schedule source that write schedules in the database and have delayed tasks in runtime.

poetry.lock

Lines changed: 29 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq"
3-
version = "0.10.0"
3+
version = "0.10.1"
44
description = "Distributed task queue with full async support"
55
authors = ["Pavel Kirilin <[email protected]>"]
66
maintainers = ["Pavel Kirilin <[email protected]>"]
@@ -31,7 +31,7 @@ typing-extensions = ">=3.10.0.0"
3131
pydantic = ">=1.0,<=3.0"
3232
importlib-metadata = "*"
3333
pycron = "^3.0.0"
34-
taskiq_dependencies = "^1"
34+
taskiq_dependencies = ">=1.3.1,<2"
3535
anyio = ">=3"
3636
packaging = ">=19"
3737
# For prometheus metrics

taskiq/abc/broker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def __init__(
112112
] = defaultdict(list)
113113
self.state = TaskiqState()
114114
self.custom_dependency_context: Dict[Any, Any] = {}
115+
self.dependency_overrides: Dict[Any, Any] = {}
115116
# True only if broker runs in worker process.
116117
self.is_worker_process: bool = False
117118
# True only if broker runs in scheduler process.

taskiq/abc/schedule_source.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ async def shutdown(self) -> None: # noqa: B027
1818
async def get_schedules(self) -> List["ScheduledTask"]:
1919
"""Get list of taskiq schedules."""
2020

21-
def add_schedule(
21+
async def add_schedule(
2222
self,
2323
schedule: "ScheduledTask",
24-
) -> Union[None, Coroutine[Any, Any, None]]:
24+
) -> None:
2525
"""
2626
Add a new schedule.
2727
@@ -40,6 +40,19 @@ def add_schedule(
4040
f"The source {self.__class__.__name__} does not support adding schedules.",
4141
)
4242

43+
async def delete_schedule(self, schedule_id: str) -> None:
44+
"""
45+
Method to delete schedule by id.
46+
47+
This is useful for schedule cancelation.
48+
49+
:param schedule_id: id of schedule to delete.
50+
"""
51+
raise NotImplementedError(
52+
f"The source {self.__class__.__name__} does "
53+
"not support deleting schedules.",
54+
)
55+
4356
def pre_send( # noqa: B027
4457
self,
4558
task: "ScheduledTask",

taskiq/decor.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from typing_extensions import ParamSpec
1515

1616
from taskiq.kicker import AsyncKicker
17+
from taskiq.scheduler.created_schedule import CreatedSchedule
1718
from taskiq.task import AsyncTaskiqTask
1819

1920
if TYPE_CHECKING: # pragma: no cover
@@ -103,7 +104,7 @@ async def schedule_by_cron(
103104
cron: Union[str, "CronSpec"],
104105
*args: _FuncParams.args,
105106
**kwargs: _FuncParams.kwargs,
106-
) -> None:
107+
) -> CreatedSchedule[_ReturnType]:
107108
"""
108109
Schedule task to run on cron.
109110
@@ -114,8 +115,9 @@ async def schedule_by_cron(
114115
:param cron: cron string or a CronSpec instance.
115116
:param args: function's arguments.
116117
:param kwargs: function's key word arguments.
118+
:return: schedule id.
117119
"""
118-
await self.kicker().schedule_cron(
120+
return await self.kicker().schedule_by_cron(
119121
source,
120122
cron,
121123
*args,
@@ -128,7 +130,7 @@ async def schedule_by_time(
128130
time: datetime,
129131
*args: _FuncParams.args,
130132
**kwargs: _FuncParams.kwargs,
131-
) -> None:
133+
) -> CreatedSchedule[_ReturnType]:
132134
"""
133135
Schedule task to run on specific time.
134136
@@ -139,8 +141,9 @@ async def schedule_by_time(
139141
:param time: time to run task.
140142
:param args: function's arguments.
141143
:param kwargs: function's key word arguments.
144+
:return: schedule id.
142145
"""
143-
await self.kicker().schedule_time(
146+
return await self.kicker().schedule_by_time(
144147
source,
145148
time,
146149
*args,

taskiq/kicker.py

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from taskiq.compat import model_dump
2121
from taskiq.exceptions import SendTaskError
2222
from taskiq.message import TaskiqMessage
23+
from taskiq.scheduler.created_schedule import CreatedSchedule
2324
from taskiq.scheduler.scheduled_task import CronSpec, ScheduledTask
2425
from taskiq.task import AsyncTaskiqTask
2526
from taskiq.utils import maybe_awaitable
@@ -146,13 +147,13 @@ async def kiq(
146147
result_backend=self.broker.result_backend,
147148
)
148149

149-
async def schedule_cron(
150+
async def schedule_by_cron(
150151
self,
151152
source: "ScheduleSource",
152153
cron: Union[str, "CronSpec"],
153154
*args: _FuncParams.args,
154155
**kwargs: _FuncParams.kwargs,
155-
) -> None:
156+
) -> CreatedSchedule[_ReturnType]:
156157
"""
157158
Function to schedule task with cron.
158159
@@ -161,34 +162,36 @@ async def schedule_cron(
161162
:param args: function's args.
162163
:param cron_offset: cron offset.
163164
:param kwargs: function's kwargs.
165+
166+
:return: schedule id.
164167
"""
168+
schedule_id = self.broker.id_generator()
165169
message = self._prepare_message(*args, **kwargs)
166170
cron_offset = None
167171
if isinstance(cron, CronSpec):
168172
cron_str = cron.to_cron()
169173
cron_offset = cron.offset
170174
else:
171175
cron_str = cron
172-
await maybe_awaitable(
173-
source.add_schedule(
174-
ScheduledTask(
175-
task_name=message.task_name,
176-
labels=message.labels,
177-
args=message.args,
178-
kwargs=message.kwargs,
179-
cron=cron_str,
180-
cron_offset=cron_offset,
181-
),
182-
),
176+
scheduled = ScheduledTask(
177+
schedule_id=schedule_id,
178+
task_name=message.task_name,
179+
labels=message.labels,
180+
args=message.args,
181+
kwargs=message.kwargs,
182+
cron=cron_str,
183+
cron_offset=cron_offset,
183184
)
185+
await source.add_schedule(scheduled)
186+
return CreatedSchedule(self, source, scheduled)
184187

185-
async def schedule_time(
188+
async def schedule_by_time(
186189
self,
187190
source: "ScheduleSource",
188191
time: datetime,
189192
*args: _FuncParams.args,
190193
**kwargs: _FuncParams.kwargs,
191-
) -> None:
194+
) -> CreatedSchedule[_ReturnType]:
192195
"""
193196
Function to schedule task to run at specific time.
194197
@@ -197,18 +200,18 @@ async def schedule_time(
197200
:param args: function's args.
198201
:param kwargs: function's kwargs.
199202
"""
203+
schedule_id = self.broker.id_generator()
200204
message = self._prepare_message(*args, **kwargs)
201-
await maybe_awaitable(
202-
source.add_schedule(
203-
ScheduledTask(
204-
task_name=message.task_name,
205-
labels=message.labels,
206-
args=message.args,
207-
kwargs=message.kwargs,
208-
time=time,
209-
),
210-
),
205+
scheduled = ScheduledTask(
206+
schedule_id=schedule_id,
207+
task_name=message.task_name,
208+
labels=message.labels,
209+
args=message.args,
210+
kwargs=message.kwargs,
211+
time=time,
211212
)
213+
await source.add_schedule(scheduled)
214+
return CreatedSchedule(self, source, scheduled)
212215

213216
@classmethod
214217
def _prepare_arg(cls, arg: Any) -> Any:

taskiq/receiver/receiver.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,10 @@ async def run_task( # noqa: C901, PLR0912, PLR0915
207207
TaskiqState: self.broker.state,
208208
},
209209
)
210-
dep_ctx = dependency_graph.async_ctx(broker_ctx)
210+
dep_ctx = dependency_graph.async_ctx(
211+
broker_ctx,
212+
self.broker.dependency_overrides or None,
213+
)
211214
# Resolve all function's dependencies.
212215

213216
# Start a timer.

taskiq/scheduler/created_schedule.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
from typing import TYPE_CHECKING, Any, Coroutine, Generic, TypeVar, overload
2+
3+
from taskiq.abc.schedule_source import ScheduleSource
4+
from taskiq.scheduler.scheduled_task import ScheduledTask
5+
from taskiq.task import AsyncTaskiqTask
6+
7+
if TYPE_CHECKING:
8+
from taskiq.kicker import AsyncKicker
9+
10+
_ReturnType = TypeVar("_ReturnType")
11+
_T = TypeVar("_T")
12+
13+
14+
class CreatedSchedule(Generic[_ReturnType]):
15+
"""A schedule that has been created."""
16+
17+
def __init__(
18+
self,
19+
kicker: "AsyncKicker[Any,_ReturnType]",
20+
source: ScheduleSource,
21+
task: ScheduledTask,
22+
) -> None:
23+
self.kicker = kicker
24+
self.source = source
25+
self.task = task
26+
self.schedule_id = task.schedule_id
27+
28+
@overload
29+
async def kiq(
30+
self: "CreatedSchedule[Coroutine[Any,Any, _T]]",
31+
) -> AsyncTaskiqTask[_T]:
32+
...
33+
34+
@overload
35+
async def kiq(self: "CreatedSchedule[_ReturnType]") -> AsyncTaskiqTask[_ReturnType]:
36+
...
37+
38+
async def kiq(self) -> Any:
39+
"""Kick the task as if you were not scheduling it."""
40+
return await self.kicker.kiq(
41+
*self.task.args,
42+
**self.task.kwargs,
43+
)
44+
45+
async def unschedule(self) -> None:
46+
"""Unschedule the task."""
47+
await self.source.delete_schedule(self.task.schedule_id)
48+
49+
def __str__(self) -> str:
50+
return (
51+
"CreatedSchedule("
52+
f"id={self.schedule_id}, "
53+
f"time={self.task.time}, "
54+
f"cron={self.task.cron}, "
55+
f"cron_offset={self.task.cron_offset or 'UTC'}, "
56+
f"task_name={self.task.task_name}, "
57+
f"args={self.task.args}, "
58+
f"kwargs={self.task.kwargs})"
59+
)

0 commit comments

Comments
 (0)