Skip to content

Commit 3763804

Browse files
committed
Optimize task
1 parent 65c7fae commit 3763804

File tree

9 files changed

+142
-89
lines changed

9 files changed

+142
-89
lines changed

backend/app/task/api/router.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
from backend.app.task.api.v1.task import router as task_router
66
from backend.core.conf import settings
77

8-
v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH)
8+
v1 = APIRouter(prefix=settings.FASTAPI_API_V1_PATH, tags=['任务'])
99

10-
v1.include_router(task_router, prefix='/tasks', tags=['任务'])
10+
v1.include_router(task_router, prefix='/tasks')

backend/app/task/api/v1/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ async def get_all_tasks() -> ResponseSchemaModel[list[str]]:
2727
description='此接口被视为作废,建议使用 flower 查看任务详情',
2828
dependencies=[DependsJwtAuth],
2929
)
30-
async def get_task_detail(tid: Annotated[str, Path(description='任务ID')]) -> ResponseSchemaModel[TaskResult]:
30+
async def get_task_detail(tid: Annotated[str, Path(description='任务 UUID')]) -> ResponseSchemaModel[TaskResult]:
3131
status = task_service.get_detail(tid=tid)
3232
return response_base.success(data=status)
3333

@@ -40,7 +40,7 @@ async def get_task_detail(tid: Annotated[str, Path(description='任务ID')]) ->
4040
DependsRBAC,
4141
],
4242
)
43-
async def revoke_task(tid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
43+
async def revoke_task(tid: Annotated[str, Path(description='任务 UUID')]) -> ResponseModel:
4444
task_service.revoke(tid=tid)
4545
return response_base.success()
4646

backend/app/task/celery.py

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
from typing import Any
4+
35
import celery
46
import celery_aio_pool
57

@@ -9,61 +11,64 @@
911
__all__ = ['celery_app']
1012

1113

12-
def init_celery() -> celery.Celery:
13-
"""初始化 celery 应用"""
14-
15-
# TODO: Update this work if celery version >= 6.0.0
16-
# https://github.com/fastapi-practices/fastapi_best_architecture/issues/321
17-
# https://github.com/celery/celery/issues/7874
18-
celery.app.trace.build_tracer = celery_aio_pool.build_async_tracer
19-
celery.app.trace.reset_worker_optimizations()
20-
21-
# Celery Schedule Tasks
22-
# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
23-
beat_schedule = task_settings.CELERY_SCHEDULE
24-
25-
# Celery Config
26-
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
27-
broker_url = (
28-
(
14+
def get_broker_url() -> str:
15+
"""获取消息代理 URL"""
16+
if task_settings.CELERY_BROKER == 'redis':
17+
return (
2918
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
3019
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
3120
)
32-
if task_settings.CELERY_BROKER == 'redis'
33-
else (
34-
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
35-
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
36-
)
21+
return (
22+
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
23+
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
3724
)
38-
result_backend = (
25+
26+
27+
def get_result_backend() -> str:
28+
"""获取结果后端 URL"""
29+
return (
3930
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
4031
f'{settings.REDIS_PORT}/{task_settings.CELERY_BACKEND_REDIS_DATABASE}'
4132
)
42-
result_backend_transport_options = {
43-
'global_keyprefix': f'{task_settings.CELERY_BACKEND_REDIS_PREFIX}',
33+
34+
35+
def get_result_backend_transport_options() -> dict[str, Any]:
36+
"""获取结果后端传输选项"""
37+
return {
38+
'global_keyprefix': task_settings.CELERY_BACKEND_REDIS_PREFIX,
4439
'retry_policy': {
4540
'timeout': task_settings.CELERY_BACKEND_REDIS_TIMEOUT,
4641
},
4742
}
4843

44+
45+
def init_celery() -> celery.Celery:
46+
"""初始化 Celery 应用"""
47+
48+
# TODO: Update this work if celery version >= 6.0.0
49+
# https://github.com/fastapi-practices/fastapi_best_architecture/issues/321
50+
# https://github.com/celery/celery/issues/7874
51+
celery.app.trace.build_tracer = celery_aio_pool.build_async_tracer
52+
celery.app.trace.reset_worker_optimizations()
53+
4954
app = celery.Celery(
5055
'fba_celery',
5156
enable_utc=False,
5257
timezone=settings.DATETIME_TIMEZONE,
53-
beat_schedule=beat_schedule,
54-
broker_url=broker_url,
58+
beat_schedule=task_settings.CELERY_SCHEDULE,
59+
broker_url=get_broker_url(),
5560
broker_connection_retry_on_startup=True,
56-
result_backend=result_backend,
57-
result_backend_transport_options=result_backend_transport_options,
61+
result_backend=get_result_backend(),
62+
result_backend_transport_options=get_result_backend_transport_options(),
5863
task_cls='app.task.celery_task.base:TaskBase',
5964
task_track_started=True,
6065
)
6166

62-
# Load task modules
67+
# 自动发现任务模块
6368
app.autodiscover_tasks(task_settings.CELERY_TASK_PACKAGES)
6469

6570
return app
6671

6772

68-
# 创建 celery 实例
73+
# 创建 Celery 实例
6974
celery_app: celery.Celery = init_celery()
Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
from typing import Any
34

45
from celery import Task
56
from sqlalchemy.exc import SQLAlchemyError
@@ -9,16 +10,37 @@
910

1011

1112
class TaskBase(Task):
12-
"""任务基类"""
13+
"""Celery 任务基类"""
1314

1415
autoretry_for = (SQLAlchemyError,)
1516
max_retries = task_settings.CELERY_TASK_MAX_RETRIES
1617

17-
async def before_start(self, task_id, args, kwargs):
18+
async def before_start(self, task_id: str, args, kwargs) -> None:
19+
"""
20+
任务开始前执行钩子
21+
22+
:param task_id: 任务 ID
23+
:return:
24+
"""
1825
await task_notification(msg=f'任务 {task_id} 开始执行')
1926

20-
async def on_success(self, retval, task_id, args, kwargs):
27+
async def on_success(self, retval: Any, task_id: str, args, kwargs) -> None:
28+
"""
29+
任务成功后执行钩子
30+
31+
:param retval: 任务返回值
32+
:param task_id: 任务 ID
33+
:return:
34+
"""
2135
await task_notification(msg=f'任务 {task_id} 执行成功')
2236

23-
async def on_failure(self, exc, task_id, args, kwargs, einfo):
37+
async def on_failure(self, exc: Exception, task_id: str, args, kwargs, einfo) -> None:
38+
"""
39+
任务失败后执行钩子
40+
41+
:param exc: 异常对象
42+
:param task_id: 任务 ID
43+
:param einfo: 异常信息
44+
:return:
45+
"""
2446
await task_notification(msg=f'任务 {task_id} 执行失败')

backend/app/task/celery_task/tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77

88
@celery_app.task(name='task_demo_async')
99
async def task_demo_async() -> str:
10+
"""异步示例任务,模拟耗时操作"""
1011
await sleep(20)
1112
return 'test async'

backend/app/task/conf.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
33
from functools import lru_cache
4-
from typing import Literal
4+
from typing import Any, Literal
55

66
from celery.schedules import crontab
77
from pydantic import model_validator
@@ -11,25 +11,22 @@
1111

1212

1313
class TaskSettings(BaseSettings):
14-
"""Task Settings"""
14+
"""Celery 任务配置"""
1515

1616
model_config = SettingsConfigDict(env_file=f'{BASE_PATH}/.env', env_file_encoding='utf-8', extra='ignore')
1717

18-
# Env Config
19-
ENVIRONMENT: Literal['dev', 'pro']
20-
21-
# Env Celery
22-
CELERY_BROKER_REDIS_DATABASE: int # 仅在 dev 模式时生效
18+
# .env Redis 配置
19+
CELERY_BROKER_REDIS_DATABASE: int
2320
CELERY_BACKEND_REDIS_DATABASE: int
2421

25-
# Env Rabbitmq
22+
# .env RabbitMQ 配置
2623
# docker run -d --hostname fba-mq --name fba-mq -p 5672:5672 -p 15672:15672 rabbitmq:latest
2724
RABBITMQ_HOST: str
2825
RABBITMQ_PORT: int
2926
RABBITMQ_USERNAME: str
3027
RABBITMQ_PASSWORD: str
3128

32-
# Celery
29+
# Celery 基础配置
3330
CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis'
3431
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery:'
3532
CELERY_BACKEND_REDIS_TIMEOUT: int = 5
@@ -38,7 +35,9 @@ class TaskSettings(BaseSettings):
3835
'app.task.celery_task.db_log',
3936
]
4037
CELERY_TASK_MAX_RETRIES: int = 5
41-
CELERY_SCHEDULE: dict = {
38+
39+
# Celery 定时任务配置
40+
CELERY_SCHEDULE: dict[str, dict[str, Any]] = {
4241
'exec-every-10-seconds': {
4342
'task': 'task_demo_async',
4443
'schedule': 10,
@@ -55,15 +54,16 @@ class TaskSettings(BaseSettings):
5554

5655
@model_validator(mode='before')
5756
@classmethod
58-
def validate_celery_broker(cls, values):
57+
def validate_celery_broker(cls, values: dict[str, Any]) -> dict[str, Any]:
58+
"""生产环境强制使用 RabbitMQ 作为消息代理"""
5959
if values['ENVIRONMENT'] == 'pro':
6060
values['CELERY_BROKER'] = 'rabbitmq'
6161
return values
6262

6363

6464
@lru_cache
6565
def get_task_settings() -> TaskSettings:
66-
"""获取 task 配置"""
66+
"""获取 Celery 任务配置"""
6767
return TaskSettings()
6868

6969

backend/app/task/schema/task.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
from typing import Any
4+
35
from pydantic import Field
46

57
from backend.common.schema import SchemaBase
68

79

810
class RunParam(SchemaBase):
11+
"""任务运行参数"""
12+
913
name: str = Field(description='任务名称')
10-
args: list | None = Field(default=None, description='任务函数位置参数')
11-
kwargs: dict | None = Field(default=None, description='任务函数关键字参数')
14+
args: list[Any] | None = Field(default=None, description='任务函数位置参数')
15+
kwargs: dict[str, Any] | None = Field(default=None, description='任务函数关键字参数')
1216

1317

1418
class TaskResult(SchemaBase):
15-
result: str
16-
traceback: str
17-
status: str
18-
name: str
19-
args: list | None
20-
kwargs: dict | None
21-
worker: str
22-
retries: int | None
23-
queue: str | None
19+
"""任务执行结果"""
20+
21+
result: str = Field(description='任务执行结果')
22+
traceback: str = Field(description='错误堆栈信息')
23+
status: str = Field(description='任务状态')
24+
name: str = Field(description='任务名称')
25+
args: list[Any] | None = Field(default=None, description='任务函数位置参数')
26+
kwargs: dict[str, Any] | None = Field(default=None, description='任务函数关键字参数')
27+
worker: str = Field(description='执行任务的 worker')
28+
retries: int | None = Field(default=None, description='重试次数')
29+
queue: str | None = Field(default=None, description='任务队列')

backend/app/task/service/task_service.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,21 @@
1313
class TaskService:
1414
@staticmethod
1515
async def get_list() -> list[str]:
16+
"""获取所有已注册的 Celery 任务列表"""
1617
registered_tasks = await run_in_threadpool(celery_app.control.inspect().registered)
1718
if not registered_tasks:
18-
raise errors.ForbiddenError(msg='celery 服务未启动')
19+
raise errors.ForbiddenError(msg='Celery 服务未启动')
1920
tasks = list(registered_tasks.values())[0]
2021
return tasks
2122

2223
@staticmethod
2324
def get_detail(*, tid: str) -> TaskResult:
25+
"""
26+
获取指定任务的详细信息
27+
28+
:param tid: 任务 UUID
29+
:return:
30+
"""
2431
try:
2532
result = AsyncResult(id=tid, app=celery_app)
2633
except NotRegistered:
@@ -38,7 +45,13 @@ def get_detail(*, tid: str) -> TaskResult:
3845
)
3946

4047
@staticmethod
41-
def revoke(*, tid: str):
48+
def revoke(*, tid: str) -> None:
49+
"""
50+
撤销指定的任务
51+
52+
:param tid: 任务 UUID
53+
:return:
54+
"""
4255
try:
4356
result = AsyncResult(id=tid, app=celery_app)
4457
except NotRegistered:
@@ -47,6 +60,12 @@ def revoke(*, tid: str):
4760

4861
@staticmethod
4962
def run(*, obj: RunParam) -> str:
63+
"""
64+
运行指定的任务
65+
66+
:param obj: 任务运行参数
67+
:return:
68+
"""
5069
task: AsyncResult = celery_app.send_task(name=obj.name, args=obj.args, kwargs=obj.kwargs)
5170
return task.task_id
5271

0 commit comments

Comments
 (0)