1- import asyncio
2- from datetime import datetime
3- from functools import cached_property
4- from typing import Any , Dict , List , Optional , Sequence , Set , Union
5-
6- from fastapi_amis_admin import admin
7- from fastapi_amis_admin .admin import AdminAction , AdminApp
8- from fastapi_amis_admin .amis import FormItem , SchemaNode , TableColumn , TableCRUD
9- from fastapi_amis_admin .crud .base import ItemListSchema , SchemaCreateT , SchemaFilterT , SchemaModelT , SchemaReadT , SchemaUpdateT
10- from fastapi_amis_admin .crud .parser import TableModelT
11- from fastapi_amis_admin .crud .schema import CrudEnum
12- from fastapi_amis_admin .utils .pydantic import ModelField
13- from sqlalchemy .engine import Result
1+ from fastapi_amis_admin .admin import BaseAuthFieldModelAdmin , BaseAuthSelectModelAdmin
142from sqlalchemy .sql import Select
153from starlette .requests import Request
164
17- from fastapi_user_auth .admin .utils import get_schema_fields_name_label
185from fastapi_user_auth .auth .schemas import SystemUserEnum
19- from fastapi_user_auth .mixins .schemas import FieldPermEnum , SelectPerm
20-
21-
22- class ReadOnlyModelAdmin (admin .ModelAdmin ):
23- """只读模型管理Mixin
24- 移除了创建,更新,删除等操作
25- """
26-
27- @cached_property
28- def registered_admin_actions (self ) -> Dict [str , "AdminAction" ]:
29- actions = super ().registered_admin_actions
30- return {
31- key : action
32- for key , action in actions .items ()
33- if key not in {"create" , "update" , "delete" , "bulk_delete" , "bulk_update" , "bulk_create" }
34- }
35-
36- async def has_create_permission (self , request : Request , data : SchemaUpdateT , ** kwargs ) -> bool :
37- return False
38-
39- async def has_update_permission (
40- self ,
41- request : Request ,
42- item_id : List [str ],
43- data : SchemaUpdateT ,
44- ** kwargs ,
45- ) -> bool :
46- return False
47-
48- async def has_delete_permission (self , request : Request , item_id : List [str ], ** kwargs ) -> bool :
49- return False
50-
51-
52- class AutoTimeModelAdmin (admin .ModelAdmin ):
53- """禁止修改时间Mixin,没有Id,创建时间,更新时间,删除时间等字段的创建和更新"""
54-
55- create_exclude = {
56- "id" ,
57- "create_time" ,
58- "update_time" ,
59- "delete_time" ,
60- }
61- update_exclude = {
62- "id" ,
63- "create_time" ,
64- "update_time" ,
65- "delete_time" ,
66- }
67-
68-
69- class SoftDeleteModelAdmin (AutoTimeModelAdmin ):
70- """软删除模型Mixin.
71- - 需要在模型中定义delete_time字段.如果delete_time字段为None,则表示未删除.
72- """
73-
74- def __init__ (self , app : "AdminApp" ):
75- super ().__init__ (app )
76- assert hasattr (self .model , "delete_time" ), "SoftDeleteModelAdmin需要在模型中定义delete_time字段"
77-
78- async def get_select (self , request : Request ):
79- sel = await super ().get_select (request )
80- return sel .where (self .model .delete_time == None ) # noqa E711
81-
82- def delete_item (self , obj : SchemaModelT ) -> None :
83- obj .delete_time = datetime .now ()
84-
85-
86- class FootableModelAdmin (admin .ModelAdmin ):
87- """为模型管理Amis表格添加底部展示(Footable)属性"""
88-
89- async def get_list_table (self , request : Request ) -> TableCRUD :
90- table = await super ().get_list_table (request )
91- table .footable = True
92- return table
93-
94-
95- class BaseAuthFieldModelAdmin (admin .ModelAdmin ):
96- """字段级别权限控制模型管理.
97- - xxx_permission_fields:
98- 1.动作权限字段,可以通过覆盖这些属性来控制哪些字段需要进行权限验证.
99- 2.未设置的字段,则不进行权限验证.
100- 3.一旦类被实例化,则会缓存这些属性,禁止再次修改.
101- #todo 初步实现,未优化
102- """
103-
104- perm_fields_exclude : Dict [Union [FieldPermEnum , int ], Sequence [str ]] = None
105- """exclude指定的字段,不进行权限验证."""
106-
107- def __init__ (self , app : "AdminApp" ):
108- super ().__init__ (app )
109-
110- def get_permission_fields (self , action : str ) -> Dict [str , str ]:
111- """获取权限字段"""
112- if not self .perm_fields_exclude :
113- return {}
114- info = {
115- "list" : (self .schema_list , "列表展示-" , FieldPermEnum .LIST ),
116- "filter" : (self .schema_filter , "列表筛选-" , FieldPermEnum .FILTER ),
117- "create" : (self .schema_create , "新增-" , FieldPermEnum .CREATE ),
118- "read" : (self .schema_read , "查看-" , FieldPermEnum .READ ),
119- "update" : (self .schema_update , "更新-" , FieldPermEnum .UPDATE ),
120- }
121- if action not in info :
122- return {}
123- schema , prefix , perm = info [action ]
124- exlude = set ()
125- for k , fileds in self .perm_fields_exclude .items ():
126- if (k & perm ) == perm :
127- exlude .update (set (fileds ))
128- return get_schema_fields_name_label (schema , prefix = prefix , exclude_required = True , exclude = exlude )
129-
130- @cached_property
131- def create_permission_fields (self ) -> Dict [str , str ]:
132- """创建权限字段"""
133- return self .get_permission_fields ("create" )
134-
135- @cached_property
136- def read_permission_fields (self ) -> Dict [str , str ]:
137- """读取权限字段"""
138- return self .get_permission_fields ("read" )
139-
140- @cached_property
141- def update_permission_fields (self ) -> Dict [str , str ]:
142- """更新权限字段"""
143- return self .get_permission_fields ("update" )
144-
145- @cached_property
146- def list_permission_fields (self ) -> Dict [str , str ]:
147- """列表权限字段"""
148- return self .get_permission_fields ("list" )
149-
150- @cached_property
151- def filter_permission_fields (self ) -> Dict [str , str ]:
152- """过滤筛选权限字段"""
153- return self .get_permission_fields ("filter" )
154-
155- async def has_field_permission (self , request : Request , field : str , action : str = "" ) -> bool :
156- """判断用户是否有字段权限"""
157- return True
158-
159- async def get_deny_fields (self , request : Request , action : str = None ) -> Set [str ]:
160- """获取没有权限的字段"""
161- cache_key = f"{ self .unique_id } _exclude_fields"
162- request_cache = request .scope .get (cache_key , {})
163- if action in request_cache :
164- return request_cache [action ]
165- check_fields = {}
166- if action == "list" :
167- check_fields = self .list_permission_fields .keys ()
168- elif action == "filter" :
169- check_fields = self .filter_permission_fields .keys ()
170- elif action == "create" :
171- check_fields = self .create_permission_fields .keys ()
172- elif action == "update" :
173- check_fields = self .update_permission_fields .keys ()
174- elif action == "read" :
175- check_fields = self .read_permission_fields .keys ()
176- else :
177- pass
178- fields = {field for field in check_fields if not await self .has_field_permission (request , field , action )}
179- request_cache [action ] = fields
180- if cache_key not in request .scope :
181- request .scope [cache_key ] = request_cache
182- return fields
183-
184- async def on_list_after (self , request : Request , result : Result , data : ItemListSchema , ** kwargs ) -> ItemListSchema :
185- """Parse the database data query result dictionary into schema_list."""
186- exclude = await self .get_deny_fields (request , "list" ) # 过滤没有权限的字段
187- data = await super ().on_list_after (request , result , data , ** kwargs )
188- data .items = [item .dict (exclude = exclude ) for item in data .items ] # 过滤没有权限的字段
189- return data
190-
191- async def on_filter_pre (self , request : Request , obj : Optional [SchemaFilterT ], ** kwargs ) -> Dict [str , Any ]:
192- data = await super ().on_filter_pre (request , obj , ** kwargs )
193- if not data :
194- return data
195- exclude = await self .get_deny_fields (request , "filter" ) # 过滤没有权限的字段
196- return {k : v for k , v in data .items () if k not in exclude }
197-
198- async def create_items (self , request : Request , items : List [SchemaCreateT ]) -> List [TableModelT ]:
199- """Create multiple data"""
200- exclude = await self .get_deny_fields (request , "create" )
201- items = [item .copy (exclude = exclude ) for item in items ] # 过滤没有权限的字段
202- items = await super ().create_items (request , items )
203- return items
204-
205- async def read_items (self , request : Request , item_id : List [str ]) -> List [SchemaReadT ]:
206- """Read multiple data"""
207- items = await super ().read_items (request , item_id )
208- exclude = await self .get_deny_fields (request , "read" ) # 过滤没有权限的字段
209- return [item .copy (exclude = exclude ) for item in items ]
210-
211- async def on_update_pre (
212- self ,
213- request : Request ,
214- obj : SchemaUpdateT ,
215- item_id : Union [List [str ], List [int ]],
216- ** kwargs ,
217- ) -> Dict [str , Any ]:
218- exclude = await self .get_deny_fields (request , "update" ) # 过滤没有权限的字段
219- obj = obj .copy (exclude = exclude ) # 过滤没有权限的字段
220- data = await super ().on_update_pre (request , obj , item_id , ** kwargs )
221- return data
222-
223- async def get_form_item (
224- self , request : Request , modelfield : ModelField , action : CrudEnum
225- ) -> Union [FormItem , SchemaNode , None ]:
226- """过滤前端创建,更新,筛选表单字段"""
227- # todo 优化筛选和列表动作的界定
228- # action为list时,表示列表展示字段.否则为筛选表单字段
229- act = "filter" if action == "list" else action
230- exclude = await self .get_deny_fields (request , act ) # 获取没有权限的字段
231- name = modelfield .alias or modelfield .name
232- if name in exclude :
233- return None
234- form_item = await super ().get_form_item (request , modelfield , action )
235- return form_item
236-
237- async def get_list_column (self , request : Request , modelfield : ModelField ) -> Optional [TableColumn ]:
238- """过滤前端展示字段"""
239- exclude = await self .get_deny_fields (request , "list" ) # 获取没有权限的字段
240- name = modelfield .alias or modelfield .name
241- if name in exclude :
242- return None
243- column = await super ().get_list_column (request , modelfield )
244- return column
2456
2467
2478class AuthFieldModelAdmin (BaseAuthFieldModelAdmin ):
@@ -253,34 +14,6 @@ async def has_field_permission(self, request: Request, field: str, action: str =
25314 return effect
25415
25516
256- class BaseAuthSelectModelAdmin (admin .ModelAdmin ):
257- """包含选择数据集权限控制的模型管理"""
258-
259- select_permissions : List [SelectPerm ] = []
260- """需要进行权限控制的数据集列表"""
261-
262- async def has_select_permission (self , request : Request , name : str ) -> bool :
263- """判断用户是否有数据集权限"""
264- return True
265-
266- async def get_select (self , request : Request ) -> Select :
267- sel = await super ().get_select (request )
268- return await self .filter_select (request , sel )
269-
270- async def filter_select (self , request : Request , sel : Select ) -> Select :
271- """在sel中添加权限过滤条件"""
272- for permission in self .select_permissions :
273- if not isinstance (permission , SelectPerm ):
274- continue
275- effect = await self .has_select_permission (request , permission .name )
276- # 如果权限为反向权限,则判断用户是否没有权限
277- if permission .reverse ^ effect :
278- sel = permission .call (self , request , sel )
279- if asyncio .iscoroutine (sel ):
280- sel = await sel
281- return sel
282-
283-
28417class AuthSelectModelAdmin (BaseAuthSelectModelAdmin ):
28518 async def has_select_permission (self , request : Request , name : str ) -> bool :
28619 """判断用户是否有数据集权限"""
@@ -294,9 +27,3 @@ async def filter_select(self, request: Request, sel: Select) -> Select:
29427 if subject == SystemUserEnum .ROOT :
29528 return sel
29629 return await super ().filter_select (request , sel )
297-
298-
299- class BaseAuthFieldFormAdmin (admin .FormAdmin ):
300- """#todo 字段级别权限控制表单管理"""
301-
302- pass
0 commit comments