Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ async def startup(self) -> None:
if "cron" not in schedule and "time" not in schedule:
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
schedule_id = uuid.uuid4().hex

task_labels = {k: v for k, v in task.labels.items() if k != "schedule"}

labels.update(task_labels)
schedule_id = schedule.get("schedule_id", uuid.uuid4().hex)

self.schedules[schedule_id] = ScheduledTask(
task_name=task_name,
Expand Down
60 changes: 0 additions & 60 deletions tests/schedule_sources/test_label_based.py

This file was deleted.

16 changes: 14 additions & 2 deletions tests/scheduler/test_label_based_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@
[
pytest.param([{"cron": "* * * * *"}], id="cron"),
pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"),
pytest.param(
[{"time": datetime.now(pytz.UTC), "labels": {"foo": "bar"}}],
id="labels_inside_schedule",
),
pytest.param(
[{"cron": "*/1 * * * *", "schedule_id": "every_minute"}],
id="schedule_with_id",
),
],
)
async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None:
Expand All @@ -37,16 +45,20 @@ def task() -> None:
schedules = await source.get_schedules()
assert schedules == [
ScheduledTask(
schedule_id=schedules[0].schedule_id,
schedule_id=schedule_label[0].get("schedule_id", schedules[0].schedule_id),
cron=schedule_label[0].get("cron"),
time=schedule_label[0].get("time"),
task_name="test_task",
labels={"schedule": schedule_label},
labels=schedule_label[0].get("labels", {}),
args=[],
kwargs={},
),
]

# check that labels of tasks are not changed after startup and discovery process
task_from_broker = next(iter(broker.get_all_tasks().values()))
assert task_from_broker.labels == {"schedule": schedule_label}


@pytest.mark.anyio
async def test_label_discovery_no_cron() -> None:
Expand Down