Skip to content

Commit 392a02d

Browse files
authored
fix: recursive dict in labels during schedule discovery in LabelScheduleSource (#520)
1 parent c97b483 commit 392a02d

File tree

5 files changed

+27
-65
lines changed

5 files changed

+27
-65
lines changed

docs/available-components/schedule-sources.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ The format of the schedule label is the following:
3737
@broker.task(
3838
schedule=[
3939
{
40-
"cron": "* * * * *", # type: str, either cron or time should be specified.
40+
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
4141
"cron_offset": None # type: str | timedelta | None, can be omitted.
4242
"time": None # type: datetime | None, either cron or time should be specified.
4343
"args": [], # type List[Any] | None, can be omitted.
4444
"kwargs": {}, # type: Dict[str, Any] | None, can be omitted.
4545
"labels": {}, # type: Dict[str, Any] | None, can be omitted.
46+
"schedule_id": "every_minute", # type: str | None, can be omitted.
4647
}
4748
]
4849
)
@@ -58,6 +59,7 @@ Parameters:
5859
- `args` - args to use, when invoking the task.
5960
- `kwargs` - key-word arguments to use when invoking the task.
6061
- `labels` - additional labels to use when invoking the task.
62+
- `schedule_id` - unique identifier of the schedule. If not specified, a random uuid will be generated.
6163

6264
To enable this source, just add it to the list of sources:
6365

docs/extending-taskiq/schedule-sources.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,8 @@ Here's a minimal example of a schedule source:
1212
@[code python](../examples/extending/schedule_source.py)
1313

1414
You can implement a schedule source that write schedules in the database and have delayed tasks in runtime.
15+
16+
::: info Cool tip!
17+
You can also use `LabelScheduleSource` as a base class for your schedule source
18+
if you want to parse schedules from task labels and don't want to implement logic for this from scratch.
19+
:::

taskiq/schedule_sources/label_based.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,11 @@ async def startup(self) -> None:
4646
if "cron" not in schedule and "time" not in schedule:
4747
continue
4848
labels = schedule.get("labels", {})
49-
labels.update(task.labels)
50-
schedule_id = uuid.uuid4().hex
49+
50+
task_labels = {k: v for k, v in task.labels.items() if k != "schedule"}
51+
52+
labels.update(task_labels)
53+
schedule_id = schedule.get("schedule_id", uuid.uuid4().hex)
5154

5255
self.schedules[schedule_id] = ScheduledTask(
5356
task_name=task_name,

tests/schedule_sources/test_label_based.py

Lines changed: 0 additions & 60 deletions
This file was deleted.

tests/scheduler/test_label_based_sched.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020
[
2121
pytest.param([{"cron": "* * * * *"}], id="cron"),
2222
pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"),
23+
pytest.param(
24+
[{"time": datetime.now(pytz.UTC), "labels": {"foo": "bar"}}],
25+
id="labels_inside_schedule",
26+
),
27+
pytest.param(
28+
[{"cron": "*/1 * * * *", "schedule_id": "every_minute"}],
29+
id="schedule_with_id",
30+
),
2331
],
2432
)
2533
async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None:
@@ -37,16 +45,20 @@ def task() -> None:
3745
schedules = await source.get_schedules()
3846
assert schedules == [
3947
ScheduledTask(
40-
schedule_id=schedules[0].schedule_id,
48+
schedule_id=schedule_label[0].get("schedule_id", schedules[0].schedule_id),
4149
cron=schedule_label[0].get("cron"),
4250
time=schedule_label[0].get("time"),
4351
task_name="test_task",
44-
labels={"schedule": schedule_label},
52+
labels=schedule_label[0].get("labels", {}),
4553
args=[],
4654
kwargs={},
4755
),
4856
]
4957

58+
# check that labels of tasks are not changed after startup and discovery process
59+
task_from_broker = next(iter(broker.get_all_tasks().values()))
60+
assert task_from_broker.labels == {"schedule": schedule_label}
61+
5062

5163
@pytest.mark.anyio
5264
async def test_label_discovery_no_cron() -> None:

0 commit comments

Comments
 (0)