Skip to content

Commit 56ec18b

Browse files
committed
Fix some bugs
1 parent c8ebc43 commit 56ec18b

File tree

11 files changed

+74
-61
lines changed

11 files changed

+74
-61
lines changed

backend/app/task/api/v1/scheduler.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818

1919
@router.get('/all', summary='获取所有任务调度', dependencies=[DependsJwtAuth])
20-
async def get_all_task_schedulers() -> ResponseModel:
20+
async def get_all_task_schedulers() -> ResponseSchemaModel[list[GetTaskSchedulerDetail]]:
2121
schedulers = await task_scheduler_service.get_all()
2222
return response_base.success(data=schedulers)
2323

2424

2525
@router.get('/{pk}', summary='获取任务调度详情', dependencies=[DependsJwtAuth])
26-
async def get_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]):
26+
async def get_task_scheduler(
27+
pk: Annotated[int, Path(description='任务调度 ID')],
28+
) -> ResponseSchemaModel[GetTaskSchedulerDetail]:
2729
task_scheduler = await task_scheduler_service.get(pk=pk)
2830
return response_base.success(data=task_scheduler)
2931

@@ -54,7 +56,7 @@ async def get_task_scheduler_paged(
5456
DependsRBAC,
5557
],
5658
)
57-
async def create_task_scheduler(obj: CreateTaskSchedulerParam):
59+
async def create_task_scheduler(obj: CreateTaskSchedulerParam) -> ResponseModel:
5860
await task_scheduler_service.create(obj=obj)
5961
return response_base.success()
6062

@@ -67,7 +69,9 @@ async def create_task_scheduler(obj: CreateTaskSchedulerParam):
6769
DependsRBAC,
6870
],
6971
)
70-
async def update_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')], obj: UpdateTaskSchedulerParam):
72+
async def update_task_scheduler(
73+
pk: Annotated[int, Path(description='任务调度 ID')], obj: UpdateTaskSchedulerParam
74+
) -> ResponseModel:
7175
count = await task_scheduler_service.update(pk=pk, obj=obj)
7276
if count > 0:
7377
return response_base.success()
@@ -82,7 +86,7 @@ async def update_task_scheduler(pk: Annotated[int, Path(description='任务调
8286
DependsRBAC,
8387
],
8488
)
85-
async def update_task_scheduler_status(pk: Annotated[int, Path(description='任务调度 ID')]):
89+
async def update_task_scheduler_status(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
8690
count = await task_scheduler_service.update_status(pk=pk)
8791
if count > 0:
8892
return response_base.success()
@@ -97,7 +101,7 @@ async def update_task_scheduler_status(pk: Annotated[int, Path(description='任
97101
DependsRBAC,
98102
],
99103
)
100-
async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]):
104+
async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
101105
count = await task_scheduler_service.delete(pk=pk)
102106
if count > 0:
103107
return response_base.success()
@@ -112,7 +116,7 @@ async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调
112116
DependsRBAC,
113117
],
114118
)
115-
async def execute_task(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseSchemaModel[str]:
119+
async def execute_task(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
116120
await task_scheduler_service.execute(pk=pk)
117121
return response_base.success()
118122

backend/app/task/celery.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,15 +43,13 @@ def init_celery() -> celery.Celery:
4343
'group': OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME,
4444
},
4545
result_extended=True,
46-
# result_expires=0, # 任务结果自动清理
46+
# result_expires=0, # 任务结果自动清理,0 或 None 表示不清理
4747
beat_schedule=LOCAL_BEAT_SCHEDULE,
4848
beat_scheduler='app.task.utils.schedulers:DatabaseScheduler',
4949
task_cls='app.task.tasks.base:TaskBase',
5050
task_track_started=True,
5151
enable_utc=False,
5252
timezone=settings.DATETIME_TIMEZONE,
53-
# TODO: Update this work if celery version >= 6.0.0
54-
worker_pool=celery_aio_pool.pool.AsyncIOPool,
5553
)
5654

5755
# 自动发现任务

backend/app/task/crud/crud_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async def set_status(self, db: AsyncSession, pk: int, status: bool) -> int:
8989
:param status: 状态
9090
:return:
9191
"""
92-
return await self.update_model(db, pk, {'status': status})
92+
return await self.update_model(db, pk, {'enabled': status})
9393

9494
async def delete(self, db: AsyncSession, pk: int) -> int:
9595
"""

backend/app/task/model/result.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@
55
OVERWRITE_CELERY_RESULT_TABLE_NAME = 'task_result'
66
OVERWRITE_CELERY_RESULT_GROUP_TABLE_NAME = 'task_group_result'
77

8+
# 重写表名配置
89
TaskResult.configure(name=OVERWRITE_CELERY_RESULT_TABLE_NAME)

