77
88from taskiq .cli .scheduler .args import SchedulerArgs
99from taskiq .cli .utils import import_object , import_tasks
10- from taskiq .kicker import AsyncKicker
1110from taskiq .scheduler .scheduler import ScheduledTask , TaskiqScheduler
1211
1312logger = getLogger (__name__ )
@@ -40,8 +39,10 @@ async def schedules_updater(
4039 )
4140 logger .debug (exc , exc_info = True )
4241 continue
42+
4343 for schedule in scheduler .merge_func (new_schedules , schedules ):
4444 new_schedules .append (schedule )
45+
4546 current_schedules .clear ()
4647 current_schedules .extend (new_schedules )
4748 await asyncio .sleep (scheduler .refresh_delay )
@@ -55,7 +56,7 @@ def should_run(task: ScheduledTask) -> bool:
5556 :return: True if task must be sent.
5657 """
5758 if task .cron is not None :
58- return is_now (task .cron )
59+ return is_now (task .cron , datetime . utcnow () )
5960 if task .time is not None :
6061 return task .time <= datetime .utcnow ()
6162 return False
@@ -74,7 +75,7 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
7475 scheduler = import_object (args .scheduler )
7576 else :
7677 scheduler = args .scheduler
77- if not isinstance (args . scheduler , TaskiqScheduler ):
78+ if not isinstance (scheduler , TaskiqScheduler ):
7879 print ( # noqa: WPS421
7980 "Imported scheduler is not a subclass of TaskiqScheduler." ,
8081 )
@@ -98,7 +99,6 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
9899 await scheduler .startup ()
99100 logger .info ("Startup completed." )
100101 while True : # noqa: WPS457
101- not_fired_tasks = []
102102 for task in tasks :
103103 try :
104104 ready = should_run (task )
@@ -111,14 +111,8 @@ async def run_scheduler(args: SchedulerArgs) -> None: # noqa: C901, WPS210, WPS
111111 continue
112112 if ready :
113113 logger .info ("Sending task %s." , task .task_name )
114- loop .create_task (
115- AsyncKicker (task .task_name , scheduler .broker , task .labels ).kiq (
116- * task .args ,
117- ** task .kwargs ,
118- ),
119- )
120- else :
121- not_fired_tasks .append (task )
114+ loop .create_task (scheduler .on_ready (task ))
115+
122116 delay = (
123117 datetime .now ().replace (second = 1 , microsecond = 0 )
124118 + timedelta (minutes = 1 )
0 commit comments