1-
21import json
32from typing import List , Optional
43
1110from sqlbot_xpack .permissions .models .ds_rules import DsRules
1211from sqlalchemy .dialects .postgresql import JSONB
1312
14- def get_row_permission_filters (session : SessionDep , current_user : CurrentUser , ds : CoreDatasource , tables : Optional [list ] = None , single_table : Optional [CoreTable ] = None ):
13+
14+ def get_row_permission_filters (session : SessionDep , current_user : CurrentUser , ds : CoreDatasource ,
15+ tables : Optional [list ] = None , single_table : Optional [CoreTable ] = None ):
1516 if single_table :
1617 table_list = [session .get (CoreTable , single_table .id )]
1718 else :
18- table_list = session .query (CoreTable ).filter (
19+ table_list = session .query (CoreTable ).filter (
1920 and_ (CoreTable .ds_id == ds .id , CoreTable .table_name .in_ (tables ))
2021 ).all ()
2122
2223 filters = []
23- for table in table_list :
24- row_permissions = session .query (DsPermission ).filter (
25- and_ (DsPermission .table_id == table .id , DsPermission .type == 'row' )).all ()
26- res : List [PermissionDTO ] = []
27- if row_permissions is not None :
28- for permission in row_permissions :
29- # check permission and user in same rules
30- obj = session .query (DsRules ).filter (
31- and_ (DsRules .permission_list .op ('@>' )(cast ([permission .id ], JSONB )),
32- or_ (DsRules .user_list .op ('@>' )(cast ([f'{ current_user .id } ' ], JSONB )),
33- DsRules .user_list .op ('@>' )(cast ([current_user .id ], JSONB ))))
34- ).first ()
35- if obj is not None :
36- res .append (transRecord2DTO (session , permission ))
37- where_str = transFilterTree (session , res , ds )
38- filters .append ({"table" : table .table_name , "filter" : where_str })
24+ if is_normal_user (current_user ):
25+ for table in table_list :
26+ row_permissions = session .query (DsPermission ).filter (
27+ and_ (DsPermission .table_id == table .id , DsPermission .type == 'row' )).all ()
28+ contain_rules = session .query (DsRules ).all ()
29+ res : List [PermissionDTO ] = []
30+ if row_permissions is not None :
31+ for permission in row_permissions :
32+ # check permission and user in same rules
33+ # obj = session.query(DsRules).filter(
34+ # and_(DsRules.permission_list.op('@>')(cast([permission.id], JSONB)),
35+ # or_(DsRules.user_list.op('@>')(cast([f'{current_user.id}'], JSONB)),
36+ # DsRules.user_list.op('@>')(cast([current_user.id], JSONB))))
37+ # ).first()
38+ flag = False
39+ for r in contain_rules :
40+ p_list = json .loads (r .permission_list )
41+ u_list = json .loads (r .user_list )
42+ if p_list is not None and u_list is not None and permission .id in p_list and (
43+ current_user .id in u_list or f'{ current_user .id } ' in u_list ):
44+ flag = True
45+ if flag :
46+ res .append (transRecord2DTO (session , permission ))
47+ where_str = transFilterTree (session , res , ds )
48+ filters .append ({"table" : table .table_name , "filter" : where_str })
3949 return filters
4050
41- def get_column_permission_fields (session : SessionDep , current_user : CurrentUser , table : CoreTable , fields : list [CoreField ]):
51+
52+ def get_column_permission_fields (session : SessionDep , current_user : CurrentUser , table : CoreTable ,
53+ fields : list [CoreField ]):
4254 if is_normal_user (current_user ):
4355 column_permissions = session .query (DsPermission ).filter (
4456 and_ (DsPermission .table_id == table .id , DsPermission .type == 'column' )).all ()
57+ contain_rules = session .query (DsRules ).all ()
4558 if column_permissions is not None :
4659 for permission in column_permissions :
4760 # check permission and user in same rules
48- obj = session .query (DsRules ).filter (
49- and_ (DsRules .permission_list .op ('@>' )(cast ([permission .id ], JSONB )),
50- or_ (DsRules .user_list .op ('@>' )(cast ([f'{ current_user .id } ' ], JSONB )),
51- DsRules .user_list .op ('@>' )(cast ([current_user .id ], JSONB ))))
52- ).first ()
53- if obj is not None :
61+ # obj = session.query(DsRules).filter(
62+ # and_(DsRules.permission_list.op('@>')(cast([permission.id], JSONB)),
63+ # or_(DsRules.user_list.op('@>')(cast([f'{current_user.id}'], JSONB)),
64+ # DsRules.user_list.op('@>')(cast([current_user.id], JSONB))))
65+ # ).first()
66+ flag = False
67+ for r in contain_rules :
68+ p_list = json .loads (r .permission_list )
69+ u_list = json .loads (r .user_list )
70+ if p_list is not None and u_list is not None and permission .id in p_list and (
71+ current_user .id in u_list or f'{ current_user .id } ' in u_list ):
72+ flag = True
73+ if flag :
5474 permission_list = json .loads (permission .permissions )
5575 fields = filter_list (fields , permission_list )
5676 return fields
77+
78+
5779def is_normal_user (current_user : CurrentUser ):
5880 return current_user .id != 1
5981
82+
6083def filter_list (list_a , list_b ):
6184 id_to_invalid = {}
6285 for b in list_b :
6386 if not b ['enable' ]:
6487 id_to_invalid [b ['field_id' ]] = True
6588
66- return [a for a in list_a if not id_to_invalid .get (a .id , False )]
89+ return [a for a in list_a if not id_to_invalid .get (a .id , False )]
0 commit comments