backend/app/task/model/scheduler.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
Boolean,
1010
DateTime,
1111
String,
12+
event,
1213
)
1314
from sqlalchemy.dialects.mysql import LONGTEXT
1415
from sqlalchemy.dialects.postgresql import INTEGER, TEXT
15-
from sqlalchemy.event import listen
1616
from sqlalchemy.orm import Mapped, mapped_column
1717

1818
from backend.common.exception import errors
@@ -66,26 +66,28 @@ class TaskScheduler(Base):
6666

6767
@staticmethod
6868
def before_insert_or_update(mapper, connection, target):
69-
"""插入或更新前的验证"""
69+
print('before_insert_or_update', mapper, connection, target)
7070
if target.expire_seconds is not None and target.expire_time:
7171
raise errors.ConflictError(msg='expires 和 expire_seconds 只能设置一个')
7272

7373
@classmethod
7474
def changed(cls, mapper, connection, target):
75+
print('changed', mapper, connection, target)
7576
if not target.no_changes:
7677
cls.update_changed(mapper, connection, target)
7778

7879
@classmethod
7980
def update_changed(cls, mapper, connection, target):
81+
print('update_changed', mapper, connection, target)
8082
now = timezone.now()
81-
last_update = asyncio.create_task(redis_client.get(f'{settings.CELERY_REDIS_PREFIX}last_update'))
83+
last_update = asyncio.create_task(redis_client.get(f'{settings.CELERY_REDIS_PREFIX}:last_update'))
8284
if not last_update:
83-
asyncio.create_task(redis_client.set(f'{settings.CELERY_REDIS_PREFIX}last_update', timezone.to_str(now)))
85+
asyncio.create_task(redis_client.set(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now)))
8486

8587

8688
# 事件监听器
87-
listen(TaskScheduler, 'before_insert', TaskScheduler.before_insert_or_update)
88-
listen(TaskScheduler, 'before_update', TaskScheduler.before_insert_or_update)
89-
listen(TaskScheduler, 'after_insert', TaskScheduler.update_changed)
90-
listen(TaskScheduler, 'after_delete', TaskScheduler.update_changed)
91-
listen(TaskScheduler, 'after_update', TaskScheduler.changed)
89+
event.listen(TaskScheduler, 'before_insert', TaskScheduler.before_insert_or_update)
90+
event.listen(TaskScheduler, 'before_update', TaskScheduler.before_insert_or_update)
91+
event.listen(TaskScheduler, 'after_insert', TaskScheduler.update_changed)
92+
event.listen(TaskScheduler, 'after_delete', TaskScheduler.update_changed)
93+
event.listen(TaskScheduler, 'after_update', TaskScheduler.changed)

backend/app/task/schema/scheduler.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from datetime import datetime
44

55
from pydantic import ConfigDict, Field
6+
from pydantic.types import JsonValue
67

78
from backend.app.task.enums import PeriodType, TaskSchedulerType
89
from backend.common.schema import SchemaBase
@@ -13,8 +14,8 @@ class TaskSchedulerSchemeBase(SchemaBase):
1314

1415
name: str = Field(description='任务名称')
1516
task: str = Field(description='要运行的 Celery 任务(模块化字符串)')
16-
args: str | None = Field(default='[]', description='任务可接收的位置参数')
17-
kwargs: str | None = Field(default='{}', description='任务可接收的关键字参数')
17+
args: JsonValue | None = Field(default='[]', description='任务可接收的位置参数')
18+
kwargs: JsonValue | None = Field(default='{}', description='任务可接收的关键字参数')
1819
queue: str | None = Field(default=None, description='CELERY_TASK_QUEUES 中定义的队列')
1920
exchange: str | None = Field(default=None, description='低级别 AMQP 路由的交换机')
2021
routing_key: str | None = Field(default=None, description='低级别 AMQP 路由的路由密钥')
@@ -24,7 +25,7 @@ class TaskSchedulerSchemeBase(SchemaBase):
2425
last_run_time: datetime | None = Field(default=None, description='任务最后触发的时间')
2526
type: TaskSchedulerType = Field(default=TaskSchedulerType.INTERVAL, description='任务调度类型(0间隔 1定时)')
2627
interval_every: int | None = Field(default=None, description='任务再次运行前的间隔周期数')
27-
interval_period: PeriodType = Field(default=None, description='任务运行之间的周期类型')
28+
interval_period: PeriodType | None = Field(default=None, description='任务运行之间的周期类型')
2829
crontab_minute: str | None = Field(default='*', description='运行的分钟,"*" 表示全部')
2930
crontab_hour: str | None = Field(default='*', description='运行的小时,"*" 表示全部')
3031
crontab_day_of_week: str | None = Field(default='*', description='运行的星期,"*" 表示全部')
@@ -47,3 +48,7 @@ class GetTaskSchedulerDetail(TaskSchedulerSchemeBase):
4748
"""任务调度详情"""
4849

