Skip to content

Commit b921e40

Browse files
committed
Changed get_schedules to accept non-hashable sources.
1 parent e515583 commit b921e40

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

taskiq/cli/scheduler/run.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44
from datetime import datetime, timedelta
55
from logging import basicConfig, getLevelName, getLogger
6-
from typing import Any, Dict, List, Optional, Set
6+
from typing import Any, Dict, List, Optional, Set, Tuple
77

88
import pytz
99
from pycron import is_now
@@ -55,7 +55,7 @@ async def get_schedules(source: ScheduleSource) -> List[ScheduledTask]:
5555

5656
async def get_all_schedules(
5757
scheduler: TaskiqScheduler,
58-
) -> Dict[ScheduleSource, List[ScheduledTask]]:
58+
) -> List[Tuple[ScheduleSource, List[ScheduledTask]]]:
5959
"""
6060
Task to update all schedules.
6161
@@ -71,7 +71,7 @@ async def get_all_schedules(
7171
schedules = await asyncio.gather(
7272
*[get_schedules(source) for source in scheduler.sources],
7373
)
74-
return dict(zip(scheduler.sources, schedules))
74+
return list(zip(scheduler.sources, schedules))
7575

7676

7777
def get_task_delay(task: ScheduledTask) -> Optional[int]:
@@ -162,15 +162,22 @@ async def run_scheduler_loop( # noqa: C901
162162
current_minute = datetime.now(tz=pytz.UTC).minute
163163
while True:
164164
now = datetime.now(tz=pytz.UTC)
165+
# If minute changed, we need to clear
166+
# ran_cron_jobs set and update current minute.
165167
if now.minute != current_minute:
166168
current_minute = now.minute
167169
ran_cron_jobs.clear()
170+
# If interval is not None, we need to
171+
# calculate next run time using it.
168172
if interval is not None:
169173
next_run = now + interval
174+
# otherwise we need assume that
175+
# we will run it at the start of the next minute.
176+
# as crontab does.
170177
else:
171178
next_run = (now + timedelta(minutes=1)).replace(second=1, microsecond=0)
172179
scheduled_tasks = await get_all_schedules(scheduler)
173-
for source, task_list in scheduled_tasks.items():
180+
for source, task_list in scheduled_tasks:
174181
logger.debug("Got %d schedules from source %s.", len(task_list), source)
175182
for task in task_list:
176183
try:

tests/cli/scheduler/test_updater.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ async def test_get_schedules_success() -> None:
5656
schedules = await get_all_schedules(
5757
TaskiqScheduler(InMemoryBroker(), sources),
5858
)
59-
assert schedules == {
60-
sources[0]: schedules1,
61-
sources[1]: schedules2,
62-
}
59+
assert schedules == [
60+
(sources[0], schedules1),
61+
(sources[1], schedules2),
62+
]
6363

6464

6565
@pytest.mark.anyio
@@ -81,7 +81,7 @@ async def test_get_schedules_error() -> None:
8181
schedules = await get_all_schedules(
8282
TaskiqScheduler(InMemoryBroker(), [source1, source2]),
8383
)
84-
assert schedules == {
85-
source1: source1.schedules,
86-
source2: [],
87-
}
84+
assert schedules == [
85+
(source1, source1.schedules),
86+
(source2, []),
87+
]

0 commit comments

Comments
 (0)