|
| 1 | +import json |
1 | 2 | from apscheduler.schedulers.background import BackgroundScheduler |
2 | 3 | from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore |
3 | 4 | from apscheduler.jobstores.memory import MemoryJobStore |
4 | 5 | from apscheduler.jobstores.redis import RedisJobStore |
5 | 6 | from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor |
6 | 7 | from apscheduler.triggers.cron import CronTrigger |
7 | 8 | from apscheduler.events import EVENT_ALL |
8 | | -import json |
| 9 | +from sqlalchemy.engine import create_engine |
| 10 | +from sqlalchemy.orm import sessionmaker |
9 | 11 | from datetime import datetime, timedelta |
10 | | -from config.database import engine, SQLALCHEMY_DATABASE_URL, SessionLocal |
11 | | -from config.env import RedisConfig |
| 12 | +from config.database import quote_plus, AsyncSessionLocal |
| 13 | +from config.env import DataBaseConfig, RedisConfig |
12 | 14 | from module_admin.service.job_log_service import JobLogService, JobLogModel |
13 | | -from module_admin.dao.job_dao import Session, JobDao |
| 15 | +from module_admin.dao.job_dao import JobDao |
14 | 16 | from utils.log_util import logger |
15 | 17 | import module_task |
16 | 18 |
|
@@ -65,6 +67,17 @@ def __find_recent_workday(cls, day): |
65 | 67 | diff += 1 |
66 | 68 |
|
67 | 69 |
|
| 70 | +SQLALCHEMY_DATABASE_URL = f"mysql+pymysql://{DataBaseConfig.db_username}:{quote_plus(DataBaseConfig.db_password)}@" \ |
| 71 | + f"{DataBaseConfig.db_host}:{DataBaseConfig.db_port}/{DataBaseConfig.db_database}" |
| 72 | +engine = create_engine( |
| 73 | + SQLALCHEMY_DATABASE_URL, |
| 74 | + echo=DataBaseConfig.db_echo, |
| 75 | + max_overflow=DataBaseConfig.db_max_overflow, |
| 76 | + pool_size=DataBaseConfig.db_pool_size, |
| 77 | + pool_recycle=DataBaseConfig.db_pool_recycle, |
| 78 | + pool_timeout=DataBaseConfig.db_pool_timeout |
| 79 | +) |
| 80 | +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) |
68 | 81 | job_stores = { |
69 | 82 | 'default': MemoryJobStore(), |
70 | 83 | 'sqlalchemy': SQLAlchemyJobStore(url=SQLALCHEMY_DATABASE_URL, engine=engine), |
@@ -96,20 +109,20 @@ class SchedulerUtil: |
96 | 109 | """ |
97 | 110 |
|
98 | 111 | @classmethod |
99 | | - async def init_system_scheduler(cls, query_db: Session = SessionLocal()): |
| 112 | + async def init_system_scheduler(cls): |
100 | 113 | """ |
101 | 114 | 应用启动时初始化定时任务 |
102 | 115 | :return: |
103 | 116 | """ |
104 | 117 | logger.info("开始启动定时任务...") |
105 | 118 | scheduler.start() |
106 | | - job_list = JobDao.get_job_list_for_scheduler(query_db) |
107 | | - for item in job_list: |
108 | | - query_job = cls.get_scheduler_job(job_id=str(item.job_id)) |
109 | | - if query_job: |
110 | | - cls.remove_scheduler_job(job_id=str(item.job_id)) |
111 | | - cls.add_scheduler_job(item) |
112 | | - query_db.close() |
| 119 | + async with AsyncSessionLocal() as session: |
| 120 | + job_list = await JobDao.get_job_list_for_scheduler(session) |
| 121 | + for item in job_list: |
| 122 | + query_job = cls.get_scheduler_job(job_id=str(item.job_id)) |
| 123 | + if query_job: |
| 124 | + cls.remove_scheduler_job(job_id=str(item.job_id)) |
| 125 | + cls.add_scheduler_job(item) |
113 | 126 | scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL) |
114 | 127 | logger.info("系统初始定时任务加载成功") |
115 | 128 |
|
@@ -225,7 +238,8 @@ def scheduler_event_listener(cls, event): |
225 | 238 | jobTrigger=job_trigger, |
226 | 239 | jobMessage=job_message, |
227 | 240 | status=status, |
228 | | - exceptionInfo=exception_info |
| 241 | + exceptionInfo=exception_info, |
| 242 | + createTime=datetime.now() |
229 | 243 | ) |
230 | 244 | session = SessionLocal() |
231 | 245 | JobLogService.add_job_log_services(session, job_log) |
|
0 commit comments