Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion backend/app/task/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@

### 层级任务

如果你想对任务进行目录层级划分,使任务结构更加清洗,你可以新建任意目录,但必须注意的是
如果你想对任务进行目录层级划分,使任务结构更加清晰,你可以新建任意目录,但必须注意的是

1. 新建目录后,务必更新任务配置 `CELERY_TASKS_PACKAGES`,将新建目录添加到此列表
2. 在新建目录下,务必添加 `tasks.py` 文件,并在此文件中编写相关任务代码

## 消息代理

你可以通过 `CELERY_BROKER` 控制消息代理选择,它支持 redis 和 rabbitmq

对于本地调试,我们建议使用 redis

对于线上环境,我们强制使用 rabbitmq
14 changes: 7 additions & 7 deletions backend/app/task/api/v1/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ async def get_all_tasks() -> ResponseModel:
return response_base.success(data=tasks)


@router.get('/current', summary='获取当前正在执行的任务', dependencies=[DependsJwtAuth])
@router.get('/running', summary='获取正在执行的任务', dependencies=[DependsJwtAuth])
async def get_current_task() -> ResponseModel:
task = task_service.get()
return response_base.success(data=task)


@router.get('/{uid}/status', summary='获取任务状态', dependencies=[DependsJwtAuth])
async def get_task_status(uid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
status = task_service.get_status(uid)
@router.get('/{tid}/status', summary='获取任务状态', dependencies=[DependsJwtAuth])
async def get_task_status(tid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
status = task_service.get_status(tid)
return response_base.success(data=status)


@router.get('/{uid}', summary='获取任务结果', dependencies=[DependsJwtAuth])
async def get_task_result(uid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
task = task_service.get_result(uid)
@router.get('/{tid}', summary='获取任务结果', dependencies=[DependsJwtAuth])
async def get_task_result(tid: Annotated[str, Path(description='任务ID')]) -> ResponseModel:
task = task_service.get_result(tid)
return response_base.success(data=task)


Expand Down
60 changes: 30 additions & 30 deletions backend/app/task/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,62 @@


def init_celery() -> celery.Celery:
"""创建 celery 应用"""
"""初始化 celery 应用"""

# TODO: Update this work if celery version >= 6.0.0
# https://github.com/fastapi-practices/fastapi_best_architecture/issues/321
# https://github.com/celery/celery/issues/7874
celery.app.trace.build_tracer = celery_aio_pool.build_async_tracer
celery.app.trace.reset_worker_optimizations()

app = celery.Celery(
'fba_celery',
broker_connection_retry_on_startup=True,
worker_pool=celery_aio_pool.pool.AsyncIOPool,
trace=celery_aio_pool.build_async_tracer,
)
# Celery Schedule Tasks
# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
beat_schedule = task_settings.CELERY_SCHEDULE

# Celery Config
# https://docs.celeryq.dev/en/stable/userguide/configuration.html
_redis_broker = (
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
)
_amqp_broker = (
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
broker_url = (
(
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
)
if task_settings.CELERY_BROKER == 'redis'
else (
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
)
)
_result_backend = (
result_backend = (
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{task_settings.CELERY_BACKEND_REDIS_DATABASE}'
)
_result_backend_transport_options = {
'global_keyprefix': f'{task_settings.CELERY_BACKEND_REDIS_PREFIX}_',
result_backend_transport_options = {
'global_keyprefix': f'{task_settings.CELERY_BACKEND_REDIS_PREFIX}',
'retry_policy': {
'timeout': task_settings.CELERY_BACKEND_REDIS_TIMEOUT,
},
}

# Celery Schedule Tasks
# https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html
_beat_schedule = task_settings.CELERY_SCHEDULE

# Update celery settings
app.conf.update(
broker_url=_redis_broker if task_settings.CELERY_BROKER == 'redis' else _amqp_broker,
result_backend=_result_backend,
result_backend_transport_options=_result_backend_transport_options,
timezone=settings.DATETIME_TIMEZONE,
app = celery.Celery(
'fba_celery',
enable_utc=False,
timezone=settings.DATETIME_TIMEZONE,
beat_schedule=beat_schedule,
broker_url=broker_url,
broker_connection_retry_on_startup=True,
result_backend=result_backend,
result_backend_transport_options=result_backend_transport_options,
task_cls='app.task.celery_task.base:TaskBase',
task_track_started=True,
beat_schedule=_beat_schedule,
# TODO: Update this work if celery version >= 6.0.0
worker_pool=celery_aio_pool.pool.AsyncIOPool,
)

# Load task modules
app.autodiscover_tasks(task_settings.CELERY_TASKS_PACKAGES)
app.autodiscover_tasks(task_settings.CELERY_TASK_PACKAGES)

return app


# 创建 celery 实例
celery_app = init_celery()
celery_app: celery.Celery = init_celery()
24 changes: 24 additions & 0 deletions backend/app/task/celery_task/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from celery import Task
from sqlalchemy.exc import SQLAlchemyError

from backend.app.task.conf import task_settings
from backend.common.socketio.actions import task_notification


class TaskBase(Task):
    """Base class for Celery tasks.

    Adds two behaviors on top of :class:`celery.Task`:

    * automatic retry when the task body raises a SQLAlchemy error;
    * Socket.IO broadcast notifications at task lifecycle hooks
      (start / success / failure) via ``task_notification``.
    """

    # Retry automatically when a database error escapes the task body.
    autoretry_for = (SQLAlchemyError,)
    # Upper bound on automatic retries, taken from task settings.
    max_retries = task_settings.CELERY_TASK_MAX_RETRIES

    async def before_start(self, task_id, args, kwargs):
        # Announce that the task is about to run.
        # NOTE(review): async hooks assume an async-capable worker pool
        # (celery-aio-pool) — plain Celery calls these synchronously; confirm.
        await task_notification(msg=f'任务 {task_id} 开始执行')

    async def on_success(self, retval, task_id, args, kwargs):
        # Announce successful completion.
        await task_notification(msg=f'任务 {task_id} 执行成功')

    async def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Announce failure; `exc`/`einfo` are not forwarded in the message.
        await task_notification(msg=f'任务 {task_id} 执行失败')
32 changes: 6 additions & 26 deletions backend/app/task/celery_task/db_log/tasks.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from sqlalchemy.exc import SQLAlchemyError

from backend.app.admin.service.login_log_service import login_log_service
from backend.app.admin.service.opera_log_service import opera_log_service
from backend.app.task.celery import celery_app
from backend.app.task.conf import task_settings


@celery_app.task(
name='auto_delete_db_opera_log',
bind=True,
retry_backoff=True,
max_retries=task_settings.CELERY_TASK_MAX_RETRIES,
)
async def auto_delete_db_opera_log(self) -> int:
@celery_app.task(name='delete_db_opera_log')
async def delete_db_opera_log() -> int:
"""自动删除数据库操作日志"""
try:
result = await opera_log_service.delete_all()
except SQLAlchemyError as exc:
raise self.retry(exc=exc)
result = await opera_log_service.delete_all()
return result


@celery_app.task(
name='auto_delete_db_login_log',
bind=True,
retry_backoff=True,
max_retries=task_settings.CELERY_TASK_MAX_RETRIES,
)
async def auto_delete_db_login_log(self) -> int:
@celery_app.task(name='delete_db_login_log')
async def delete_db_login_log() -> int:
"""自动删除数据库登录日志"""

try:
result = await login_log_service.delete_all()
except SQLAlchemyError as exc:
raise self.retry(exc=exc)
result = await login_log_service.delete_all()
return result
8 changes: 2 additions & 6 deletions backend/app/task/celery_task/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import uuid

from anyio import sleep

from backend.app.task.celery import celery_app


@celery_app.task(name='task_demo_async')
async def task_demo_async() -> str:
await sleep(1)
uid = uuid.uuid4().hex
print(f'异步任务 {uid} 执行成功')
return uid
await sleep(10)
return 'test async'
16 changes: 8 additions & 8 deletions backend/app/task/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TaskSettings(BaseSettings):
ENVIRONMENT: Literal['dev', 'pro']

# Env Celery
CELERY_BROKER_REDIS_DATABASE: int # 仅当使用 redis 作为 broker 时生效, 更适用于测试环境
CELERY_BROKER_REDIS_DATABASE: int # 仅在 dev 模式时生效
CELERY_BACKEND_REDIS_DATABASE: int

# Env Rabbitmq
Expand All @@ -31,9 +31,9 @@ class TaskSettings(BaseSettings):

# Celery
CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis'
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery'
CELERY_BACKEND_REDIS_TIMEOUT: float = 5.0
CELERY_TASKS_PACKAGES: list[str] = [
CELERY_BACKEND_REDIS_PREFIX: str = 'fba:celery_'
CELERY_BACKEND_REDIS_TIMEOUT: int = 5
CELERY_TASK_PACKAGES: list[str] = [
'app.task.celery_task',
'app.task.celery_task.db_log',
]
Expand All @@ -44,12 +44,12 @@ class TaskSettings(BaseSettings):
'schedule': 10,
},
'exec-every-sunday': {
'task': 'auto_delete_db_opera_log',
'schedule': crontab(0, 0, day_of_week='6'), # type: ignore
'task': 'delete_db_opera_log',
'schedule': crontab('0', '0', day_of_week='6'),
},
'exec-every-15-of-month': {
'task': 'auto_delete_db_login_log',
'schedule': crontab(0, 0, day_of_month='15'), # type: ignore
'task': 'delete_db_login_log',
'schedule': crontab('0', '0', day_of_month='15'),
},
}

Expand Down
8 changes: 4 additions & 4 deletions backend/app/task/service/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ def get():
@staticmethod
def get_status(uid: str):
try:
result = AsyncResult(id=uid, app=celery_app)
task_result = AsyncResult(id=uid, app=celery_app)
except NotRegistered:
raise NotFoundError(msg='任务不存在')
return result.status
return task_result.status

@staticmethod
def get_result(uid: str):
try:
result = AsyncResult(id=uid, app=celery_app)
task_result = AsyncResult(id=uid, app=celery_app)
except NotRegistered:
raise NotFoundError(msg='任务不存在')
return result
return task_result.result

@staticmethod
def run(*, name: str, args: list | None = None, kwargs: dict | None = None):
Expand Down
File renamed without changes.
13 changes: 13 additions & 0 deletions backend/common/socketio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,24 @@
# -*- coding: utf-8 -*-
import socketio

from backend.app.task.conf import task_settings
from backend.common.log import log
from backend.common.security.jwt import jwt_authentication
from backend.core.conf import settings

sio = socketio.AsyncServer(
# 此配置是为了集成 celery 实现消息订阅,如果你不使用 celery,可以直接删除此配置,不会造成任何影响
client_manager=socketio.AsyncRedisManager(
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
f'{settings.REDIS_PORT}/{task_settings.CELERY_BROKER_REDIS_DATABASE}'
)
if task_settings.CELERY_BROKER == 'redis'
else socketio.AsyncAioPikaManager(
(
f'amqp://{task_settings.RABBITMQ_USERNAME}:{task_settings.RABBITMQ_PASSWORD}@'
f'{task_settings.RABBITMQ_HOST}:{task_settings.RABBITMQ_PORT}'
)
),
async_mode='asgi',
cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS,
cors_credentials=True,
Expand Down
1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ lint = [
]
server = [
"gunicorn==21.2.0",
"aio_pika==9.4.3",
"supervisor>=4.2.5",
"wait-for-it>=2.2.2",
]
Expand Down
Loading