Skip to content

Commit 7d5de92

Browse files
committed
Fix update and delete event
1 parent 2ca0a45 commit 7d5de92

File tree

5 files changed

+35
-28
lines changed

5 files changed

+35
-28
lines changed

backend/app/task/crud/crud_scheduler.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
class CRUDTaskScheduler(CRUDPlus[TaskScheduler]):
1414
"""任务调度数据库操作类"""
1515

16-
async def get(self, db: AsyncSession, pk: int) -> TaskScheduler | None:
16+
@staticmethod
17+
async def get(db: AsyncSession, pk: int) -> TaskScheduler | None:
1718
"""
1819
获取任务调度
1920
@@ -67,7 +68,8 @@ async def create(self, db: AsyncSession, obj: CreateTaskSchedulerParam) -> None:
6768
:param obj: 创建任务调度参数
6869
:return:
6970
"""
70-
await self.create_model(db, obj)
71+
await self.create_model(db, obj, flush=True)
72+
TaskScheduler.no_changes = False
7173

7274
async def update(self, db: AsyncSession, pk: int, obj: UpdateTaskSchedulerParam) -> int:
7375
"""
@@ -78,7 +80,11 @@ async def update(self, db: AsyncSession, pk: int, obj: UpdateTaskSchedulerParam)
7880
:param obj: 更新任务调度参数
7981
:return:
8082
"""
81-
return await self.update_model(db, pk, obj)
83+
task_scheduler = await self.get(db, pk)
84+
for key, value in obj.model_dump(exclude_unset=True).items():
85+
setattr(task_scheduler, key, value)
86+
TaskScheduler.no_changes = False
87+
return 1
8288

8389
async def set_status(self, db: AsyncSession, pk: int, status: bool) -> int:
8490
"""
@@ -89,7 +95,10 @@ async def set_status(self, db: AsyncSession, pk: int, status: bool) -> int:
8995
:param status: 状态
9096
:return:
9197
"""
92-
return await self.update_model(db, pk, {'enabled': status})
98+
task_scheduler = await self.get(db, pk)
99+
setattr(task_scheduler, 'enabled', status)
100+
TaskScheduler.no_changes = False
101+
return 1
93102

94103
async def delete(self, db: AsyncSession, pk: int) -> int:
95104
"""
@@ -99,7 +108,10 @@ async def delete(self, db: AsyncSession, pk: int) -> int:
99108
:param pk: 任务调度 ID
100109
:return:
101110
"""
102-
return await self.delete_model(db, pk)
111+
task_scheduler = await self.get(db, pk)
112+
await db.delete(task_scheduler)
113+
TaskScheduler.no_changes = False
114+
return 1
103115

104116

105117
task_scheduler_dao: CRUDTaskScheduler = CRUDTaskScheduler(TaskScheduler)

backend/app/task/model/scheduler.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ class TaskScheduler(Base):
3838
start_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), comment='任务开始触发的时间')
3939
expire_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), comment='任务不再触发的截止时间')
4040
expire_seconds: Mapped[int | None] = mapped_column(comment='任务不再触发的秒数时间差')
41-
last_run_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), comment='任务最后触发的时间')
4241
type: Mapped[int] = mapped_column(comment='调度类型(0间隔 1定时)')
4342
interval_every: Mapped[int | None] = mapped_column(comment='任务再次运行前的间隔周期数')
4443
interval_period: Mapped[str | None] = mapped_column(String(255), comment='任务运行之间的周期类型')
@@ -58,6 +57,9 @@ class TaskScheduler(Base):
5857
Boolean().with_variant(INTEGER, 'postgresql'), default=True, comment='是否启用任务'
5958
)
6059
total_run_count: Mapped[int] = mapped_column(default=0, comment='任务触发的总次数')
60+
last_run_time: Mapped[datetime | None] = mapped_column(
61+
DateTime(timezone=True), default=None, comment='任务最后触发的时间'
62+
)
6163
remark: Mapped[str | None] = mapped_column(
6264
LONGTEXT().with_variant(TEXT, 'postgresql'), default=None, comment='备注'
6365
)
@@ -66,23 +68,22 @@ class TaskScheduler(Base):
6668

6769
@staticmethod
6870
def before_insert_or_update(mapper, connection, target):
69-
print('before_insert_or_update', mapper, connection, target)
7071
if target.expire_seconds is not None and target.expire_time:
7172
raise errors.ConflictError(msg='expires 和 expire_seconds 只能设置一个')
7273

7374
@classmethod
7475
def changed(cls, mapper, connection, target):
75-
print('changed', mapper, connection, target)
7676
if not target.no_changes:
7777
cls.update_changed(mapper, connection, target)
7878

