11import json
22from apscheduler .events import EVENT_ALL
33from apscheduler .executors .asyncio import AsyncIOExecutor
4- from apscheduler .executors .pool import ThreadPoolExecutor , ProcessPoolExecutor
4+ from apscheduler .executors .pool import ProcessPoolExecutor
55from apscheduler .jobstores .memory import MemoryJobStore
66from apscheduler .jobstores .redis import RedisJobStore
77from apscheduler .jobstores .sqlalchemy import SQLAlchemyJobStore
88from apscheduler .schedulers .asyncio import AsyncIOScheduler
9- from apscheduler .schedulers .background import BackgroundScheduler
109from apscheduler .triggers .cron import CronTrigger
1110from asyncio import iscoroutinefunction
1211from datetime import datetime , timedelta
@@ -112,12 +111,9 @@ def __find_recent_workday(cls, day: int):
112111 )
113112 ),
114113}
115- async_executors = {'default' : AsyncIOExecutor ()}
116- executors = {'default' : ThreadPoolExecutor (20 ), 'processpool' : ProcessPoolExecutor (5 )}
114+ executors = {'default' : AsyncIOExecutor (), 'processpool' : ProcessPoolExecutor (5 )}
117115job_defaults = {'coalesce' : False , 'max_instance' : 1 }
118- async_scheduler = AsyncIOScheduler ()
119- scheduler = BackgroundScheduler ()
120- async_scheduler .configure (jobstores = job_stores , executors = async_executors , job_defaults = job_defaults )
116+ scheduler = AsyncIOScheduler ()
121117scheduler .configure (jobstores = job_stores , executors = executors , job_defaults = job_defaults )
122118
123119
@@ -135,14 +131,12 @@ async def init_system_scheduler(cls):
135131 """
136132 logger .info ('开始启动定时任务...' )
137133 scheduler .start ()
138- async_scheduler .start ()
139134 async with AsyncSessionLocal () as session :
140135 job_list = await JobDao .get_job_list_for_scheduler (session )
141136 for item in job_list :
142137 cls .remove_scheduler_job (job_id = str (item .job_id ))
143138 cls .add_scheduler_job (item )
144139 scheduler .add_listener (cls .scheduler_event_listener , EVENT_ALL )
145- async_scheduler .add_listener (cls .scheduler_event_listener , EVENT_ALL )
146140 logger .info ('系统初始定时任务加载成功' )
147141
148142 @classmethod
@@ -153,7 +147,6 @@ async def close_system_scheduler(cls):
153147 :return:
154148 """
155149 scheduler .shutdown ()
156- async_scheduler .shutdown ()
157150 logger .info ('关闭定时任务成功' )
158151
159152 @classmethod
@@ -164,7 +157,7 @@ def get_scheduler_job(cls, job_id: Union[str, int]):
164157 :param job_id: 任务id
165158 :return: 任务对象
166159 """
167- query_job = scheduler .get_job (job_id = str (job_id )) or async_scheduler . get_job ( job_id = str ( job_id ))
160+ query_job = scheduler .get_job (job_id = str (job_id ))
168161
169162 return query_job
170163
@@ -177,8 +170,11 @@ def add_scheduler_job(cls, job_info: JobModel):
177170 :return:
178171 """
179172 job_func = eval (job_info .invoke_target )
180- job_param = dict (
181- func = job_func ,
173+ job_executor = job_info .job_executor
174+ if iscoroutinefunction (job_func ):
175+ job_executor = 'default'
176+ scheduler .add_job (
177+ func = eval (job_info .invoke_target ),
182178 trigger = MyCronTrigger .from_crontab (job_info .cron_expression ),
183179 args = job_info .job_args .split (',' ) if job_info .job_args else None ,
184180 kwargs = json .loads (job_info .job_kwargs ) if job_info .job_kwargs else None ,
@@ -188,11 +184,8 @@ def add_scheduler_job(cls, job_info: JobModel):
188184 coalesce = True if job_info .misfire_policy == '2' else False ,
189185 max_instances = 3 if job_info .concurrent == '0' else 1 ,
190186 jobstore = job_info .job_group ,
187+ executor = job_executor ,
191188 )
192- if iscoroutinefunction (job_func ):
193- async_scheduler .add_job (** job_param )
194- else :
195- scheduler .add_job (executor = job_info .job_executor , ** job_param )
196189
197190 @classmethod
198191 def execute_scheduler_job_once (cls , job_info : JobModel ):
@@ -203,8 +196,11 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203196 :return:
204197 """
205198 job_func = eval (job_info .invoke_target )
206- job_param = dict (
207- func = job_func ,
199+ job_executor = job_info .job_executor
200+ if iscoroutinefunction (job_func ):
201+ job_executor = 'default'
202+ scheduler .add_job (
203+ func = eval (job_info .invoke_target ),
208204 trigger = 'date' ,
209205 run_date = datetime .now () + timedelta (seconds = 1 ),
210206 args = job_info .job_args .split (',' ) if job_info .job_args else None ,
@@ -215,11 +211,8 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
215211 coalesce = True if job_info .misfire_policy == '2' else False ,
216212 max_instances = 3 if job_info .concurrent == '0' else 1 ,
217213 jobstore = job_info .job_group ,
214+ executor = job_executor ,
218215 )
219- if iscoroutinefunction (job_func ):
220- async_scheduler .add_job (** job_param )
221- else :
222- scheduler .add_job (executor = job_info .job_executor , ** job_param )
223216
224217 @classmethod
225218 def remove_scheduler_job (cls , job_id : Union [str , int ]):
@@ -231,12 +224,7 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
231224 """
232225 query_job = cls .get_scheduler_job (job_id = job_id )
233226 if query_job :
234- query_job_info = query_job .__getstate__ ()
235- job_func = eval (query_job_info .get ('func' ).replace (':' , '.' ))
236- if iscoroutinefunction (job_func ):
237- async_scheduler .remove_job (job_id = str (job_id ))
238- else :
239- scheduler .remove_job (job_id = str (job_id ))
227+ scheduler .remove_job (job_id = str (job_id ))
240228
241229 @classmethod
242230 def scheduler_event_listener (cls , event ):
0 commit comments