Skip to content

Commit 3658167

Browse files
committed
Refactor task routes and add control routes
1 parent 0dd745b commit 3658167

File tree

6 files changed

+66
-29
lines changed

6 files changed

+66
-29
lines changed

backend/app/task/api/router.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
# -*- coding: utf-8 -*-
33
from fastapi import APIRouter
44

5+
from backend.app.task.api.v1.control import router as task_control_router
56
from backend.app.task.api.v1.result import router as task_result_router
67
from backend.app.task.api.v1.scheduler import router as task_scheduler_router
78
from backend.core.conf import settings
89

9-
v1 = APIRouter(prefix=f'{settings.FASTAPI_API_V1_PATH}/task', tags=['任务'])
10+
v1 = APIRouter(prefix=f'{settings.FASTAPI_API_V1_PATH}/tasks', tags=['任务'])
1011

12+
v1.include_router(task_control_router)
1113
v1.include_router(task_result_router, prefix='/results')
1214
v1.include_router(task_scheduler_router, prefix='/schedulers')

backend/app/task/api/v1/control.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
from typing import Annotated
4+
5+
from fastapi import APIRouter, Depends, Path
6+
from starlette.concurrency import run_in_threadpool
7+
8+
from backend.app.task import celery_app
9+
from backend.app.task.schema.control import TaskRegisteredDetail
10+
from backend.common.exception import errors
11+
from backend.common.response.response_schema import ResponseModel, ResponseSchemaModel, response_base
12+
from backend.common.security.jwt import DependsJwtAuth
13+
from backend.common.security.permission import RequestPermission
14+
from backend.common.security.rbac import DependsRBAC
15+
16+
router = APIRouter()
17+
18+
19+
@router.get('/registered', summary='获取已注册的任务', dependencies=[DependsJwtAuth])
20+
async def get_task_registered() -> ResponseSchemaModel[list[TaskRegisteredDetail]]:
21+
inspector = celery_app.control.inspect(timeout=0.5)
22+
registered = await run_in_threadpool(inspector.registered)
23+
if not registered:
24+
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
25+
task_registered = []
26+
celery_app_tasks = celery_app.tasks
27+
for _, tasks in registered.items():
28+
for task in tasks:
29+
task_ins = celery_app_tasks.get(task)
30+
if task_ins:
31+
task_doc = task_ins.__doc__
32+
task_registered.append({'name': task_doc or task_ins, 'task': task_ins})
33+
else:
34+
task_registered.append({'name': task, 'task': task})
35+
return response_base.success(data=task_registered)
36+
37+
38+
@router.delete(
39+
'/{task_id}/cancel',
40+
summary='撤销任务',
41+
dependencies=[
42+
Depends(RequestPermission('sys:task:revoke')),
43+
DependsRBAC,
44+
],
45+
)
46+
async def revoke_task(task_id: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
47+
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
48+
if not workers:
49+
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
50+
celery_app.control.revoke(task_id)
51+
return response_base.success()

backend/app/task/api/v1/scheduler.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,3 @@ async def delete_task_scheduler(pk: Annotated[int, Path(description='任务调
119119
async def execute_task(pk: Annotated[int, Path(description='任务调度 ID')]) -> ResponseModel:
120120
await task_scheduler_service.execute(pk=pk)
121121
return response_base.success()
122-
123-
124-
@router.delete(
125-
'/{task_id}/cancel',
126-
summary='撤销任务',
127-
dependencies=[
128-
Depends(RequestPermission('sys:task:revoke')),
129-
DependsRBAC,
130-
],
131-
)
132-
async def revoke_task(task_id: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
133-
await task_scheduler_service.revoke(task_id=task_id)
134-
return response_base.success()

backend/app/task/celery.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313

1414
def find_task_packages():
1515
packages = []
16-
for root, dirs, files in os.walk(os.path.join(BASE_PATH, 'app', 'task', 'tasks')):
16+
task_dir = os.path.join(BASE_PATH, 'app', 'task', 'tasks')
17+
for root, dirs, files in os.walk(task_dir):
1718
if 'tasks.py' in files:
1819
package = root.replace(str(BASE_PATH.parent) + os.path.sep, '').replace(os.path.sep, '.')
1920
packages.append(package)
@@ -54,7 +55,8 @@ def init_celery() -> celery.Celery:
5455
)
5556

5657
# 自动发现任务
57-
app.autodiscover_tasks(find_task_packages())
58+
packages = find_task_packages()
59+
app.autodiscover_tasks(packages)
5860

5961
return app
6062

backend/app/task/schema/control.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
from backend.common.schema import SchemaBase
4+
5+
6+
class TaskRegisteredDetail(SchemaBase):
7+
name: str
8+
task: str

backend/app/task/service/scheduler_service.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,5 @@ async def execute(*, pk: int) -> None:
142142
else:
143143
celery_app.send_task(name=task_scheduler.task, args=args, kwargs=kwargs)
144144

145-
@staticmethod
146-
async def revoke(*, task_id: str) -> None:
147-
"""
148-
撤销指定的任务
149-
150-
:param task_id: 任务 UUID
151-
:return:
152-
"""
153-
workers = await run_in_threadpool(celery_app.control.ping, timeout=0.5)
154-
if not workers:
155-
raise errors.ServerError(msg='Celery Worker 暂不可用,请稍后重试')
156-
celery_app.control.revoke(task_id)
157-
158145

159146
task_scheduler_service: TaskSchedulerService = TaskSchedulerService()

0 commit comments

Comments
 (0)