-
Notifications
You must be signed in to change notification settings - Fork 2.6k
feat: Audit Log #2571
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Audit Log #2571
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| # coding=utf-8 | ||
| """ | ||
| @project: MaxKB | ||
| @Author:虎 | ||
| @file: log.py | ||
| @date:2025/3/14 16:09 | ||
| @desc: | ||
| """ | ||
|
|
||
| from setting.models.log_management import Log | ||
|
|
||
|
|
||
| def _get_ip_address(request): | ||
| """ | ||
| 获取ip地址 | ||
| @param request: | ||
| @return: | ||
| """ | ||
| x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') | ||
| if x_forwarded_for: | ||
| ip = x_forwarded_for.split(',')[0] | ||
| else: | ||
| ip = request.META.get('REMOTE_ADDR') | ||
| return ip | ||
|
|
||
|
|
||
| def _get_user(request): | ||
| """ | ||
| 获取用户 | ||
| @param request: | ||
| @return: | ||
| """ | ||
| user = request.user | ||
| return { | ||
| "id": str(user.id), | ||
| "email": user.email, | ||
| "phone": user.phone, | ||
| "nick_name": user.nick_name, | ||
| "username": user.username, | ||
| "role": user.role, | ||
| } | ||
|
|
||
|
|
||
| def _get_details(request): | ||
| path = request.path | ||
| body = request.data | ||
| query = request.query_params | ||
| return { | ||
| 'path': path, | ||
| 'body': body, | ||
| 'query': query | ||
| } | ||
|
|
||
|
|
||
| def log(menu: str, operate, get_user=_get_user, get_ip_address=_get_ip_address, get_details=_get_details): | ||
| """ | ||
| 记录审计日志 | ||
| @param menu: 操作菜单 str | ||
| @param operate: 操作 str|func 如果是一个函数 入参将是一个request 响应为str def operate(request): return "操作菜单" | ||
| @param get_user: 获取用户 | ||
| @param get_ip_address:获取IP地址 | ||
| @param get_details: 获取执行详情 | ||
| @return: | ||
| """ | ||
|
|
||
| def inner(func): | ||
| def run(view, request, **kwargs): | ||
| status = 200 | ||
| try: | ||
| return func(view, request, **kwargs) | ||
| except Exception as e: | ||
| status = 500 | ||
| finally: | ||
| ip = get_ip_address(request) | ||
| user = get_user(request) | ||
| details = get_details(request) | ||
| _operate = operate | ||
| if callable(operate): | ||
| _operate = operate(request) | ||
| # 插入审计日志 | ||
| Log(menu=menu, operate=_operate, user=user, status=status, ip_address=ip, details=details).save() | ||
|
|
||
| return run | ||
|
|
||
| return inner | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here are some points of improvement:
Here's a revised version incorporating these points: # coding=utf-8
"""
@project: MaxKB
@author:虎
@file: log.py
@date:2025/3/14 16:09
@desc:
"""
from django.db import transaction
from .models.log_management import Log
def _get_ip_address(request):
"""
获取 IP 地址
:param request:
:return:
"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR', '')
return ip
def _get_user(request):
"""
获取用户信息
:param request:
:return:
"""
user = request.user
return {
"id": str(user.id),
"email": user.email,
"phone": user.phone,
"nick_name": user.nick_name,
"username": user.username,
"role": getattr(user, 'role'),
}
def _get_details(request):
"""
获取请求详细信息
:param request:
:return:
"""
path = request.path
body = request.body.decode() if request.method == 'POST' else None
query = request.GET.dict()
return {
'path': path,
'body': body,
'query': query
}
def record_audit_log(menu: str, operate: Union[str, Callable], get_user: Optional[Callable] = _get_user,
get_ip_address: Optional[Callable] = _get_ip_address,
get_details: Optional[Callable] = _get_details) -> Callable[[Any), Any]]:
"""
记录审计日志的装饰器
:param menu: 操作类别(str)
:param operate: 操作方法(str或callable),若 callable 则为函数,入参为请求对象,返回值为要写入的日志内容
:param get_user: 用户提取函数,默认使用内置的 `_get_user`
:param get_ip_address: 请求IP地址提取函数,默认使用内置的 `_get_ip_address`
:param get_details: 请求详细信息提取函数,默认使用内置的 `_get_details`
返回:
打印并记录到数据库中的视图处理函数。
"""
def decorator(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
try:
response = view_func(request, *args, **kwargs)
status_code = response.status_code
user_info = get_user(request)
ip_address = get_ip_address(request)
details = get_details(request)
# If operate is callable, call it with the current request
if callable(operate):
_operate = operate(request)
else:
_operate = operate
# Insert audit log into DB using non-committing transactions
with transaction.atomic():
Log.objects.create(
menu=menu,
operate=_operate,
user=user_info,
status=status_code,
ip_address=ip_address,
details=details
)
return response
except Exception as e:
# Handle errors while writing to the database
print(f"Error while recording log: {e}")
raise
return wrapper
return decoratorThis revised version addresses several issues identified, making it clearer, maintainable, and robust. |
||
Uh oh!
There was an error while loading. Please reload this page.