Skip to content

Commit 88a65a8

Browse files
Tests and custom exception types in authentication router
1 parent 4e0f364 commit 88a65a8

File tree

5 files changed

+200
-32
lines changed

5 files changed

+200
-32
lines changed

routers/authentication.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
from fastapi.responses import RedirectResponse
77
from pydantic import BaseModel, EmailStr, ConfigDict
88
from sqlmodel import Session, select
9-
from utils.models import User, UserPassword
9+
from utils.models import User, UserPassword, DataIntegrityError
1010
from utils.auth import (
11-
AuthenticationError,
1211
get_session,
1312
get_user_from_reset_token,
1413
create_password_validator,
@@ -29,6 +28,40 @@
2928

3029
router = APIRouter(prefix="/auth", tags=["auth"])
3130

31+
# --- Custom Exceptions ---
32+
33+
34+
class EmailAlreadyRegisteredError(HTTPException):
35+
def __init__(self):
36+
super().__init__(
37+
status_code=409,
38+
detail="This email is already registered"
39+
)
40+
41+
42+
class InvalidCredentialsError(HTTPException):
43+
def __init__(self):
44+
super().__init__(
45+
status_code=401,
46+
detail="Invalid credentials"
47+
)
48+
49+
50+
class InvalidResetTokenError(HTTPException):
51+
def __init__(self):
52+
super().__init__(
53+
status_code=401,
54+
detail="Invalid or expired password reset token; please request a new one"
55+
)
56+
57+
58+
class InvalidEmailUpdateTokenError(HTTPException):
59+
def __init__(self):
60+
super().__init__(
61+
status_code=401,
62+
detail="Invalid or expired email update token; please request a new one"
63+
)
64+
3265

3366
# --- Server Request and Response Models ---
3467

@@ -145,7 +178,7 @@ async def register(
145178
User.email == user.email)).first()
146179

147180
if db_user:
148-
raise HTTPException(status_code=400, detail="Email already registered")
181+
raise EmailAlreadyRegisteredError()
149182

150183
# Hash the password
151184
hashed_password = get_password_hash(user.password)
@@ -179,7 +212,7 @@ async def login(
179212
User.email == user.email)).first()
180213

181214
if not db_user or not db_user.password or not verify_password(user.password, db_user.password.hashed_password):
182-
raise HTTPException(status_code=400, detail="Invalid credentials")
215+
raise InvalidCredentialsError()
183216

