Skip to content

Commit 5a8c09f

Browse files
authored
Added shutdown event for scheduler. (#195)
1 parent e10a4b9 commit 5a8c09f

File tree

2 files changed

+49
-28
lines changed

2 files changed

+49
-28
lines changed

taskiq/cli/scheduler/run.py

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ async def schedules_updater(
2626
:param scheduler: current scheduler.
2727
:param current_schedules: list of schedules.
2828
"""
29-
while True:
29+
while True: # noqa: WPS457
3030
logger.debug("Started schedule update.")
3131
new_schedules: "List[ScheduledTask]" = []
3232
for source in scheduler.sources:
@@ -40,8 +40,7 @@ async def schedules_updater(
4040
logger.debug(exc, exc_info=True)
4141
continue
4242

43-
for schedule in scheduler.merge_func(new_schedules, schedules):
44-
new_schedules.append(schedule)
43+
new_schedules = scheduler.merge_func(new_schedules, schedules)
4544

4645
current_schedules.clear()
4746
current_schedules.extend(new_schedules)
@@ -62,38 +61,15 @@ def should_run(task: ScheduledTask) -> bool:
6261
return False
6362

6463

65-
async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS213
64+
async def _run_loop(scheduler: TaskiqScheduler) -> None:
6665
"""
6766
Runs scheduler loop.
6867
6968
This function imports taskiq scheduler
7069
and runs tasks when needed.
7170
72-
:param args: parsed CLI args.
71+
:param scheduler: current scheduler.
7372
"""
74-
if isinstance(args.scheduler, str):
75-
scheduler = import_object(args.scheduler)
76-
else:
77-
scheduler = args.scheduler
78-
if not isinstance(scheduler, TaskiqScheduler):
79-
print( # noqa: WPS421
80-
"Imported scheduler is not a subclass of TaskiqScheduler.",
81-
)
82-
exit(1) # noqa: WPS421
83-
scheduler.broker.is_scheduler_process = True
84-
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
85-
if args.configure_logging:
86-
basicConfig(
87-
level=getLevelName(args.log_level),
88-
format=(
89-
"[%(asctime)s][%(levelname)-7s]"
90-
"[%(module)s:%(funcName)s:%(lineno)d]"
91-
" %(message)s"
92-
),
93-
)
94-
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
95-
for source in scheduler.sources:
96-
await source.startup()
9773
loop = asyncio.get_event_loop()
9874
tasks: "List[ScheduledTask]" = []
9975
loop.create_task(schedules_updater(scheduler, tasks))
@@ -121,3 +97,44 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
12197
- datetime.now()
12298
)
12399
await asyncio.sleep(delay.total_seconds())
100+
101+
102+
async def run_scheduler(args: SchedulerArgs) -> None: # noqa: WPS213
103+
"""
104+
Run scheduler.
105+
106+
This function takes all CLI arguments
107+
and starts the scheduler process.
108+
109+
:param args: parsed CLI arguments.
110+
"""
111+
if isinstance(args.scheduler, str):
112+
scheduler = import_object(args.scheduler)
113+
else:
114+
scheduler = args.scheduler
115+
if not isinstance(scheduler, TaskiqScheduler):
116+
print( # noqa: WPS421
117+
"Imported scheduler is not a subclass of TaskiqScheduler.",
118+
)
119+
exit(1) # noqa: WPS421
120+
scheduler.broker.is_scheduler_process = True
121+
import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
122+
if args.configure_logging:
123+
basicConfig(
124+
level=getLevelName(args.log_level),
125+
format=(
126+
"[%(asctime)s][%(levelname)-7s]"
127+
"[%(module)s:%(funcName)s:%(lineno)d]"
128+
" %(message)s"
129+
),
130+
)
131+
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
132+
for source in scheduler.sources:
133+
await source.startup()
134+
135+
try:
136+
await _run_loop(scheduler)
137+
except asyncio.CancelledError:
138+
logger.warning("Shutting down scheduler.")
139+
await scheduler.shutdown()
140+
logger.info("Scheduler shut down. Good bye!")

taskiq/scheduler/scheduler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ async def on_ready(self, task: ScheduledTask) -> None:
7373
**task.kwargs,
7474
)
7575
await maybe_awaitable(task.source.post_send(task))
76+
77+
async def shutdown(self) -> None:
78+
"""Shutdown the scheduler process."""
79+
await self.broker.shutdown()

0 commit comments

Comments
 (0)