Skip to content

Commit fc5a0f9

Browse files
authored
Update the celery configuration and tasks (#458)
* Update the celery configuration and tasks
* Fix message notifications
1 parent 576ed77 commit fc5a0f9

File tree

13 files changed

+408
-82
lines changed

13 files changed

+408
-82
lines changed

backend/app/task/README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,15 @@
1414

1515
### 层级任务
1616

17-
如果你想对任务进行目录层级划分,使任务结构更加清洗,你可以新建任意目录,但必须注意的是
17+
如果你想对任务进行目录层级划分,使任务结构更加清晰,你可以新建任意目录,但必须注意的是
1818

1919
1. 新建目录后,务必更新任务配置 `CELERY_TASKS_PACKAGES`,将新建目录添加到此列表
2020
2. 在新建目录下,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码
21+
22+
## 消息代理
23+
24+
你可以通过 `CELERY_BROKER` 控制消息代理选择,它支持 redis 和 rabbitmq
25+
26+
对于本地调试,我们建议使用 redis
27+
28+
对于线上环境,我们强制使用 rabbitmq

backend/app/task/api/v1/task.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@ async def get_all_tasks() -> ResponseModel:
1919
return response_base.success(data=tasks)
2020

2121

22-
@router.get('/current', summary='获取当前正在执行的任务', dependencies=[DependsJwtAuth])
22+
@router.get('/running', summary='获取正在执行的任务', dependencies=[DependsJwtAuth])
2323
async def get_current_task() -> ResponseModel:
2424
task = task_service.get()
2525
return response_base.success(data=task)
2626

2727

28-
@router.get('/{uid}/status', summary='获取任务状态', dependencies=[DependsJwtAuth])
29-
async def get_task_status(uid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
30-
status = task_service.get_status(uid)
28+
@router.get('/{tid}/status', summary='获取任务状态', dependencies=[DependsJwtAuth])
29+
async def get_task_status(tid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
30+
status = task_service.get_status(tid)
3131
return response_base.success(data=status)
3232

3333

34-
@router.get('/{uid}', summary='获取任务结果', dependencies=[DependsJwtAuth])
35-
async def get_task_result(uid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
36-
task = task_service.get_result(uid)
34+
@router.get('/{tid}', summary='获取任务结果', dependencies=[DependsJwtAuth])
35+
async def get_task_result(tid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
36+
task = task_service.get_result(tid)
3737
return response_base.success(data=task)
3838

3939

backend/app/task/celery.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,62 +10,62 @@
1010

1111

1212
def init_celery() -> celery.Celery:
13-
"""创建 celery 应用"""
13+
"""初始化 celery 应用"""
1414

1515
# TODO: Update this work if celery version >= 6.0.0
1616
# https://github.com/fastapi-practices/fastapi_best_architecture/issues/321
1717
# https://github.com/celery/celery/issues/7874
1818
celery.app.trace.build_tracer = celery_aio_pool.build_async_tracer
1919
celery.app.trace.reset_worker_optimizations()
2020

21-
app = celery.Celery(
22-
'fba_celery',
23-
broker_connection_retry_on_startup=True,
24-
worker_pool=celery_aio_pool.pool.AsyncIOPool,
25-
trace=celery_aio_pool.build_async_tracer,
26-
)
21+
# Celery Schedule Tasks
22+
# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
23+
beat_schedule = task_settings.CELERY_SCHEDULE
2724

2825
# Celery Config
2926
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
30-
_redis_broker = (
31-
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
32-
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
33-
)
34-
_amqp_broker = (
35-
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
36-
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
27+
broker_url = (
28+
(
29+
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
30+
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
31+
)
32+
if task_settings.CELERY_BROKER == 'redis'
33+
else (
34+
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
35+
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
36+
)
3737
)
38-
_result_backend = (
38+
result_backend = (
3939
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
4040
f'{settings.REDIS_PORT}/{task_settings.CELERY_BACKEND_REDIS_DATABASE}'
4141
)
42-
_result_backend_transport_options = {
43-
'global_keyprefix': f'{task_settings.CELERY_BACKEND_REDIS_PREFIX}_',
42+
result_backend_transport_options = {
43+
'global_keyprefix': f'{task_settings.CELERY_BACKEND_REDIS_PREFIX}',
4444
'retry_policy': {
4545
'timeout': task_settings.CELERY_BACKEND_REDIS_TIMEOUT,
4646
},
4747
}
4848

49-
# Celery Schedule Tasks
50-
# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
51-
_beat_schedule = task_settings.CELERY_SCHEDULE
52-
53-
# Update celery settings
54-
app.conf.update(
55-
broker_url=_redis_broker if task_settings.CELERY_BROKER == 'redis' else _amqp_broker,
56-
result_backend=_result_backend,
57-
result_backend_transport_options=_result_backend_transport_options,
58-
timezone=settings.DATETIME_TIMEZONE,
49+
app = celery.Celery(
50+
'fba_celery',
5951
enable_utc=False,
52+
timezone=settings.DATETIME_TIMEZONE,
53+
beat_schedule=beat_schedule,
54+
broker_url=broker_url,
55+
broker_connection_retry_on_startup=True,
56+
result_backend=result_backend,
57+
result_backend_transport_options=result_backend_transport_options,
58+
task_cls='app.task.celery_task.base:TaskBase',
6059
task_track_started=True,
61-
beat_schedule=_beat_schedule,
60+
# TODO: Update this work if celery version >= 6.0.0
61+
worker_pool=celery_aio_pool.pool.AsyncIOPool,
6262
)
6363

6464
# Load task modules
65-
app.autodiscover_tasks(task_settings.CELERY_TASKS_PACKAGES)
65+
app.autodiscover_tasks(task_settings.CELERY_TASK_PACKAGES)
6666

6767
return app
6868

6969

7070
# 创建 celery 实例
71-
celery_app = init_celery()
71+
celery_app: celery.Celery = init_celery()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
from celery import Task
5+
from sqlalchemy.exc import SQLAlchemyError
6+
7+
from backend.app.task.conf import task_settings
8+
from backend.common.socketio.actions import task_notification
9+
10+
11+
class TaskBase(Task):
12+
"""任务基类"""
13+
14+
autoretry_for = (SQLAlchemyError,)
15+
max_retries = task_settings.CELERY_TASK_MAX_RETRIES
16+
17+
async def before_start(self, task_id, args, kwargs):
18+
await task_notification(msg=f'任务 {task_id} 开始执行')
19+
20+
async def on_success(self, retval, task_id, args, kwargs):
21+
await task_notification(msg=f'任务 {task_id} 执行成功')
22+
23+
async def on_failure(self, exc, task_id, args, kwargs, einfo):
24+
await task_notification(msg=f'任务 {task_id} 执行失败')
Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,19 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3-
from sqlalchemy.exc import SQLAlchemyError
4-
53
from backend.app.admin.service.login_log_service import login_log_service
64
from backend.app.admin.service.opera_log_service import opera_log_service
75
from backend.app.task.celery import celery_app
8-
from backend.app.task.conf import task_settings
96

107

11-
@celery_app.task(
12-
name='auto_delete_db_opera_log',
13-
bind=True,
14-
retry_backoff=True,
15-
max_retries=task_settings.CELERY_TASK_MAX_RETRIES,
16-
)
17-
async def auto_delete_db_opera_log(self) -> int:
8+
@celery_app.task(name='delete_db_opera_log')
9+
async def delete_db_opera_log() -> int:
1810
"""自动删除数据库操作日志"""
19-
try:
20-
result = await opera_log_service.delete_all()
21-
except SQLAlchemyError as exc:
22-
raise self.retry(exc=exc)
11+
result = await opera_log_service.delete_all()
2312
return result
2413

2514

26-
@celery_app.task(
27-
name='auto_delete_db_login_log',
28-
bind=True,
29-
retry_backoff=True,
30-
max_retries=task_settings.CELERY_TASK_MAX_RETRIES,
31-
)
32-
async def auto_delete_db_login_log(self) -> int:
15+
@celery_app.task(name='delete_db_login_log')
16+
async def delete_db_login_log() -> int:
3317
"""自动删除数据库登录日志"""
34-
35-
try:
36-
result = await login_log_service.delete_all()
37-
except SQLAlchemyError as exc:
38-
raise self.retry(exc=exc)
18+
result = await login_log_service.delete_all()
3919
return result
Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3-
import uuid
4-
53
from anyio import sleep
64

75
from backend.app.task.celery import celery_app
86

97

108
@celery_app.task(name='task_demo_async')
119
async def task_demo_async() -> str:
12-
await sleep(1)
13-
uid = uuid.uuid4().hex
14-
print(f'异步任务 {uid} 执行成功')
15-
return uid
10+
await sleep(10)
11+
return 'test async'

backend/app/task/conf.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class TaskSettings(BaseSettings):
1919
ENVIRONMENT: Literal['dev', 'pro']
2020

2121
# Env Celery
22-
CELERY_BROKER_REDIS_DATABASE: int # 仅当使用 redis 作为 broker 时生效, 更适用于测试环境
22+
CELERY_BROKER_REDIS_DATABASE: int # 仅在 dev 模式时生效
2323
CELERY_BACKEND_REDIS_DATABASE: int
2424

2525
# Env Rabbitmq
@@ -31,9 +31,9 @@ class TaskSettings(BaseSettings):
3131

3232
# Celery
3333
CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis'
34-
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery'
35-
CELERY_BACKEND_REDIS_TIMEOUT: float = 5.0
36-
CELERY_TASKS_PACKAGES: list[str] = [
34+
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery_'
35+
CELERY_BACKEND_REDIS_TIMEOUT: int = 5
36+
CELERY_TASK_PACKAGES: list[str] = [
3737
'app.task.celery_task',
3838
'app.task.celery_task.db_log',
3939
]
@@ -44,12 +44,12 @@ class TaskSettings(BaseSettings):
4444
'schedule': 10,
4545
},
4646
'exec-every-sunday': {
47-
'task': 'auto_delete_db_opera_log',
48-
'schedule': crontab(0, 0, day_of_week='6'), # type: ignore
47+
'task': 'delete_db_opera_log',
48+
'schedule': crontab('0', '0', day_of_week='6'),
4949
},
5050
'exec-every-15-of-month': {
51-
'task': 'auto_delete_db_login_log',
52-
'schedule': crontab(0, 0, day_of_month='15'), # type: ignore
51+
'task': 'delete_db_login_log',
52+
'schedule': crontab('0', '0', day_of_month='15'),
5353
},
5454
}
5555

backend/app/task/service/task_service.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,18 @@ def get():
2424
@staticmethod
2525
def get_status(uid: str):
2626
try:
27-
result = AsyncResult(id=uid, app=celery_app)
27+
task_result = AsyncResult(id=uid, app=celery_app)
2828
except NotRegistered:
2929
raise NotFoundError(msg='任务不存在')
30-
return result.status
30+
return task_result.status
3131

3232
@staticmethod
3333
def get_result(uid: str):
3434
try:
35-
result = AsyncResult(id=uid, app=celery_app)
35+
task_result = AsyncResult(id=uid, app=celery_app)
3636
except NotRegistered:
3737
raise NotFoundError(msg='任务不存在')
38-
return result
38+
return task_result.result
3939

4040
@staticmethod
4141
def run(*, name: str, args: list | None = None, kwargs: dict | None = None):
File renamed without changes.

backend/common/socketio/server.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,24 @@
22
# -*- coding: utf-8 -*-
33
import socketio
44

5+
from backend.app.task.conf import task_settings
56
from backend.common.log import log
67
from backend.common.security.jwt import jwt_authentication
78
from backend.core.conf import settings
89

910
sio = socketio.AsyncServer(
11+
# 此配置是为了集成 celery 实现消息订阅,如果你不使用 celery,可以直接删除此配置,不会造成任何影响
12+
client_manager=socketio.AsyncRedisManager(
13+
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
14+
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
15+
)
16+
if task_settings.CELERY_BROKER == 'redis'
17+
else socketio.AsyncAioPikaManager(
18+
(
19+
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
20+
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
21+
)
22+
),
1023
async_mode='asgi',
1124
cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS,
1225
cors_credentials=True,

0 commit comments

Comments (0)