Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ venv/
.python-version
.ruff_cache/
.pytest_cache/
.env
docker-compose-dev.yml
GEMINI.md
11 changes: 11 additions & 0 deletions backend/app/admin/crud/crud_opera_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ async def create(self, db: AsyncSession, obj: CreateOperaLogParam) -> None:
"""
await self.create_model(db, obj)

async def batch_create(self, db: AsyncSession, obj_list: list[CreateOperaLogParam]) -> None:
"""
批量创建操作日志

:param db: 数据库会话
:param obj_list: 创建操作日志参数列表
:return:
"""
db.add_all([OperaLog(**obj.model_dump()) for obj in obj_list])
await db.flush()

async def delete(self, db: AsyncSession, pks: list[int]) -> int:
"""
批量删除操作日志
Expand Down
41 changes: 40 additions & 1 deletion backend/app/admin/service/opera_log_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from backend.app.admin.crud.crud_opera_log import opera_log_dao
from backend.app.admin.schema.opera_log import CreateOperaLogParam, DeleteOperaLogParam
from backend.common.log import log
from backend.common.queue import get_many_from_queue, opera_log_queue
from backend.database.db import async_db_session


Expand All @@ -25,14 +27,51 @@ async def get_select(*, username: str | None, status: int | None, ip: str | None
@staticmethod
async def create(*, obj: CreateOperaLogParam) -> None:
"""
创建操作日志
创建操作日志(同步)

:param obj: 操作日志创建参数
:return:
"""
async with async_db_session.begin() as db:
await opera_log_dao.create(db, obj)

@staticmethod
async def create_in_queue(*, obj: CreateOperaLogParam) -> None:
"""
创建操作日志(入队)

:param obj: 操作日志创建参数
:return:
"""
await opera_log_queue.put(obj)

@staticmethod
async def batch_create_consumer() -> None:
"""
批量创建操作日志消费者

:return:
"""
while True:
try:
# TODO max_items timeout Queue.maxsize 应该设置为可配置, 在 setting ?
logs = await get_many_from_queue(opera_log_queue, max_items=100, timeout=1)
if logs:
log.info(
f'处理日志: {len(logs)} 条.',
)
async with async_db_session.begin() as db:
await opera_log_dao.batch_create(db, logs)
else:
log.debug('无日志可处理')

except Exception as e:
log.error(f'批量创建操作日志失败: {e}')
finally:
# 防止队列阻塞
if not opera_log_queue.empty():
opera_log_queue.task_done()

@staticmethod
async def delete(*, obj: DeleteOperaLogParam) -> int:
"""
Expand Down
40 changes: 40 additions & 0 deletions backend/common/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio

from asyncio import Queue
from typing import List

# 操作日志队列
opera_log_queue: Queue = Queue(maxsize=100000)


async def get_many_from_queue(queue: Queue, max_items: int, timeout: float) -> List:
"""
在指定的超时时间内,从异步队列中批量获取项目。
此函数会尝试从给定的 `asyncio.Queue` 中获取最多 `max_items` 个项目。
它会为整个获取过程设置一个总的 `timeout` 秒数的超时限制。如果在超时
时间内未能收集到 `max_items` 个项目,函数将返回当前已成功获取的所有项目。
Args:
queue: 用于获取项目的 `asyncio.Queue` 队列。
max_items: 希望从队列中获取的最大项目数量。
timeout: 总的等待超时时间(秒)。
Returns:
一个从队列中获取到的项目列表。如果发生超时,
列表中的项目数量可能会少于 `max_items`。
"""
results = []

async def collector():
while len(results) < max_items:
item = await queue.get()
results.append(item)

try:
await asyncio.wait_for(collector(), timeout=timeout)
except asyncio.TimeoutError:
pass # 超时后返回已有的 items
return results
4 changes: 4 additions & 0 deletions backend/core/registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
import os

from asyncio import create_task
from contextlib import asynccontextmanager
from typing import AsyncGenerator

Expand All @@ -14,6 +15,7 @@
from starlette.middleware.authentication import AuthenticationMiddleware
from starlette.staticfiles import StaticFiles

from backend.app.admin.service.opera_log_service import opera_log_service
from backend.common.exception.exception_handler import register_exception
from backend.common.log import set_custom_logfile, setup_logging
from backend.core.conf import settings
Expand Down Expand Up @@ -47,6 +49,8 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
# 启动操作日志消费者
app.state.opera_log_consumer = create_task(opera_log_service.batch_create_consumer())

yield

Expand Down
3 changes: 1 addition & 2 deletions backend/middleware/opera_log_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# -*- coding: utf-8 -*-
import time

from asyncio import create_task
from typing import Any

from asgiref.sync import sync_to_async
Expand Down Expand Up @@ -108,7 +107,7 @@ async def dispatch(self, request: Request, call_next: Any) -> Response:
cost_time=elapsed, # 可能和日志存在微小差异(可忽略)
opera_time=request.state.start_time,
)
create_task(opera_log_service.create(obj=opera_log_in)) # noqa: ignore
await opera_log_service.create_in_queue(obj=opera_log_in)

# 错误抛出
if error:
Expand Down