184217
# Create access token
185218
access_token = create_access_token(
@@ -277,7 +310,7 @@ async def reset_password(
277310
user.email, user.token, session)
278311

279312
if not authorized_user or not reset_token:
280-
raise HTTPException(status_code=400, detail="Invalid or expired token")
313+
raise InvalidResetTokenError()
281314

282315
# Update password and mark token as used
283316
if authorized_user.password:
@@ -318,16 +351,10 @@ async def request_email_update(
318351
).first()
319352

320353
if existing_user:
321-
raise HTTPException(
322-
status_code=400,
323-
detail="This email is already registered"
324-
)
354+
raise EmailAlreadyRegisteredError()
325355

326-
if not user or not user.id:
327-
raise HTTPException(
328-
status_code=400,
329-
detail="User not found"
330-
)
356+
if not user.id:
357+
raise DataIntegrityError(resource="User id")
331358

332359
# Send confirmation email
333360
send_email_update_confirmation(
@@ -355,7 +382,7 @@ async def confirm_email_update(
355382
)
356383

357384
if not user or not update_token:
358-
raise AuthenticationError()
385+
raise InvalidResetTokenError()
359386

360387
# Update email and mark token as used
361388
user.email = new_email

routers/organization.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
logger = getLogger("uvicorn.error")
1212

13+
router = APIRouter(prefix="/organizations", tags=["organizations"])
14+
1315
# --- Custom Exceptions ---
1416

1517

@@ -37,9 +39,6 @@ def __init__(self):
3739
)
3840

3941

40-
router = APIRouter(prefix="/organizations", tags=["organizations"])
41-
42-
4342
# --- Server Request and Response Models ---
4443

4544

tests/test_auth.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,9 @@ def test_send_email_update_confirmation(mock_send: MagicMock) -> None:
215215
# Test existing token case
216216
session.reset_mock()
217217
session.exec.return_value.first.return_value = EmailUpdateToken() # Existing token
218-
218+
219219
send_email_update_confirmation(current_email, new_email, user_id, session)
220-
220+
221221
# Verify no new token was created or email sent
222222
assert not session.add.called
223223
assert not session.commit.called
@@ -228,7 +228,7 @@ def test_get_user_from_email_update_token() -> None:
228228
Tests retrieving a user using an email update token.
229229
"""
230230
session = MagicMock()
231-
231+
232232
# Test valid token
233233
mock_user = User(id=1, email="[email protected]")
234234
mock_token = EmailUpdateToken(

tests/test_authentication.py

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
from html import unescape
1010

1111
from main import app
12-
from utils.models import User, PasswordResetToken
12+
from utils.models import User, PasswordResetToken, UserPassword, EmailUpdateToken
1313
from utils.auth import (
1414
create_access_token,
1515
verify_password,
1616
validate_token,
17+
get_password_hash
1718
)
1819
from .conftest import SetupError
1920

@@ -199,7 +200,7 @@ def test_register_with_existing_email(unauth_client: TestClient, test_user: User
199200
"confirm_password": "Test123!@#"
200201
}
201202
)
202-
assert response.status_code == 400
203+
assert response.status_code == 409
203204

204205

205206
def test_login_with_invalid_credentials(unauth_client: TestClient, test_user: User):
@@ -210,7 +211,7 @@ def test_login_with_invalid_credentials(unauth_client: TestClient, test_user: Us
210211
"password": "WrongPass123!@#"
211212
}
212213
)
213-
assert response.status_code == 400
214+
assert response.status_code == 401
214215

215216

216217
def test_password_reset_with_invalid_token(unauth_client: TestClient, test_user: User):
@@ -223,7 +224,7 @@ def test_password_reset_with_invalid_token(unauth_client: TestClient, test_user:
223224
"confirm_new_password": "NewPass123!@#"
224225
}
225226
)
226-
assert response.status_code == 400
227+
assert response.status_code == 401
227228

228229

229230
def test_password_reset_email_url(unauth_client: TestClient, session: Session, test_user: User, mock_resend_send):
@@ -264,3 +265,144 @@ def test_password_reset_email_url(unauth_client: TestClient, session: Session, t
264265
assert parsed.path == str(reset_password_path)
265266
assert query_params["email"][0] == test_user.email
266267
assert query_params["token"][0] == reset_token.token
268+
269+
270+
def test_request_email_update_success(auth_client: TestClient, test_user: User, mock_resend_send):
271+
"""Test successful email update request"""
272+
new_email = "[email protected]"
273+
274+
response = auth_client.post(
275+
app.url_path_for("request_email_update"),
276+
data={"new_email": new_email},
277+
follow_redirects=False
278+
)
279+
280+
assert response.status_code == 303
281+
assert "profile?email_update_requested=true" in response.headers["location"]
282+
283+
# Verify email was "sent"
284+
mock_resend_send.assert_called_once()
285+
call_args = mock_resend_send.call_args[0][0]
286+
287+
# Verify email content
288+
assert call_args["to"] == [test_user.email]
289+
assert call_args["from"] == "[email protected]"
290+
assert "Confirm Email Update" in call_args["subject"]
291+
assert "confirm_email_update" in call_args["html"]
292+
assert new_email in call_args["html"]
293+
294+
295+
def test_request_email_update_already_registered(auth_client: TestClient, session: Session, test_user: User):
296+
"""Test email update request with already registered email"""
297+
# Create another user with the target email
298+
existing_email = "[email protected]"
299+
existing_user = User(
300+
name="Existing User",
301+
email=existing_email,
302+
password=UserPassword(hashed_password=get_password_hash("Test123!@#"))
303+
)
304+
session.add(existing_user)
305+
session.commit()
306+
307+
response = auth_client.post(
308+
app.url_path_for("request_email_update"),
309+
data={"new_email": existing_email}
310+
)
311+
312+
assert response.status_code == 409
313+
assert "already registered" in response.text
314+
315+
316+
def test_request_email_update_unauthenticated(unauth_client: TestClient):
317+
"""Test email update request without authentication"""
318+
response = unauth_client.post(
319+
app.url_path_for("request_email_update"),
320+
data={"new_email": "[email protected]"},
321+
follow_redirects=False
322+
)
323+
324+
assert response.status_code == 303 # Redirect to login
325+
326+
327+
def test_confirm_email_update_success(unauth_client: TestClient, session: Session, test_user: User):
328+
"""Test successful email update confirmation"""
329+
new_email = "[email protected]"
330+
331+
# Create an email update token
332+
update_token = EmailUpdateToken(user_id=test_user.id)
333+
session.add(update_token)
334+
session.commit()
335+
336+
response = unauth_client.get(
337+
app.url_path_for("confirm_email_update"),
338+
params={
339+
"user_id": test_user.id,
340+
"token": update_token.token,
341+
"new_email": new_email
342+
},
343+
follow_redirects=False
344+
)
345+
346+
assert response.status_code == 303
347+
assert "profile?email_updated=true" in response.headers["location"]
348+
349+
# Verify email was updated
350+
session.refresh(test_user)
351+
assert test_user.email == new_email
352+
353+
# Verify token was marked as used
354+
session.refresh(update_token)
355+
assert update_token.used
356+
357+
# Verify new auth cookies were set
358+
cookies = response.cookies
359+
assert "access_token" in cookies
360+
assert "refresh_token" in cookies
361+
362+
363+
def test_confirm_email_update_invalid_token(unauth_client: TestClient, session: Session, test_user: User):
364+
"""Test email update confirmation with invalid token"""
365+
response = unauth_client.get(
366+
app.url_path_for("confirm_email_update"),
367+
params={
368+
"user_id": test_user.id,
369+
"token": "invalid_token",
370+
"new_email": "[email protected]"
371+
}
372+
)
373+
374+
assert response.status_code == 401
375+
assert "Invalid or expired" in response.text
376+
377+
# Verify email was not updated
378+
session.refresh(test_user)
379+
assert test_user.email == "[email protected]"
380+
381+
382+
def test_confirm_email_update_used_token(unauth_client: TestClient, session: Session, test_user: User):
383+
"""Test email update confirmation with already used token"""
384+
# Create an already used token
385+
used_token = PasswordResetToken(
386+
user_id=test_user.id,
387+
token="test_used_token",
388+
used=True,
389+
token_type="email_update"
390+
)
391+
session.add(used_token)
392+
session.commit()
393+
394+
response = unauth_client.get(
395+
app.url_path_for("confirm_email_update"),
396+
params={
397+
"user_id": test_user.id,
398+
"token": used_token.token,
399+
"new_email": "[email protected]"
400+
}
401+
)
402+
403+
assert response.status_code == 401
404+
assert "Invalid or expired" in response.text
405+
406+
# Verify email was not updated
407+
session.refresh(test_user)
408+
assert test_user.email == "[email protected]"

utils/auth.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ def replacer(match: re.Match) -> str:
8787
# --- Custom Exceptions ---
8888

8989

90+
class NeedsNewTokens(Exception):
91+
def __init__(self, user: User, access_token: str, refresh_token: str):
92+
self.user = user
93+
self.access_token = access_token
94+
self.refresh_token = refresh_token
95+
96+
9097
class AuthenticationError(HTTPException):
9198
def __init__(self):
9299
super().__init__(
@@ -324,13 +331,6 @@ def get_optional_user(
324331
return None
325332

326333

327-
class NeedsNewTokens(Exception):
328-
def __init__(self, user: User, access_token: str, refresh_token: str):
329-
self.user = user
330-
self.access_token = access_token
331-
self.refresh_token = refresh_token
332-
333-
334334
def generate_password_reset_url(email: str, token: str) -> str:
335335
"""
336336
Generates the password reset URL with proper query parameters.

0 commit comments

Comments
 (0)