Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backend/app/admin/crud/crud_opera_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None:
创建操作日志

:param db: 数据库会话
:param obj: 创建操作日志参数
:param obj: 操作日志创建参数
:return:
"""
await self.create_model(db, obj)

async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogParam]) -> None:
async def bulk_create(self, db: AsyncSession, objs: list[CreateOperaLogParam]) -> None:
"""
批量创建操作日志

:param db: 数据库会话
:param obj_list: 创建操作日志参数列表
:param objs: 操作日志创建参数列表
:return:
"""
await self.create_models(db, obj_list)
await self.create_models(db, objs)

async def delete(self, db: AsyncSession, pks: list[int]) -> int:
"""
Expand Down
6 changes: 3 additions & 3 deletions backend/app/admin/service/opera_log_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ async def create(*, obj: CreateOperaLogParam) -> None:
await opera_log_dao.create(db, obj)

@staticmethod
async def batch_create(*, obj_list: list[CreateOperaLogParam]) -> None:
async def bulk_create(*, objs: list[CreateOperaLogParam]) -> None:
"""
批量创建操作日志

:param obj: 操作日志创建参数
:param objs: 操作日志创建参数列表
:return:
"""
async with async_db_session.begin() as db:
await opera_log_dao.batch_create(db, obj_list)
await opera_log_dao.bulk_create(db, objs)

@staticmethod
async def delete(*, obj: DeleteOperaLogParam) -> int:
Expand Down
46 changes: 20 additions & 26 deletions backend/common/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,30 @@
import asyncio

from asyncio import Queue
from typing import List


async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List:
async def batch_dequeue(queue: Queue, max_items: int, timeout: float) -> list:
"""
在指定的超时时间内,从异步队列中批量获取项目。
从异步队列中获取多个项目
此函数会尝试从给定的 ``asyncio.Queue`` 中获取最多 ``max_items`` 个项目。
它会为整个获取过程设置一个总的 ``timeout`` 秒数的超时限制。
如果在超时时间内未能收集到 ``max_items`` 个项目,
函数将返回当前已成功获取的所有项目。
:param queue: 用于获取项目的 ``asyncio.Queue`` 队列。
:type queue: asyncio.Queue
:param max_items: 希望从队列中获取的最大项目数量。
:type max_items: int
:param timeout: 总的等待超时时间(秒)。
:type timeout: float
:return: 一个从队列中获取到的项目列表。如果发生超时,列表中的项目数量可能会少于 ``max_items``。
:rtype: List
:param queue: 用于获取项目的 `asyncio.Queue` 队列
:param max_items: 从队列中获取的最大项目数量
:param timeout: 总的等待超时时间(秒)
:return:
"""
results = []
items = []

loop = asyncio.get_event_loop()
end_time = loop.time() + timeout

async def collector():
while len(results) < max_items:
item = await queue.get()
results.append(item)
while len(items) < max_items:
remaining = end_time - loop.time()
if remaining <= 0:
break
try:
item = await asyncio.wait_for(queue.get(), timeout=remaining)
items.append(item)
except asyncio.TimeoutError:
break

try:
await asyncio.wait_for(collector(), timeout=timeout)
except asyncio.TimeoutError:
pass # 超时后返回已有的 items
return results
return items
4 changes: 3 additions & 1 deletion backend/core/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Settings(BaseSettings):
FASTAPI_STATIC_FILES: bool = True

# 数据库
DATABASE_ECHO: bool | Literal['debug'] = False
DATABASE_ECHO: bool | Literal['debug'] = True
DATABASE_POOL_ECHO: bool | Literal['debug'] = False
DATABASE_SCHEMA: str = 'fba'
DATABASE_CHARSET: str = 'utf8mb4'
Expand Down Expand Up @@ -177,6 +177,8 @@ class Settings(BaseSettings):
'new_password',
'confirm_password',
]
OPERA_LOG_QUEUE_MAX: int = 100
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. OPERA_LOG_QUEUE_MAX 参数名有点歧义,看上去是整个queue队列的maxsize,应该是单次从queue批量获取数据量
  2. OPERA_LOG_QUEUE_TIMEOUT 一分钟是否有点太长?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Agree
  2. 1 分钟感觉还好,空闲状态下,不会过于频繁执行任务,并发情况下,1分钟则并不重要,因为 items 会很快填满

OPERA_LOG_QUEUE_TIMEOUT: int = 60 # 1 分钟

# Plugin 配置
PLUGIN_PIP_CHINA: bool = True
Expand Down
4 changes: 2 additions & 2 deletions backend/core/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
# 启动操作日志消费者
app.state.opera_log_consumer = create_task(OperaLogMiddleware.batch_create_consumer())
# 创建操作日志任务
create_task(OperaLogMiddleware.consumer())

yield

Expand Down
35 changes: 17 additions & 18 deletions backend/middleware/opera_log_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from backend.app.admin.service.opera_log_service import opera_log_service
from backend.common.enums import OperaLogCipherType, StatusType
from backend.common.log import log
from backend.common.queue import get_many_from_queue
from backend.common.queue import batch_dequeue
from backend.core.conf import settings
from backend.utils.encrypt import AESCipher, ItsDCipher, Md5Cipher
from backend.utils.trace_id import get_request_trace_id
Expand All @@ -24,7 +24,6 @@
class OperaLogMiddleware(BaseHTTPMiddleware):
"""操作日志中间件"""

# 操作日志队列, 指定默认队列长度为100000
opera_log_queue: Queue = Queue(maxsize=100000)

async def dispatch(self, request: Request, call_next: Any) -> Response:
Expand Down Expand Up @@ -196,20 +195,20 @@ def desensitization(args: dict[str, Any]) -> dict[str, Any] | None:
args[arg_type][key] = '******'
return args

@staticmethod
async def batch_create_consumer() -> None:
"""批量创建操作日志消费者"""
@classmethod
async def consumer(cls) -> None:
"""操作日志消费者"""
while True:
opera_log_queue = OperaLogMiddleware.opera_log_queue
logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1)
if len(logs) < 1:
continue
log.info(f'处理日志: {len(logs)} 条.')
try:
await opera_log_service.batch_create(obj_list=logs)
except Exception as e:
log.error(f'批量创建操作日志失败: {e}, logs: {logs}')
finally:
# 防止队列阻塞
if not opera_log_queue.empty():
opera_log_queue.task_done()
logs = await batch_dequeue(
cls.opera_log_queue,
max_items=settings.OPERA_LOG_QUEUE_MAX,
timeout=settings.OPERA_LOG_QUEUE_TIMEOUT,
)
if logs:
try:
if settings.DATABASE_ECHO:
log.info('自动执行【操作日志批量创建】任务...')
await opera_log_service.bulk_create(objs=logs)
finally:
if not cls.opera_log_queue.empty():
cls.opera_log_queue.task_done()