4950
model_config = ConfigDict(from_attributes=True)
51+
52+
id: int = Field(description='任务调度 ID')
53+
created_time: datetime = Field(description='创建时间')
54+
updated_time: datetime | None = Field(None, description='更新时间')

backend/app/task/service/scheduler_service.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
import json
4+
35
from typing import Sequence
46

57
from sqlalchemy import Select
@@ -125,19 +127,6 @@ async def delete(*, pk) -> int:
125127
count = await task_scheduler_dao.delete(db, pk)
126128
return count
127129

128-
@staticmethod
129-
async def revoke(*, task_id: str) -> None:
130-
"""
131-
撤销指定的任务
132-
133-
:param task_id: 任务 UUID
134-
:return:
135-
"""
136-
workers = await run_in_threadpool(celery_app.control.ping())
137-
if not workers:
138-
raise errors.ServerError(msg='Celery Worker 未启动,请联系超级管理员')
139-
celery_app.control.revoke(task_id, terminate=True)
140-
141130
@staticmethod
142131
async def execute(*, pk: int) -> None:
143132
"""
@@ -147,17 +136,30 @@ async def execute(*, pk: int) -> None:
147136
:return:
148137
"""
149138
async with async_db_session() as db:
150-
workers = await run_in_threadpool(celery_app.control.ping())
139+
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
151140
if not workers:
152-
raise errors.ServerError(msg='Celery Worker 未启动,请联系超级管理员')
141+
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
153142
task_scheduler = await task_scheduler_dao.get(db, pk)
154143
if not task_scheduler:
155144
raise errors.NotFoundError(msg='任务调度不存在')
156145
celery_app.send_task(
157146
name=task_scheduler.task,
158-
args=task_scheduler.args,
159-
kwargs=task_scheduler.kwargs,
147+
args=json.loads(task_scheduler.args),
148+
kwargs=json.loads(task_scheduler.kwargs),
160149
)
161150

151+
@staticmethod
152+
async def revoke(*, task_id: str) -> None:
153+
"""
154+
撤销指定的任务
155+
156+
:param task_id: 任务 UUID
157+
:return:
158+
"""
159+
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
160+
if not workers:
161+
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
162+
celery_app.control.revoke(task_id)
163+
162164

163165
task_scheduler_service: TaskSchedulerService = TaskSchedulerService()

backend/app/task/utils/schedulers.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -309,14 +309,15 @@ async def get_all_task_schedules(self):
309309
def schedule_changed(self) -> bool:
310310
"""任务调度变更状态"""
311311
now = timezone.now()
312-
last_update = run_await(redis_client.get)(f'{settings.CELERY_REDIS_PREFIX}last_update')
312+
last_update = run_await(redis_client.get)(f'{settings.CELERY_REDIS_PREFIX}:last_update')
313313
if not last_update:
314-
run_await(redis_client.set)(f'{settings.CELERY_REDIS_PREFIX}last_update', timezone.to_str(now))
314+
run_await(redis_client.set)(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
315315
return False
316+
317+
last, ts = self._last_update, timezone.from_str(last_update)
316318
try:
317-
if last_update and self._last_update:
318-
if timezone.from_str(last_update) > self._last_update:
319-
return True
319+
if ts and ts > (last if last else ts):
320+
return True
320321
finally:
321322
self._last_update = now
322323

@@ -337,10 +338,10 @@ def sync(self):
337338
try:
338339
tasks = self.schedule
339340
run_await(tasks[name].save)()
340-
logger.debug(f'保存任务 {name} 状态到数据库')
341+
logger.debug(f'保存任务 {name} 最新状态到数据库')
341342
_tried.add(name)
342343
except KeyError as e:
343-
logger.error(f'保存任务 {name} 状态失败{e} ')
344+
logger.error(f'保存任务 {name} 最新状态失败{e} ')
344345
_failed.add(name)
345346
except DatabaseError as e:
346347
logger.exception('同步时出现数据库错误: %r', e)

backend/app/task/utils/tzcrontab.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,4 @@ def crontab_verify(filed: Literal['m', 'h', 'dow', 'dom', 'moy'], value: str, ra
8383
if raise_exc:
8484
raise errors.RequestError(msg=f'crontab 值 {value} 非法')
8585

86-
raise valid
86+
return valid

backend/core/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class Settings(BaseSettings):
209209

210210
# 基础配置
211211
CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis'
212-
CELERY_REDIS_PREFIX: str = 'fba:celery:'
212+
CELERY_REDIS_PREFIX: str = 'fba:celery'
213213
CELERY_TASK_MAX_RETRIES: int = 5
214214

215215
# Plugin Code Generator

0 commit comments

Comments
 (0)