11#!/usr/bin/env python3
22# -*- coding: utf-8 -*-
3+ from typing import TYPE_CHECKING
4+
35from fastapi import Request
4- from sqlalchemy import ColumnElement
6+ from sqlalchemy import ColumnElement , and_ , or_
57
8+ from backend .common .enums import RoleDataRuleExpressionType , RoleDataRuleOperatorType
9+ from backend .common .exception import errors
610from backend .common .exception .errors import ServerError
711from backend .core .conf import settings
12+ from backend .utils .import_parse import dynamic_import
13+
14+ if TYPE_CHECKING :
15+ from backend .app .admin .schema .data_rule import GetDataRuleListDetails
816
917
1018class RequestPermission :
@@ -27,4 +35,75 @@ async def __call__(self, request: Request):
2735 request .state .permission = self .value
2836
2937
30- def filter_data_scope () -> ColumnElement [bool ]: ...
38+ def filter_data_permission (request : Request ) -> ColumnElement [bool ]:
39+ """
40+ 过滤数据权限
41+
42+ 使用场景:用户登录前台后,控制其能看到哪些数据
43+
44+ :param request:
45+ :return:
46+ """
47+ user_data_rules : list [GetDataRuleListDetails ] = request .user .roles .rules
48+
49+ # 超级管理员和无规则用户不做过滤
50+ if request .user .is_superuser or not user_data_rules :
51+ return or_ (1 == 1 )
52+
53+ where_and_list = []
54+ where_or_list = []
55+ allowed_models = frozenset (m .split ('.' )[- 1 ] for m in settings .ALLOWED_MODELS )
56+
57+ for rule in user_data_rules :
58+ rule_model = rule .model
59+ if rule_model not in allowed_models :
60+ raise errors .NotFoundError (msg = '数据模型不存在' )
61+ try :
62+ model_ins = dynamic_import (rule_model )
63+ except Exception :
64+ raise errors .ServerError (msg = '数据模型动态调用失败,请联系系统超级管理员' )
65+ model_columns = model_ins .__table__ .columns .keys ()
66+ column = rule .column
67+ if column not in model_columns :
68+ raise errors .NotFoundError (msg = '数据模型列不存在' )
69+
70+ # 获取模型的列对象
71+ column_obj = getattr (model_ins , column )
72+ rule_expression = rule .expression
73+
74+ # 根据表达式类型构建条件
75+ condition = None
76+ if rule_expression == RoleDataRuleExpressionType .eq :
77+ condition = column_obj == rule .value
78+ elif rule_expression == RoleDataRuleExpressionType .ne :
79+ condition = column_obj != rule .value
80+ elif rule_expression == RoleDataRuleExpressionType .gt :
81+ condition = column_obj > rule .value
82+ elif rule_expression == RoleDataRuleExpressionType .ge :
83+ condition = column_obj >= rule .value
84+ elif rule_expression == RoleDataRuleExpressionType .lt :
85+ condition = column_obj < rule .value
86+ elif rule_expression == RoleDataRuleExpressionType .le :
87+ condition = column_obj <= rule .value
88+ elif rule_expression == RoleDataRuleExpressionType .in_ :
89+ values = rule .value .split (',' ) if isinstance (rule .value , str ) else rule .value
90+ condition = column_obj .in_ (values )
91+ elif rule .expression == RoleDataRuleExpressionType .not_in :
92+ values = rule .value .split (',' ) if isinstance (rule .value , str ) else rule .value
93+ condition = ~ column_obj .in_ (values )
94+
95+ if condition is not None :
96+ rule_operator = rule .operator
97+ if rule_operator == RoleDataRuleOperatorType .AND :
98+ where_and_list .append (condition )
99+ elif rule_operator == RoleDataRuleOperatorType .OR :
100+ where_or_list .append (condition )
101+
102+ # 组合条件
103+ where_list = []
104+ if where_and_list :
105+ where_list .append (and_ (* where_and_list ))
106+ if where_or_list :
107+ where_list .append (or_ (* where_or_list ))
108+
109+ return or_ (* where_list ) if where_list else or_ (1 == 1 )
0 commit comments