1
1
from typing import List
2
2
from datetime import datetime
3
3
from logging import getLogger
4
- from fastapi import APIRouter , Depends , HTTPException , Form
4
+ from fastapi import APIRouter , Depends , Form , HTTPException
5
5
from fastapi .responses import RedirectResponse
6
- from pydantic import BaseModel , ConfigDict
6
+ from pydantic import BaseModel , ConfigDict , field_validator
7
7
from sqlmodel import Session , select
8
8
from utils .db import get_session
9
9
from utils .models import Role , RolePermissionLink , ValidPermissions , utc_time
13
13
router = APIRouter (prefix = "/roles" , tags = ["roles" ])
14
14
15
15
16
+ # -- Custom Exceptions --
17
+
18
+ class RoleAlreadyExistsError (HTTPException ):
19
+ """Raised when attempting to create a role with a name that already exists"""
20
+
21
+ def __init__ (self ):
22
+ super ().__init__ (status_code = 400 , detail = "Role already exists" )
23
+
24
+
25
+ class RoleNotFoundError (HTTPException ):
26
+ """Raised when a requested role does not exist or is deleted"""
27
+
28
+ def __init__ (self ):
29
+ super ().__init__ (status_code = 404 , detail = "Role not found" )
30
+
31
+
32
+ # -- Server Request Models --
33
+
16
34
class RoleCreate (BaseModel ):
17
35
model_config = ConfigDict (from_attributes = True )
18
36
19
37
name : str
20
38
permissions : List [ValidPermissions ]
21
39
40
+ @field_validator ("name" )
41
+ @classmethod
42
+ def validate_unique_name (cls , name : str , info ):
43
+ # Note: This requires passing session as a dependency to as_form
44
+ session = info .context .get ("session" )
45
+ if session and session .exec (select (Role ).where (Role .name == name )).first ():
46
+ raise RoleAlreadyExistsError ()
47
+ return name
48
+
22
49
@classmethod
23
- async def as_form (cls , name : str = Form (...), permissions : List [ValidPermissions ] = Form (...)):
24
- return cls (name = name , permissions = permissions )
50
+ async def as_form (
51
+ cls ,
52
+ name : str = Form (...),
53
+ permissions : List [ValidPermissions ] = Form (...),
54
+ session : Session = Depends (get_session )
55
+ ):
56
+ # Pass session to validator context
57
+ return cls (
58
+ name = name ,
59
+ permissions = permissions ,
60
+ context = {"session" : session }
61
+ )
25
62
26
63
27
64
class RoleRead (BaseModel ):
@@ -42,20 +79,39 @@ class RoleUpdate(BaseModel):
42
79
name : str
43
80
permissions : List [ValidPermissions ]
44
81
82
+ @field_validator ("id" )
83
+ @classmethod
84
+ def validate_role_exists (cls , id : int , info ):
85
+ session = info .context .get ("session" )
86
+ if session :
87
+ role = session .get (Role , id )
88
+ if not role or not role .id or role .deleted :
89
+ raise RoleNotFoundError ()
90
+ return id
91
+
45
92
@classmethod
46
- async def as_form (cls , id : int = Form (...), name : str = Form (...), permissions : List [ValidPermissions ] = Form (...)):
47
- return cls (id = id , name = name , permissions = permissions )
93
+ async def as_form (
94
+ cls ,
95
+ id : int = Form (...),
96
+ name : str = Form (...),
97
+ permissions : List [ValidPermissions ] = Form (...),
98
+ session : Session = Depends (get_session )
99
+ ):
100
+ return cls (
101
+ id = id ,
102
+ name = name ,
103
+ permissions = permissions ,
104
+ context = {"session" : session }
105
+ )
106
+
48
107
108
+ # -- Routes --
49
109
50
110
@router .post ("/" , response_class = RedirectResponse )
51
111
def create_role (
52
112
role : RoleCreate = Depends (RoleCreate .as_form ),
53
113
session : Session = Depends (get_session )
54
114
) -> RedirectResponse :
55
- db_role = session .exec (select (Role ).where (Role .name == role .name )).first ()
56
- if db_role :
57
- raise HTTPException (status_code = 400 , detail = "Role already exists" )
58
-
59
115
# Create role and permissions in a single transaction
60
116
db_role = Role (name = role .name )
61
117
@@ -66,7 +122,7 @@ def create_role(
66
122
]
67
123
68
124
session .add (db_role )
69
- session .commit () # Commit once after all operations
125
+ session .commit ()
70
126
71
127
return RedirectResponse (url = "/roles" , status_code = 303 )
72
128
@@ -75,7 +131,7 @@ def create_role(
75
131
def read_role (role_id : int , session : Session = Depends (get_session )):
76
132
db_role : Role | None = session .get (Role , role_id )
77
133
if not db_role or not db_role .id or db_role .deleted :
78
- raise HTTPException ( status_code = 404 , detail = "Role not found" )
134
+ raise RoleNotFoundError ( )
79
135
80
136
permissions = [
81
137
ValidPermissions (link .permission .name )
@@ -99,8 +155,7 @@ def update_role(
99
155
session : Session = Depends (get_session )
100
156
) -> RedirectResponse :
101
157
db_role : Role | None = session .get (Role , role .id )
102
- if not db_role or not db_role .id or db_role .deleted :
103
- raise HTTPException (status_code = 404 , detail = "Role not found" )
158
+
104
159
role_data = role .model_dump (exclude_unset = True )
105
160
for key , value in role_data .items ():
106
161
setattr (db_role , key , value )
@@ -130,7 +185,8 @@ def delete_role(
130
185
) -> RedirectResponse :
131
186
db_role = session .get (Role , role_id )
132
187
if not db_role :
133
- raise HTTPException (status_code = 404 , detail = "Role not found" )
188
+ raise RoleNotFoundError ()
189
+
134
190
db_role .deleted = True
135
191
db_role .updated_at = utc_time ()
136
192
session .add (db_role )
0 commit comments