3 changes: 3 additions & 0 deletions .gitignore
@@ -7,3 +7,6 @@ venv/
.python-version
.ruff_cache/
.pytest_cache/
.env
docker-compose-dev.yml
GEMINI.md
12 changes: 11 additions & 1 deletion backend/app/admin/crud/crud_opera_log.py
@@ -37,11 +37,21 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None:
Create an operation log

:param db: database session
:param obj: parameters for creating an operation log
:param obj: operation log creation parameters
:return:
"""
await self.create_model(db, obj)

async def bulk_create(self, db: AsyncSession, objs: list[CreateOperaLogParam]) -> None:
"""
Bulk create operation logs

:param db: database session
:param objs: list of operation log creation parameters
:return:
"""
await self.create_models(db, objs)

async def delete(self, db: AsyncSession, pks: list[int]) -> int:
"""
Bulk delete operation logs
11 changes: 11 additions & 0 deletions backend/app/admin/service/opera_log_service.py
@@ -33,6 +33,17 @@ async def create(*, obj: CreateOperaLogParam) -> None:
async with async_db_session.begin() as db:
await opera_log_dao.create(db, obj)

@staticmethod
async def bulk_create(*, objs: list[CreateOperaLogParam]) -> None:
"""
Bulk create operation logs

:param objs: list of operation log creation parameters
:return:
"""
async with async_db_session.begin() as db:
await opera_log_dao.bulk_create(db, objs)

@staticmethod
async def delete(*, obj: DeleteOperaLogParam) -> int:
"""
32 changes: 32 additions & 0 deletions backend/common/queue.py
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio

from asyncio import Queue


async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list:
"""
Fetch multiple items from an async queue

:param queue: the `asyncio.Queue` to fetch items from
:param max_items: maximum number of items to fetch from the queue
:param timeout: total wait timeout in seconds
:return:
"""
items = []

loop = asyncio.get_event_loop()
end_time = loop.time() + timeout

while len(items) < max_items:
remaining = end_time - loop.time()
if remaining <= 0:
break
try:
item = await asyncio.wait_for(queue.get(), timeout=remaining)
items.append(item)
except asyncio.TimeoutError:
break

return items
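
For illustration, a minimal standalone usage sketch of batch_dequeue (not part of the diff; the queue contents, item limit, and 0.5-second timeout are arbitrary). It shows the partial-batch behaviour: the call returns early once max_items is reached, otherwise it returns whatever was collected when the overall timeout expires.

import asyncio

from backend.common.queue import batch_dequeue


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(i)

    # Ask for up to 10 items but wait at most 0.5 s in total.
    items = await batch_dequeue(queue, max_items=10, timeout=0.5)

    # Prints [0, 1, 2]: the queue drains immediately, then the remaining
    # wait times out and the partial batch is returned.
    print(items)


asyncio.run(main())
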
4 changes: 3 additions & 1 deletion backend/core/conf.py
@@ -52,7 +52,7 @@ class Settings(BaseSettings):
FASTAPI_STATIC_FILES: bool = True

# Database
DATABASE_ECHO: bool | Literal['debug'] = False
DATABASE_ECHO: bool | Literal['debug'] = True
DATABASE_POOL_ECHO: bool | Literal['debug'] = False
DATABASE_SCHEMA: str = 'fba'
DATABASE_CHARSET: str = 'utf8mb4'
@@ -177,6 +177,8 @@ class Settings(BaseSettings):
'new_password',
'confirm_password',
]
OPERA_LOG_QUEUE_MAX: int = 100
Contributor Author:

  1. The name OPERA_LOG_QUEUE_MAX is a bit ambiguous: it reads as the maxsize of the whole queue, when it is actually the number of items fetched from the queue in a single batch.
  2. Isn't one minute a bit too long for OPERA_LOG_QUEUE_TIMEOUT?

Member:

  1. Agree.
  2. One minute feels fine. When idle, the task won't run too frequently; under concurrent load the one minute hardly matters, because items fill up quickly.

OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 minute

# Plugin configuration
PLUGIN_PIP_CHINA: bool = True
3 changes: 3 additions & 0 deletions backend/core/registrar.py
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import os

from asyncio import create_task
from contextlib import asynccontextmanager
from typing import AsyncGenerator

@@ -47,6 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
# Create the opera log consumer task
create_task(OperaLogMiddleware.consumer())

yield

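The registrar change starts a long-lived background consumer from the FastAPI lifespan. As a point of reference, a minimal self-contained sketch of that pattern follows (the consume coroutine and the queue here are illustrative, not the PR's code; keeping a reference to the task and cancelling it on shutdown are common additions rather than something this diff does):

import asyncio

from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI


async def consume(queue: asyncio.Queue) -> None:
    # Placeholder consumer: drain items forever.
    while True:
        item = await queue.get()
        print('consumed', item)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    queue: asyncio.Queue = asyncio.Queue()
    # Keep a reference so the task is not garbage collected prematurely.
    task = asyncio.create_task(consume(queue))
    app.state.queue = queue
    yield
    # Stop the consumer on shutdown.
    task.cancel()


app = FastAPI(lifespan=lifespan)
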
25 changes: 23 additions & 2 deletions backend/middleware/opera_log_middleware.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
import time

from asyncio import create_task
from asyncio import Queue
from typing import Any

from asgiref.sync import sync_to_async
@@ -15,6 +15,7 @@
from backend.app.admin.service.opera_log_service import opera_log_service
from backend.common.enums import OperaLogCipherType, StatusType
from backend.common.log import log
from backend.common.queue import batch_dequeue
from backend.core.conf import settings
from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher
from backend.utils.trace_id import get_request_trace_id
@@ -23,6 +24,8 @@
class OperaLogMiddleware(BaseHTTPMiddleware):
"""操作日志中间件"""

opera_log_queue: Queue = Queue(maxsize=100000)

async def dispatch(self, request: Request, call_next: Any) -> Response:
"""
Process the request and record the operation log
@@ -108,7 +111,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response:
cost_time=elapsed, # may differ slightly from the log timing (negligible)
opera_time=request.state.start_time,
)
create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore
await self.opera_log_queue.put(opera_log_in)

# Re-raise errors
if error:
@@ -191,3 +194,21 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None:
case _:
args[arg_type][key] = '******'
return args

@classmethod
async def consumer(cls) -> None:
"""操作日志消费者"""
while True:
logs = await batch_dequeue(
cls.opera_log_queue,
max_items=settings.OPERA_LOG_QUEUE_MAX,
timeout=settings.OPERA_LOG_QUEUE_TIMEOUT,
)
if logs:
try:
if settings.DATABASE_ECHO:
log.info('Automatically running the [opera log bulk create] task...')
await opera_log_service.bulk_create(objs=logs)
finally:
if not cls.opera_log_queue.empty():
cls.opera_log_queue.task_done()
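
Taken together, the middleware enqueues one record per request and the consumer flushes a batch either when OPERA_LOG_QUEUE_MAX records have accumulated or when OPERA_LOG_QUEUE_TIMEOUT elapses, whichever comes first. This matches the review discussion above: at most one flush per timeout window when idle, and rapid full-batch flushes under load. A hedged standalone simulation of that flushing behaviour (the settings values and the persist stand-in are illustrative only, not the project's API):

import asyncio

from backend.common.queue import batch_dequeue

# Illustrative stand-ins for settings.OPERA_LOG_QUEUE_MAX / OPERA_LOG_QUEUE_TIMEOUT
QUEUE_MAX = 5
QUEUE_TIMEOUT = 2.0


async def persist(batch: list) -> None:
    # Stand-in for opera_log_service.bulk_create()
    print(f'flushing {len(batch)} logs')


async def consumer(queue: asyncio.Queue) -> None:
    while True:
        batch = await batch_dequeue(queue, max_items=QUEUE_MAX, timeout=QUEUE_TIMEOUT)
        if batch:
            await persist(batch)


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(consumer(queue))

    # Burst of requests: a full batch of 5 flushes almost immediately.
    for i in range(5):
        await queue.put(f'log-{i}')
    await asyncio.sleep(0.1)

    # A single record while idle: flushed only after the 2 s timeout expires.
    await queue.put('log-5')
    await asyncio.sleep(2.5)

    task.cancel()


asyncio.run(main())
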