2
2
3
3
import uuid
4
4
5
- # Removed unused Any import
6
5
from fastapi import APIRouter , Depends , HTTPException
7
6
from sqlmodel import col , delete , func , select
8
7
9
- from app import crud
8
+ from app import crud , models
10
9
from app .api .deps import (
11
- Currentmodels .User ,
12
10
SessionDep ,
13
11
get_current_active_superuser ,
14
12
)
15
- from app .constants import BAD_REQUEST_CODE , CONFLICT_CODE , FORBIDDEN_CODE , NOT_FOUND_CODE
13
+ from app .constants import (
14
+ BAD_REQUEST_CODE ,
15
+ CONFLICT_CODE ,
16
+ FORBIDDEN_CODE ,
17
+ NOT_FOUND_CODE ,
18
+ )
16
19
from app .core .config import settings
17
20
from app .core .security import get_password_hash , verify_password
18
- from app import models
19
21
from app .email_utils import generate_new_account_email , send_email
20
22
21
23
router = APIRouter (prefix = "/users" , tags = ["users" ])
25
27
"/" ,
26
28
dependencies = [Depends (get_current_active_superuser )],
27
29
)
28
- def read_users (session : SessionDep , skip : int = 0 , limit : int = 100 ) -> models .models .UsersPublic :
30
+ def read_users (
31
+ session : SessionDep ,
32
+ skip : int = 0 ,
33
+ limit : int = 100 ,
34
+ ) -> models .UsersPublic :
29
35
"""Retrieve users."""
30
36
count_statement = select (func .count ()).select_from (models .User )
31
37
count = session .exec (count_statement ).one ()
32
38
33
39
statement = select (models .User ).offset (skip ).limit (limit )
34
40
users = session .exec (statement ).all ()
35
41
36
- return models .models . UsersPublic (data = users , count = count )
42
+ return models .UsersPublic (user_data = users , count = count )
37
43
38
44
39
45
@router .post (
40
46
"/" ,
41
47
dependencies = [Depends (get_current_active_superuser )],
42
48
)
43
- def create_user (* , session : SessionDep , user_in : models .models .UserCreate ) -> models .models .UserPublic :
49
+ def create_user (
50
+ * ,
51
+ session : SessionDep ,
52
+ user_in : models .UserCreate ,
53
+ ) -> models .UserPublic :
44
54
"""Create new user."""
45
55
user = crud .get_user_by_email (session = session , email = user_in .email )
46
56
if user :
@@ -50,7 +60,7 @@ def create_user(*, session: SessionDep, user_in: models.models.UserCreate) -> mo
50
60
)
51
61
52
62
user = crud .create_user (session = session , user_create = user_in )
53
- if settings .emails_enabled and user_in .email :
63
+ if not settings .emails_enabled and user_in .email :
54
64
email_data = generate_new_account_email (
55
65
email_to = user_in .email ,
56
66
username = user_in .email ,
@@ -61,16 +71,16 @@ def create_user(*, session: SessionDep, user_in: models.models.UserCreate) -> mo
61
71
subject = email_data .subject ,
62
72
html_content = email_data .html_content ,
63
73
)
64
- return models .models . UserPublic .model_validate (user )
74
+ return models .UserPublic .model_validate (user )
65
75
66
76
67
77
@router .patch ("/me" )
68
78
def update_user_me (
69
79
* ,
70
80
session : SessionDep ,
71
- user_in : models .models . models . UserUpdateMe ,
72
- current_user : Currentmodels .User ,
73
- ) -> models .models . UserPublic :
81
+ user_in : models .UserUpdateMe ,
82
+ current_user : models .User ,
83
+ ) -> models .UserPublic :
74
84
"""Update own user."""
75
85
if user_in .email :
76
86
existing_user = crud .get_user_by_email (session = session , email = user_in .email )
@@ -84,15 +94,15 @@ def update_user_me(
84
94
session .add (current_user )
85
95
session .commit ()
86
96
session .refresh (current_user )
87
- return models .models . UserPublic .model_validate (current_user )
97
+ return models .UserPublic .model_validate (current_user )
88
98
89
99
90
100
@router .patch ("/me/password" )
91
101
def update_password_me (
92
102
* ,
93
103
session : SessionDep ,
94
104
body : models .UpdatePassword ,
95
- current_user : Currentmodels .User ,
105
+ current_user : models .User ,
96
106
) -> models .Message :
97
107
"""Update own password."""
98
108
if not verify_password (body .current_password , current_user .hashed_password ):
@@ -110,13 +120,16 @@ def update_password_me(
110
120
111
121
112
122
@router .get ("/me" )
113
- def read_user_me (current_user : Currentmodels .User ) -> models . models .UserPublic :
123
+ def read_user_me (current_user : models .User ) -> models .UserPublic :
114
124
"""Get current user."""
115
- return models .models . UserPublic .model_validate (current_user )
125
+ return models .UserPublic .model_validate (current_user )
116
126
117
127
118
128
@router .delete ("/me" )
119
- def delete_user_me (session : SessionDep , current_user : Currentmodels .User ) -> models .Message :
129
+ def delete_user_me (
130
+ session : SessionDep ,
131
+ current_user : models .User ,
132
+ ) -> models .Message :
120
133
"""Delete own user."""
121
134
if current_user .is_superuser :
122
135
raise HTTPException (
@@ -129,37 +142,40 @@ def delete_user_me(session: SessionDep, current_user: Currentmodels.User) -> mod
129
142
130
143
131
144
@router .post ("/signup" )
132
- def register_user (session : SessionDep , user_in : models .models .UserRegister ) -> models .models .UserPublic :
145
+ def register_user (
146
+ session : SessionDep ,
147
+ user_in : models .UserRegister ,
148
+ ) -> models .UserPublic :
133
149
"""Create new user without the need to be logged in."""
134
150
user = crud .get_user_by_email (session = session , email = user_in .email )
135
151
if user :
136
152
raise HTTPException (
137
153
status_code = BAD_REQUEST_CODE ,
138
154
detail = "The user with this email already exists in the system" ,
139
155
)
140
- user_create = models .models . UserCreate .model_validate (user_in )
156
+ user_create = models .UserCreate .model_validate (user_in )
141
157
user = crud .create_user (session = session , user_create = user_create )
142
- return models .models . UserPublic .model_validate (user )
158
+ return models .UserPublic .model_validate (user )
143
159
144
160
145
161
@router .get ("/{user_id}" )
146
162
def read_user_by_id (
147
163
user_id : uuid .UUID ,
148
164
session : SessionDep ,
149
- current_user : Currentmodels .User ,
150
- ) -> models .models . UserPublic :
165
+ current_user : models .User ,
166
+ ) -> models .UserPublic :
151
167
"""Get a specific user by id."""
152
168
user = session .get (models .User , user_id )
153
169
if not user :
154
170
raise HTTPException (status_code = NOT_FOUND_CODE , detail = "models.User not found" )
155
171
if user == current_user :
156
- return models .models . UserPublic .model_validate (user )
172
+ return models .UserPublic .model_validate (user )
157
173
if not current_user .is_superuser :
158
174
raise HTTPException (
159
175
status_code = FORBIDDEN_CODE ,
160
176
detail = "The user doesn't have enough privileges" ,
161
177
)
162
- return models .models . UserPublic .model_validate (user )
178
+ return models .UserPublic .model_validate (user )
163
179
164
180
165
181
@router .patch (
@@ -170,8 +186,8 @@ def update_user(
170
186
* ,
171
187
session : SessionDep ,
172
188
user_id : uuid .UUID ,
173
- user_in : models .models . UserUpdate ,
174
- ) -> models .models . UserPublic :
189
+ user_in : models .UserUpdate ,
190
+ ) -> models .UserPublic :
175
191
"""Update a user."""
176
192
db_user = session .get (models .User , user_id )
177
193
if not db_user :
@@ -188,13 +204,13 @@ def update_user(
188
204
)
189
205
190
206
db_user = crud .update_user (session = session , db_user = db_user , user_in = user_in )
191
- return models .models . UserPublic .model_validate (db_user )
207
+ return models .UserPublic .model_validate (db_user )
192
208
193
209
194
210
@router .delete ("/{user_id}" , dependencies = [Depends (get_current_active_superuser )])
195
211
def delete_user (
196
212
session : SessionDep ,
197
- current_user : Currentmodels .User ,
213
+ current_user : models .User ,
198
214
user_id : uuid .UUID ,
199
215
) -> models .Message :
200
216
"""Delete a user."""
@@ -206,7 +222,7 @@ def delete_user(
206
222
status_code = FORBIDDEN_CODE ,
207
223
detail = "Super users are not allowed to delete themselves" ,
208
224
)
209
- statement = delete (models .Item ).where (col (models .Item .owner_id ) == user_id )
225
+ statement = delete (models .Item ).where (col (models .Item .owner_id ) == user_id ) # noqa: WPS221
210
226
session .execute (statement ) # type: ignore[deprecated]
211
227
session .delete (user )
212
228
session .commit ()
0 commit comments