1
- from typing import List
2
- from datetime import datetime
1
+ # TODO: User with permission to create/edit roles can only assign permissions
2
+ # they themselves have.
3
+ from typing import List , Sequence , Optional
3
4
from logging import getLogger
4
5
from fastapi import APIRouter , Depends , Form , HTTPException
5
6
from fastapi .responses import RedirectResponse
6
7
from pydantic import BaseModel , ConfigDict , field_validator
7
- from sqlmodel import Session , select
8
+ from sqlmodel import Session , select , col
9
+ from sqlalchemy .orm import selectinload
8
10
from utils .db import get_session
9
- from utils .auth import get_authenticated_user
10
- from utils .models import Role , RolePermissionLink , ValidPermissions , utc_time , User
11
+ from utils .auth import get_authenticated_user , InsufficientPermissionsError
12
+ from utils .models import Role , Permission , ValidPermissions , utc_time , User , DataIntegrityError
11
13
12
14
logger = getLogger ("uvicorn.error" )
13
15
17
19
# -- Custom Exceptions --
18
20
19
21
22
+ class InvalidPermissionError (HTTPException ):
23
+ """Raised when a user attempts to assign an invalid permission to a role"""
24
+
25
+ def __init__ (self , permission : ValidPermissions ):
26
+ super ().__init__ (
27
+ status_code = 400 ,
28
+ detail = f"Invalid permission: { permission } "
29
+ )
30
+
31
+
20
32
class RoleAlreadyExistsError (HTTPException ):
21
33
"""Raised when attempting to create a role with a name that already exists"""
22
34
@@ -31,53 +43,46 @@ def __init__(self):
31
43
super ().__init__ (status_code = 404 , detail = "Role not found" )
32
44
33
45
46
+ class RoleHasUsersError (HTTPException ):
47
+ """Raised when a requested role to be deleted has users"""
48
+
49
+ def __init__ (self ):
50
+ super ().__init__ (
51
+ status_code = 400 ,
52
+ detail = "Role cannot be deleted until users with that role are reassigned"
53
+ )
54
+
55
+
34
56
# -- Server Request Models --
35
57
36
58
class RoleCreate (BaseModel ):
37
59
model_config = ConfigDict (from_attributes = True )
38
60
39
61
name : str
62
+ organization_id : int
40
63
permissions : List [ValidPermissions ]
41
64
42
- @field_validator ("name" )
43
- @classmethod
44
- def validate_unique_name (cls , name : str , info ):
45
- # Note: This requires passing session as a dependency to as_form
46
- session = info .context .get ("session" )
47
- if session and session .exec (select (Role ).where (Role .name == name )).first ():
48
- raise RoleAlreadyExistsError ()
49
- return name
50
-
51
65
@classmethod
52
66
async def as_form (
53
67
cls ,
54
68
name : str = Form (...),
55
- permissions : List [ ValidPermissions ] = Form (...),
56
- session : Session = Depends ( get_session )
69
+ organization_id : int = Form (...),
70
+ permissions : List [ ValidPermissions ] = Form (... )
57
71
):
58
72
# Pass session to validator context
59
73
return cls (
60
74
name = name ,
61
- permissions = permissions ,
62
- context = { "session" : session }
75
+ organization_id = organization_id ,
76
+ permissions = permissions
63
77
)
64
78
65
79
66
- class RoleRead (BaseModel ):
67
- model_config = ConfigDict (from_attributes = True )
68
-
69
- id : int
70
- name : str
71
- created_at : datetime
72
- updated_at : datetime
73
- permissions : List [ValidPermissions ]
74
-
75
-
76
80
class RoleUpdate (BaseModel ):
77
81
model_config = ConfigDict (from_attributes = True )
78
82
79
83
id : int
80
84
name : str
85
+ organization_id : int
81
86
permissions : List [ValidPermissions ]
82
87
83
88
@field_validator ("id" )
@@ -95,17 +100,32 @@ async def as_form(
95
100
cls ,
96
101
id : int = Form (...),
97
102
name : str = Form (...),
98
- permissions : List [ ValidPermissions ] = Form (...),
99
- session : Session = Depends ( get_session )
103
+ organization_id : int = Form (...),
104
+ permissions : List [ ValidPermissions ] = Form (... )
100
105
):
101
106
return cls (
102
107
id = id ,
103
108
name = name ,
104
- permissions = permissions ,
105
- context = { "session" : session }
109
+ organization_id = organization_id ,
110
+ permissions = permissions
106
111
)
107
112
108
113
114
+ class RoleDelete (BaseModel ):
115
+ model_config = ConfigDict (from_attributes = True )
116
+
117
+ id : int
118
+ organization_id : int
119
+
120
+ @classmethod
121
+ async def as_form (
122
+ cls ,
123
+ id : int = Form (...),
124
+ organization_id : int = Form (...)
125
+ ):
126
+ return cls (id = id , organization_id = organization_id )
127
+
128
+
109
129
# -- Routes --
110
130
111
131
@@ -115,19 +135,34 @@ def create_role(
115
135
user : User = Depends (get_authenticated_user ),
116
136
session : Session = Depends (get_session )
117
137
) -> RedirectResponse :
118
- # Create role and permissions in a single transaction
138
+ # Check that the user-selected role name is unique for the organization
139
+ if session .exec (
140
+ select (Role ).where (
141
+ Role .name == role .name ,
142
+ Role .organization_id == role .organization_id
143
+ )
144
+ ).first ():
145
+ raise RoleAlreadyExistsError ()
146
+
147
+ # Check that the user is authorized to create roles in the organization
148
+ if not user .has_permission (ValidPermissions .CREATE_ROLE , role .organization_id ):
149
+ raise InsufficientPermissionsError ()
150
+
151
+ # Create role
119
152
db_role = Role (
120
153
name = role .name ,
121
- organization_id = user .organization_id # Add organization ID to role
154
+ organization_id = role .organization_id
122
155
)
156
+ session .add (db_role )
123
157
124
- # Create RolePermissionLink objects and associate them with the role
125
- db_role .permissions = [
126
- RolePermissionLink (permission_id = permission .name )
127
- for permission in role .permissions
128
- ]
158
+ # Select Permission records corresponding to the user-selected permissions
159
+ # and associate them with the newly created role
160
+ permissions : Sequence [Permission ] = session .exec (
161
+ select (Permission ).where (col (Permission .name ).in_ (role .permissions ))
162
+ ).all ()
163
+ db_role .permissions .extend (permissions )
129
164
130
- session . add ( db_role )
165
+ # Commit transaction
131
166
session .commit ()
132
167
133
168
return RedirectResponse (url = "/profile" , status_code = 303 )
@@ -139,39 +174,85 @@ def update_role(
139
174
user : User = Depends (get_authenticated_user ),
140
175
session : Session = Depends (get_session )
141
176
) -> RedirectResponse :
142
- db_role : Role | None = session .get (Role , role .id )
143
- role_data = role .model_dump (exclude_unset = True )
144
- for key , value in role_data .items ():
145
- setattr (db_role , key , value )
146
- db_role .updated_at = utc_time ()
147
- session .add (db_role )
148
- session .commit ()
177
+ # Check that the user is authorized to update the role
178
+ if not user .has_permission (ValidPermissions .EDIT_ROLE , role .organization_id ):
179
+ raise InsufficientPermissionsError ()
180
+
181
+ # Select db_role to update, along with its permissions, by ID
182
+ db_role : Optional [Role ] = session .exec (
183
+ select (Role ).where (Role .id == role .id ).options (
184
+ selectinload (Role .permissions ))
185
+ ).first ()
186
+
187
+ if not db_role :
188
+ raise RoleNotFoundError ()
149
189
150
- # Correctly delete RolePermissionLinks for the role
151
- session .delete (RolePermissionLink .role_id == role .id )
190
+ # If any user-selected permissions are not valid, raise an error
191
+ for permission in role .permissions :
192
+ if permission not in ValidPermissions :
193
+ raise InvalidPermissionError (permission )
152
194
195
+ # Add any user-selected permissions that are not already associated with the role
153
196
for permission in role .permissions :
154
- db_role_permission_link = RolePermissionLink (
155
- role_id = db_role .id ,
156
- permission_id = permission .name
197
+ if permission not in [p .name for p in db_role .permissions ]:
198
+ db_permission : Optional [Permission ] = session .exec (
199
+ select (Permission ).where (Permission .name == permission )
200
+ ).first ()
201
+ if db_permission :
202
+ db_role .permissions .append (db_permission )
203
+ else :
204
+ raise DataIntegrityError (resource = f"Permission: { permission } " )
205
+
206
+ # Remove any permissions that are not user-selected
207
+ for db_permission in db_role .permissions :
208
+ if db_permission .name not in role .permissions :
209
+ db_role .permissions .remove (db_permission )
210
+
211
+ # Check that no existing organization role has the same name but a different ID
212
+ if session .exec (
213
+ select (Role ).where (
214
+ Role .name == role .name ,
215
+ Role .organization_id == role .organization_id ,
216
+ Role .id != role .id
157
217
)
158
- session .add (db_role_permission_link )
218
+ ).first ():
219
+ raise RoleAlreadyExistsError ()
220
+
221
+ # Update role name and updated_at timestamp
222
+ db_role .name = role .name
223
+ db_role .updated_at = utc_time ()
159
224
160
225
session .commit ()
161
226
session .refresh (db_role )
162
227
return RedirectResponse (url = "/profile" , status_code = 303 )
163
228
164
229
165
- # TODO: Reject role deletion if anyone in the organization has that role
166
230
@router .delete ("/{role_id}" , response_class = RedirectResponse )
167
231
def delete_role (
168
- role_id : int ,
232
+ role : RoleDelete = Depends ( RoleDelete . as_form ) ,
169
233
user : User = Depends (get_authenticated_user ),
170
234
session : Session = Depends (get_session )
171
235
) -> RedirectResponse :
172
- db_role = session .get (Role , role_id )
236
+ # Check that the user is authorized to delete the role
237
+ if not user .has_permission (ValidPermissions .DELETE_ROLE , role .organization_id ):
238
+ raise InsufficientPermissionsError ()
239
+
240
+ # Select the role to delete by ID, along with its users
241
+ db_role : Role | None = session .exec (
242
+ select (Role ).where (Role .id == role .id ).options (
243
+ selectinload (Role .users )
244
+ )
245
+ ).first ()
246
+
173
247
if not db_role :
174
- raise HTTPException (status_code = 404 , detail = "Role not found" )
248
+ raise RoleNotFoundError ()
249
+
250
+ # Check that no users have the role
251
+ if db_role .users :
252
+ raise RoleHasUsersError ()
253
+
254
+ # Delete the role
175
255
session .delete (db_role )
176
256
session .commit ()
257
+
177
258
return RedirectResponse (url = "/profile" , status_code = 303 )
0 commit comments