|
1 | 1 | import pytest
|
2 | 2 | from fastapi.testclient import TestClient
|
| 3 | +from starlette.datastructures import URLPath |
3 | 4 | from sqlmodel import Session, select
|
4 | 5 | from datetime import timedelta
|
| 6 | +from unittest.mock import patch |
| 7 | +import resend |
| 8 | +from urllib.parse import urlparse, parse_qs |
5 | 9 |
|
6 | 10 | from main import app
|
7 | 11 | from utils.models import User, PasswordResetToken
|
|
11 | 15 | create_refresh_token,
|
12 | 16 | verify_password,
|
13 | 17 | get_password_hash,
|
14 |
| - validate_token |
| 18 | + validate_token, |
| 19 | + generate_password_reset_url |
15 | 20 | )
|
16 | 21 |
|
17 | 22 |
|
@@ -51,6 +56,24 @@ def test_user_fixture(session: Session):
|
51 | 56 | return user
|
52 | 57 |
|
53 | 58 |
|
| 59 | +# Mock email response fixture |
| 60 | +@pytest.fixture |
| 61 | +def mock_email_response(): |
| 62 | + """ |
| 63 | + Returns a mock Email response object |
| 64 | + """ |
| 65 | + return resend.Email(id="6229f547-f3f6-4eb8-b0dc-82c1b09121b6") |
| 66 | + |
| 67 | + |
| 68 | +@pytest.fixture |
| 69 | +def mock_resend_send(mock_email_response): |
| 70 | + """ |
| 71 | + Patches resend.Emails.send to return a mock response |
| 72 | + """ |
| 73 | + with patch('resend.Emails.send', return_value=mock_email_response) as mock: |
| 74 | + yield mock |
| 75 | + |
| 76 | + |
54 | 77 | # --- Authentication Helper Function Tests ---
|
55 | 78 |
|
56 | 79 |
|
@@ -166,40 +189,56 @@ def test_refresh_token_endpoint(client: TestClient, test_user: User):
|
166 | 189 | assert decoded["sub"] == test_user.email
|
167 | 190 |
|
168 | 191 |
|
169 |
| -# # TODO: Mock email sending |
170 |
| -# def test_password_reset_flow(client: TestClient, session: Session, test_user: User): |
171 |
| -# # Test forgot password request |
172 |
| -# response = client.post( |
173 |
| -# "/auth/forgot_password", |
174 |
| -# data={"email": test_user.email}, |
175 |
| -# follow_redirects=False |
176 |
| -# ) |
177 |
| -# assert response.status_code == 303 |
178 |
| - |
179 |
| -# # Verify reset token was created |
180 |
| -# reset_token = session.exec(select(PasswordResetToken) |
181 |
| -# .where(PasswordResetToken.user_id == test_user.id)).first() |
182 |
| -# assert reset_token is not None |
183 |
| -# assert not reset_token.used |
184 |
| - |
185 |
| -# # Test password reset |
186 |
| -# response = client.post( |
187 |
| -# "/auth/reset_password", |
188 |
| -# data={ |
189 |
| -# "email": test_user.email, |
190 |
| -# "token": reset_token.token, |
191 |
| -# "new_password": "NewPass123!@#", |
192 |
| -# "confirm_new_password": "NewPass123!@#" |
193 |
| -# }, |
194 |
| -# follow_redirects=False |
195 |
| -# ) |
196 |
| -# assert response.status_code == 303 |
197 |
| - |
198 |
| -# # Verify password was updated and token was marked as used |
199 |
| -# session.refresh(test_user) |
200 |
| -# session.refresh(reset_token) |
201 |
| -# assert verify_password("NewPass123!@#", test_user.hashed_password) |
202 |
| -# assert reset_token.used |
| 192 | +def test_password_reset_flow(client: TestClient, session: Session, test_user: User, mock_resend_send): |
| 193 | + # Test forgot password request |
| 194 | + response = client.post( |
| 195 | + "/auth/forgot_password", |
| 196 | + data={"email": test_user.email}, |
| 197 | + follow_redirects=False |
| 198 | + ) |
| 199 | + assert response.status_code == 303 |
| 200 | + |
| 201 | + # Verify the email was "sent" with correct parameters |
| 202 | + mock_resend_send.assert_called_once() |
| 203 | + call_args = mock_resend_send.call_args[0][0] # Get the SendParams argument |
| 204 | + |
| 205 | + # Verify SendParams structure and required fields |
| 206 | + assert isinstance(call_args, dict) |
| 207 | + assert isinstance(call_args["from"], str) |
| 208 | + assert isinstance(call_args["to"], list) |
| 209 | + assert isinstance(call_args["subject"], str) |
| 210 | + assert isinstance(call_args["html"], str) |
| 211 | + |
| 212 | + # Verify content |
| 213 | + assert call_args["to"] == [test_user.email] |
| 214 | + assert call_args[ "from"] == "[email protected]" |
| 215 | + assert "Password Reset Request" in call_args["subject"] |
| 216 | + assert "reset_password" in call_args["html"] |
| 217 | + |
| 218 | + # Verify reset token was created |
| 219 | + reset_token = session.exec(select(PasswordResetToken) |
| 220 | + .where(PasswordResetToken.user_id == test_user.id)).first() |
| 221 | + assert reset_token is not None |
| 222 | + assert not reset_token.used |
| 223 | + |
| 224 | + # Test password reset |
| 225 | + response = client.post( |
| 226 | + "/auth/reset_password", |
| 227 | + data={ |
| 228 | + "email": test_user.email, |
| 229 | + "token": reset_token.token, |
| 230 | + "new_password": "NewPass123!@#", |
| 231 | + "confirm_new_password": "NewPass123!@#" |
| 232 | + }, |
| 233 | + follow_redirects=False |
| 234 | + ) |
| 235 | + assert response.status_code == 303 |
| 236 | + |
| 237 | + # Verify password was updated and token was marked as used |
| 238 | + session.refresh(test_user) |
| 239 | + session.refresh(reset_token) |
| 240 | + assert verify_password("NewPass123!@#", test_user.hashed_password) |
| 241 | + assert reset_token.used |
203 | 242 |
|
204 | 243 |
|
205 | 244 | def test_logout_endpoint(client: TestClient):
|
@@ -256,3 +295,69 @@ def test_password_reset_with_invalid_token(client: TestClient, test_user: User):
|
256 | 295 | }
|
257 | 296 | )
|
258 | 297 | assert response.status_code == 400
|
| 298 | + |
| 299 | + |
| 300 | +def test_password_reset_url_generation(client: TestClient): |
| 301 | + """ |
| 302 | + Tests that the password reset URL is correctly formatted and contains |
| 303 | + the required query parameters. |
| 304 | + """ |
| 305 | + |
| 306 | + test_token = "abc123" |
| 307 | + |
| 308 | + url = generate_password_reset_url(test_email, test_token) |
| 309 | + |
| 310 | + # Parse the URL |
| 311 | + parsed = urlparse(url) |
| 312 | + query_params = parse_qs(parsed.query) |
| 313 | + |
| 314 | + # Get the actual path from the FastAPI app |
| 315 | + reset_password_path: URLPath = app.url_path_for("reset_password") |
| 316 | + |
| 317 | + # Verify URL path |
| 318 | + assert parsed.path == str(reset_password_path) |
| 319 | + |
| 320 | + # Verify query parameters |
| 321 | + assert "email" in query_params |
| 322 | + assert "token" in query_params |
| 323 | + assert query_params["email"][0] == test_email |
| 324 | + assert query_params["token"][0] == test_token |
| 325 | + |
| 326 | + |
| 327 | +def test_password_reset_email_url(client: TestClient, session: Session, test_user: User, mock_resend_send): |
| 328 | + """ |
| 329 | + Tests that the password reset email contains a properly formatted reset URL. |
| 330 | + """ |
| 331 | + response = client.post( |
| 332 | + "/auth/forgot_password", |
| 333 | + data={"email": test_user.email}, |
| 334 | + follow_redirects=False |
| 335 | + ) |
| 336 | + assert response.status_code == 303 |
| 337 | + |
| 338 | + # Get the reset token from the database |
| 339 | + reset_token = session.exec(select(PasswordResetToken) |
| 340 | + .where(PasswordResetToken.user_id == test_user.id)).first() |
| 341 | + assert reset_token is not None |
| 342 | + |
| 343 | + # Get the actual path from the FastAPI app |
| 344 | + reset_password_path: URLPath = app.url_path_for("reset_password") |
| 345 | + |
| 346 | + # Verify the email HTML contains the correct URL |
| 347 | + mock_resend_send.assert_called_once() |
| 348 | + call_args = mock_resend_send.call_args[0][0] |
| 349 | + html_content = call_args["html"] |
| 350 | + |
| 351 | + # Extract URL from HTML |
| 352 | + import re |
| 353 | + url_match = re.search(r'href=[\'"]([^\'"]*)[\'"]', html_content) |
| 354 | + assert url_match is not None |
| 355 | + reset_url = url_match.group(1) |
| 356 | + |
| 357 | + # Parse and verify the URL |
| 358 | + parsed = urlparse(reset_url) |
| 359 | + query_params = parse_qs(parsed.query) |
| 360 | + |
| 361 | + assert parsed.path == str(reset_password_path) |
| 362 | + assert query_params["email"][0] == test_user.email |
| 363 | + assert query_params["token"][0] == reset_token.token |
0 commit comments