Skip to content

Commit 802b0d4

Browse files
authored
Optimize celery integrations and events (#721)
1 parent 6767f0e commit 802b0d4

File tree

8 files changed

+45
-17
lines changed

8 files changed

+45
-17
lines changed

backend/app/task/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
# -*- coding: utf-8 -*-
33
import sys
44

5-
from pathlib import Path
5+
from backend.core.path_conf import BASE_PATH
6+
7+
from .actions import * # noqa: F403
68

79
# 导入项目根目录
8-
sys.path.append(str(Path(__file__).resolve().parent.parent.parent.parent))
10+
sys.path.append(str(BASE_PATH.parent))

backend/app/task/actions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
from starlette.concurrency import run_in_threadpool
4+
5+
from backend.app.task.celery import celery_app
6+
from backend.common.socketio.server import sio
7+
8+
9+
@sio.event
10+
async def task_worker_status(sid, data):
11+
"""任务 Worker 状态事件"""
12+
worker = await run_in_threadpool(celery_app.control.ping)
13+
await sio.emit('task_worker_status', worker, sid)

backend/app/task/celery.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def init_celery() -> celery.Celery:
3535
if settings.CELERY_BROKER == 'redis'
3636
else f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}',
3737
broker_connection_retry_on_startup=True,
38-
backend=f'db+{settings.DATABASE_TYPE + "+pymysql" if settings.DATABASE_TYPE == "mysql" else settings.DATABASE_TYPE}' # noqa: E501
38+
backend=f'db+{settings.DATABASE_TYPE}+{"pymysql" if settings.DATABASE_TYPE == "mysql" else "psycopg"}'
3939
f'://{settings.DATABASE_USER}:{settings.DATABASE_PASSWORD}@{settings.DATABASE_HOST}:{settings.DATABASE_PORT}/{settings.DATABASE_SCHEMA}',
4040
database_engine_options={'echo': settings.DATABASE_ECHO},
4141
database_table_names={
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
#!/usr/bin/env python3
22
# -*- coding: utf-8 -*-
3+
from .actions import * # noqa: F403

backend/common/socketio/server.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,8 @@
99

1010
# 创建 Socket.IO 服务器实例
1111
sio = socketio.AsyncServer(
12-
# 集成 Celery 实现消息订阅
1312
client_manager=socketio.AsyncRedisManager(
14-
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:'
15-
f'{settings.REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}'
16-
)
17-
if settings.CELERY_BROKER == 'redis'
18-
else socketio.AsyncAioPikaManager(
19-
(
20-
f'amqp://{settings.CELERY_RABBITMQ_USERNAME}:{settings.CELERY_RABBITMQ_PASSWORD}@'
21-
f'{settings.CELERY_RABBITMQ_HOST}:{settings.CELERY_RABBITMQ_PORT}'
22-
)
13+
f'redis://:{settings.REDIS_PASSWORD}@{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DATABASE}'
2314
),
2415
async_mode='asgi',
2516
cors_allowed_origins=settings.CORS_ALLOWED_ORIGINS,
@@ -30,7 +21,7 @@
3021

3122
@sio.event
3223
async def connect(sid, environ, auth):
33-
"""处理 WebSocket 连接事件"""
24+
"""Socket 连接事件"""
3425
if not auth:
3526
log.error('WebSocket 连接失败:无授权')
3627
return False
@@ -57,6 +48,6 @@ async def connect(sid, environ, auth):
5748

5849

5950
@sio.event
60-
async def disconnect(sid: str) -> None:
61-
"""处理 WebSocket 断开连接事件"""
51+
async def disconnect(sid) -> None:
52+
"""Socket 断开连接事件"""
6253
await redis_client.spop(settings.TOKEN_ONLINE_REDIS_PREFIX)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ dependencies = [
4040
"msgspec>=0.19.0",
4141
"path>=17.0.0",
4242
"psutil>=7.0.0",
43+
"psycopg>=3.2.9",
4344
"pwdlib>=0.2.1",
4445
"pydantic>=2.11.0",
4546
"pydantic-settings>=2.10.0",

requirements.txt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ prompt-toolkit==3.0.51
185185
# via click-repl
186186
psutil==7.0.0
187187
# via fastapi-best-architecture
188+
psycopg==3.2.9
189+
# via fastapi-best-architecture
188190
pwdlib==0.2.1
189191
# via fastapi-best-architecture
190192
pyasn1==0.6.1
@@ -296,6 +298,7 @@ typing-extensions==4.14.1
296298
# exceptiongroup
297299
# fastapi
298300
# fastapi-pagination
301+
# psycopg
299302
# pydantic
300303
# pydantic-core
301304
# rich
@@ -310,7 +313,9 @@ typing-inspection==0.4.1
310313
# pydantic
311314
# pydantic-settings
312315
tzdata==2025.2
313-
# via kombu
316+
# via
317+
# kombu
318+
# psycopg
314319
ua-parser==1.0.1
315320
# via user-agents
316321
ua-parser-builtins==0.18.0.post1

uv.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)