4
4
from logging import getLogger
5
5
from fastapi import APIRouter , Depends , Form
6
6
from fastapi .responses import RedirectResponse
7
- from pydantic import BaseModel , ConfigDict
8
7
from sqlmodel import Session , select , col
9
8
from sqlalchemy .orm import selectinload
10
9
from utils .db import get_session
16
15
17
16
router = APIRouter (prefix = "/roles" , tags = ["roles" ])
18
17
19
- # --- Server Request Models ---
20
-
21
- class RoleCreate (BaseModel ):
22
- model_config = ConfigDict (from_attributes = True )
23
-
24
- name : str
25
- organization_id : int
26
- permissions : List [ValidPermissions ]
27
-
28
- @classmethod
29
- async def as_form (
30
- cls ,
31
- name : str = Form (...),
32
- organization_id : int = Form (...),
33
- permissions : List [ValidPermissions ] = Form (...)
34
- ):
35
- # Pass session to validator context
36
- return cls (
37
- name = name ,
38
- organization_id = organization_id ,
39
- permissions = permissions
40
- )
41
-
42
-
43
- class RoleUpdate (BaseModel ):
44
- model_config = ConfigDict (from_attributes = True )
45
-
46
- id : int
47
- name : str
48
- organization_id : int
49
- permissions : List [ValidPermissions ]
50
-
51
- @classmethod
52
- async def as_form (
53
- cls ,
54
- id : int = Form (...),
55
- name : str = Form (...),
56
- organization_id : int = Form (...),
57
- permissions : List [ValidPermissions ] = Form (...)
58
- ):
59
- return cls (
60
- id = id ,
61
- name = name ,
62
- organization_id = organization_id ,
63
- permissions = permissions
64
- )
65
-
66
-
67
- class RoleDelete (BaseModel ):
68
- model_config = ConfigDict (from_attributes = True )
69
-
70
- id : int
71
- organization_id : int
72
-
73
- @classmethod
74
- async def as_form (
75
- cls ,
76
- id : int = Form (...),
77
- organization_id : int = Form (...)
78
- ):
79
- return cls (id = id , organization_id = organization_id )
80
-
81
-
82
18
# --- Routes ---
83
19
84
-
85
20
@router .post ("/create" , response_class = RedirectResponse )
86
21
def create_role (
87
- role : RoleCreate = Depends (RoleCreate .as_form ),
22
+ name : str = Form (...),
23
+ organization_id : int = Form (...),
88
24
user : User = Depends (get_authenticated_user ),
89
25
session : Session = Depends (get_session )
90
26
) -> RedirectResponse :
91
27
# Check that the user-selected role name is unique for the organization
92
28
if session .exec (
93
29
select (Role ).where (
94
- Role .name == role . name ,
95
- Role .organization_id == role . organization_id
30
+ Role .name == name ,
31
+ Role .organization_id == organization_id
96
32
)
97
33
).first ():
98
34
raise RoleAlreadyExistsError ()
99
35
100
36
# Check that the user is authorized to create roles in the organization
101
- if not user .has_permission (ValidPermissions .CREATE_ROLE , role . organization_id ):
37
+ if not user .has_permission (ValidPermissions .CREATE_ROLE , organization_id ):
102
38
raise InsufficientPermissionsError ()
103
39
104
40
# Create role
105
41
db_role = Role (
106
- name = role . name ,
107
- organization_id = role . organization_id
42
+ name = name ,
43
+ organization_id = organization_id
108
44
)
109
45
session .add (db_role )
110
46
111
47
# Select Permission records corresponding to the user-selected permissions
112
48
# and associate them with the newly created role
113
49
permissions : Sequence [Permission ] = session .exec (
114
- select (Permission ).where (col (Permission .name ).in_ (role . permissions ))
50
+ select (Permission ).where (col (Permission .name ).in_ (permissions ))
115
51
).all ()
116
52
db_role .permissions .extend (permissions )
117
53
@@ -123,30 +59,33 @@ def create_role(
123
59
124
60
@router .post ("/update" , response_class = RedirectResponse )
125
61
def update_role (
126
- role : RoleUpdate = Depends (RoleUpdate .as_form ),
62
+ id : int = Form (...),
63
+ name : str = Form (...),
64
+ organization_id : int = Form (...),
65
+ permissions : List [ValidPermissions ] = Form (...),
127
66
user : User = Depends (get_authenticated_user ),
128
67
session : Session = Depends (get_session )
129
68
) -> RedirectResponse :
130
69
# Check that the user is authorized to update the role
131
- if not user .has_permission (ValidPermissions .EDIT_ROLE , role . organization_id ):
70
+ if not user .has_permission (ValidPermissions .EDIT_ROLE , organization_id ):
132
71
raise InsufficientPermissionsError ()
133
72
134
73
# Select db_role to update, along with its permissions, by ID
135
74
db_role : Optional [Role ] = session .exec (
136
- select (Role ).where (Role .id == role . id ).options (
75
+ select (Role ).where (Role .id == id ).options (
137
76
selectinload (Role .permissions ))
138
77
).first ()
139
78
140
79
if not db_role :
141
80
raise RoleNotFoundError ()
142
81
143
82
# If any user-selected permissions are not valid, raise an error
144
- for permission in role . permissions :
83
+ for permission in permissions :
145
84
if permission not in ValidPermissions :
146
85
raise InvalidPermissionError (permission )
147
86
148
87
# Add any user-selected permissions that are not already associated with the role
149
- for permission in role . permissions :
88
+ for permission in permissions :
150
89
if permission not in [p .name for p in db_role .permissions ]:
151
90
db_permission : Optional [Permission ] = session .exec (
152
91
select (Permission ).where (Permission .name == permission )
@@ -158,21 +97,21 @@ def update_role(
158
97
159
98
# Remove any permissions that are not user-selected
160
99
for db_permission in db_role .permissions :
161
- if db_permission .name not in role . permissions :
100
+ if db_permission .name not in permissions :
162
101
db_role .permissions .remove (db_permission )
163
102
164
103
# Check that no existing organization role has the same name but a different ID
165
104
if session .exec (
166
105
select (Role ).where (
167
- Role .name == role . name ,
168
- Role .organization_id == role . organization_id ,
169
- Role .id != role . id
106
+ Role .name == name ,
107
+ Role .organization_id == organization_id ,
108
+ Role .id != id
170
109
)
171
110
).first ():
172
111
raise RoleAlreadyExistsError ()
173
112
174
113
# Update role name and updated_at timestamp
175
- db_role .name = role . name
114
+ db_role .name = name
176
115
db_role .updated_at = utc_time ()
177
116
178
117
session .commit ()
@@ -182,17 +121,18 @@ def update_role(
182
121
183
122
@router .post ("/delete" , response_class = RedirectResponse )
184
123
def delete_role (
185
- role : RoleDelete = Depends (RoleDelete .as_form ),
124
+ id : int = Form (...),
125
+ organization_id : int = Form (...),
186
126
user : User = Depends (get_authenticated_user ),
187
127
session : Session = Depends (get_session )
188
128
) -> RedirectResponse :
189
129
# Check that the user is authorized to delete the role
190
- if not user .has_permission (ValidPermissions .DELETE_ROLE , role . organization_id ):
130
+ if not user .has_permission (ValidPermissions .DELETE_ROLE , organization_id ):
191
131
raise InsufficientPermissionsError ()
192
132
193
133
# Select the role to delete by ID, along with its users
194
134
db_role : Role | None = session .exec (
195
- select (Role ).where (Role .id == role . id ).options (
135
+ select (Role ).where (Role .id == id ).options (
196
136
selectinload (Role .users )
197
137
)
198
138
).first ()
0 commit comments