7979
@classmethod
80-
def update_changed(cls, mapper, connection, target):
81-
print('update_changed', mapper, connection, target)
80+
async def update_changed_async(cls):
8281
now = timezone.now()
83-
last_update = asyncio.create_task(redis_client.get(f'{settings.CELERY_REDIS_PREFIX}:last_update'))
84-
if not last_update:
85-
asyncio.create_task(redis_client.set(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now)))
82+
await redis_client.set(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
83+
84+
@classmethod
85+
def update_changed(cls, mapper, connection, target):
86+
asyncio.create_task(cls.update_changed_async())
8687

8788

8889
# 事件监听器

backend/app/task/schema/scheduler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ class TaskSchedulerSchemeBase(SchemaBase):
2222
start_time: datetime | None = Field(default=None, description='任务开始触发的时间')
2323
expire_time: datetime | None = Field(default=None, description='任务不再触发的截止时间')
2424
expire_seconds: int | None = Field(default=None, description='任务不再触发的秒数时间差')
25-
last_run_time: datetime | None = Field(default=None, description='任务最后触发的时间')
2625
type: TaskSchedulerType = Field(default=TaskSchedulerType.INTERVAL, description='任务调度类型(0间隔 1定时)')
2726
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
2827
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
@@ -32,7 +31,6 @@ class TaskSchedulerSchemeBase(SchemaBase):
3231
crontab_day_of_month: str | None = Field(default='*', description='运行的每月日期,"*" 表示全部')
3332
crontab_month_of_year: str | None = Field(default='*', description='运行的月份,"*" 表示全部')
3433
one_off: bool = Field(default=False, description='是否仅运行一次')
35-
total_run_count: int = Field(default=0, description='任务触发的总次数')
3634
remark: str | None = Field(default=None, description='备注')
3735

3836

backend/app/task/utils/schedulers.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from celery import current_app, schedules
1111
from celery.beat import ScheduleEntry, Scheduler
1212
from celery.utils.log import get_logger
13-
from sqlalchemy import select, update
13+
from sqlalchemy import select
1414
from sqlalchemy.exc import DatabaseError, InterfaceError
1515

1616
from backend.app.task.enums import PeriodType, TaskSchedulerType
@@ -163,8 +163,8 @@ async def from_entry(cls, name, app=None, **entry):
163163
task = TaskScheduler(**temp)
164164
db.add(task)
165165
else:
166-
stmt = update(TaskScheduler).where(TaskScheduler.name == name).values(**temp)
167-
await db.execute(stmt)
166+
for key, value in temp.items():
167+
setattr(task, key, value)
168168
res = cls(task, app=app)
169169
return res
170170

@@ -228,7 +228,7 @@ async def _unpack_fields(
228228
kwargs: dict | None = None,
229229
options: dict = None,
230230
**entry,
231-
):
231+
) -> dict:
232232
model_schedule = await cls.to_model_schedule(name, task, schedule)
233233
model_dict = select_as_dict(model_schedule)
234234
for k in ['id', 'created_time', 'updated_time']:
@@ -254,7 +254,7 @@ def _unpack_options(
254254
expires: datetime = None,
255255
expire_seconds: int = None,
256256
one_off: bool = False,
257-
):
257+
) -> dict:
258258
data = {
259259
'queue': queue,
260260
'exchange': exchange,
@@ -294,7 +294,7 @@ def setup_schedule(self):
294294
self.install_default_entries(tasks)
295295
self.update_from_dict(self.app.conf.beat_schedule)
296296

297-
async def get_all_task_schedules(self):
297+
async def get_all_task_schedulers(self):
298298
"""获取所有任务调度"""
299299
async with async_db_session() as db:
300300
logger.debug('DatabaseScheduler: Fetching database schedule')
@@ -402,7 +402,7 @@ def schedule(self) -> dict[str, ModelEntry]:
402402
if update:
403403
logger.debug('beat: Synchronizing schedule...')
404404
self.sync()
405-
self._schedule = run_await(self.get_all_task_schedules)()
405+
self._schedule = run_await(self.get_all_task_schedulers)()
406406
# 计划已更改,使 Scheduler.tick 中的堆无效
407407
if not initial:
408408
self._heap = []

backend/cli.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
33
import asyncio
4-
import os
54

65
from dataclasses import dataclass
76
from typing import Annotated
@@ -12,12 +11,12 @@
1211
from rich.panel import Panel
1312
from rich.text import Text
1413
from sqlalchemy import text
14+
from watchfiles import PythonFilter
1515

1616
from backend import console, get_version
1717
from backend.common.enums import DataBaseType, PrimaryKeyType
1818
from backend.common.exception.errors import BaseExceptionMixin
1919
from backend.core.conf import settings
20-
from backend.core.path_conf import BASE_PATH
2120
from backend.database.db import async_db_session
2221
from backend.plugin.tools import get_plugin_sql
2322
from backend.utils.file_ops import install_git_plugin, install_zip_plugin, parse_sql_script
@@ -45,10 +44,7 @@ def run(host: str, port: int, reload: bool, workers: int | None) -> None:
4544
address=host,
4645
port=port,
4746
reload=not reload,
48-
reload_ignore_paths=[
49-
os.path.join(BASE_PATH.parent / '.venv'),
50-
os.path.join(BASE_PATH / 'log'),
51-
],
47+
reload_filter=PythonFilter(),
5248
workers=workers or 1,
5349
).serve()
5450

0 commit comments

Comments
 (0)