Skip to content

9 testvalidate password recovery flow using resend #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Secret key for JWT
SECRET_KEY=

# Base URL
BASE_URL=http://localhost:8000

# Database
DB_USER=
DB_PASSWORD=
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ jobs:
echo "DB_PORT=5432" >> $GITHUB_ENV
echo "DB_NAME=test_db" >> $GITHUB_ENV
echo "SECRET_KEY=$(openssl rand -base64 32)" >> $GITHUB_ENV
echo "BASE_URL=http://localhost:8000" >> $GITHUB_ENV

- name: Verify environment variables
run: |
Expand Down
175 changes: 140 additions & 35 deletions tests/test_authentication.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import pytest
from fastapi.testclient import TestClient
from starlette.datastructures import URLPath
from sqlmodel import Session, select
from datetime import timedelta
from unittest.mock import patch
import resend
from urllib.parse import urlparse, parse_qs

from main import app
from utils.models import User, PasswordResetToken
Expand All @@ -11,7 +15,8 @@
create_refresh_token,
verify_password,
get_password_hash,
validate_token
validate_token,
generate_password_reset_url
)


Expand Down Expand Up @@ -51,6 +56,24 @@ def test_user_fixture(session: Session):
return user


# Mock email response fixture
@pytest.fixture
def mock_email_response():
"""
Returns a mock Email response object
"""
return resend.Email(id="6229f547-f3f6-4eb8-b0dc-82c1b09121b6")


@pytest.fixture
def mock_resend_send(mock_email_response):
"""
Patches resend.Emails.send to return a mock response
"""
with patch('resend.Emails.send', return_value=mock_email_response) as mock:
yield mock


# --- Authentication Helper Function Tests ---


Expand Down Expand Up @@ -166,40 +189,56 @@ def test_refresh_token_endpoint(client: TestClient, test_user: User):
assert decoded["sub"] == test_user.email


# # TODO: Mock email sending
# def test_password_reset_flow(client: TestClient, session: Session, test_user: User):
# # Test forgot password request
# response = client.post(
# "/auth/forgot_password",
# data={"email": test_user.email},
# follow_redirects=False
# )
# assert response.status_code == 303

# # Verify reset token was created
# reset_token = session.exec(select(PasswordResetToken)
# .where(PasswordResetToken.user_id == test_user.id)).first()
# assert reset_token is not None
# assert not reset_token.used

# # Test password reset
# response = client.post(
# "/auth/reset_password",
# data={
# "email": test_user.email,
# "token": reset_token.token,
# "new_password": "NewPass123!@#",
# "confirm_new_password": "NewPass123!@#"
# },
# follow_redirects=False
# )
# assert response.status_code == 303

# # Verify password was updated and token was marked as used
# session.refresh(test_user)
# session.refresh(reset_token)
# assert verify_password("NewPass123!@#", test_user.hashed_password)
# assert reset_token.used
def test_password_reset_flow(client: TestClient, session: Session, test_user: User, mock_resend_send):
# Test forgot password request
response = client.post(
"/auth/forgot_password",
data={"email": test_user.email},
follow_redirects=False
)
assert response.status_code == 303

# Verify the email was "sent" with correct parameters
mock_resend_send.assert_called_once()
call_args = mock_resend_send.call_args[0][0] # Get the SendParams argument

# Verify SendParams structure and required fields
assert isinstance(call_args, dict)
assert isinstance(call_args["from"], str)
assert isinstance(call_args["to"], list)
assert isinstance(call_args["subject"], str)
assert isinstance(call_args["html"], str)

# Verify content
assert call_args["to"] == [test_user.email]
assert call_args["from"] == "[email protected]"
assert "Password Reset Request" in call_args["subject"]
assert "reset_password" in call_args["html"]

# Verify reset token was created
reset_token = session.exec(select(PasswordResetToken)
.where(PasswordResetToken.user_id == test_user.id)).first()
assert reset_token is not None
assert not reset_token.used

# Test password reset
response = client.post(
"/auth/reset_password",
data={
"email": test_user.email,
"token": reset_token.token,
"new_password": "NewPass123!@#",
"confirm_new_password": "NewPass123!@#"
},
follow_redirects=False
)
assert response.status_code == 303

# Verify password was updated and token was marked as used
session.refresh(test_user)
session.refresh(reset_token)
assert verify_password("NewPass123!@#", test_user.hashed_password)
assert reset_token.used


def test_logout_endpoint(client: TestClient):
Expand Down Expand Up @@ -256,3 +295,69 @@ def test_password_reset_with_invalid_token(client: TestClient, test_user: User):
}
)
assert response.status_code == 400


def test_password_reset_url_generation(client: TestClient):
"""
Tests that the password reset URL is correctly formatted and contains
the required query parameters.
"""
test_email = "[email protected]"
test_token = "abc123"

url = generate_password_reset_url(test_email, test_token)

# Parse the URL
parsed = urlparse(url)
query_params = parse_qs(parsed.query)

# Get the actual path from the FastAPI app
reset_password_path: URLPath = app.url_path_for("reset_password")

# Verify URL path
assert parsed.path == str(reset_password_path)

# Verify query parameters
assert "email" in query_params
assert "token" in query_params
assert query_params["email"][0] == test_email
assert query_params["token"][0] == test_token


def test_password_reset_email_url(client: TestClient, session: Session, test_user: User, mock_resend_send):
"""
Tests that the password reset email contains a properly formatted reset URL.
"""
response = client.post(
"/auth/forgot_password",
data={"email": test_user.email},
follow_redirects=False
)
assert response.status_code == 303

# Get the reset token from the database
reset_token = session.exec(select(PasswordResetToken)
.where(PasswordResetToken.user_id == test_user.id)).first()
assert reset_token is not None

# Get the actual path from the FastAPI app
reset_password_path: URLPath = app.url_path_for("reset_password")

# Verify the email HTML contains the correct URL
mock_resend_send.assert_called_once()
call_args = mock_resend_send.call_args[0][0]
html_content = call_args["html"]

# Extract URL from HTML
import re
url_match = re.search(r'href=[\'"]([^\'"]*)[\'"]', html_content)
assert url_match is not None
reset_url = url_match.group(1)

# Parse and verify the URL
parsed = urlparse(reset_url)
query_params = parse_qs(parsed.query)

assert parsed.path == str(reset_password_path)
assert query_params["email"][0] == test_user.email
assert query_params["token"][0] == reset_token.token
19 changes: 17 additions & 2 deletions utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,21 @@ def __init__(self, user: User, access_token: str, refresh_token: str):
self.refresh_token = refresh_token


def generate_password_reset_url(email: str, token: str) -> str:
"""
Generates the password reset URL with proper query parameters.

Args:
email: User's email address
token: Password reset token

Returns:
Complete password reset URL
"""
base_url = os.getenv('BASE_URL')
return f"{base_url}/auth/reset_password?email={email}&token={token}"


def send_reset_email(email: str, session: Session):
# Check for an existing unexpired token
user = session.exec(select(User).where(User.email == email)).first()
Expand All @@ -281,12 +296,12 @@ def send_reset_email(email: str, session: Session):
session.add(reset_token)

try:
# TODO: Use a templating engine
reset_url = generate_password_reset_url(email, token)
params: resend.Emails.SendParams = {
"from": "[email protected]",
"to": [email],
"subject": "Password Reset Request",
"html": f"<p>Click <a href='{os.getenv('BASE_URL')}/reset_password?email={email}&token={token}'>here</a> to reset your password.</p>",
"html": f"<p>Click <a href='{reset_url}'>here</a> to reset your password.</p>",
}

sent_email: resend.Email = resend.Emails.send(params)
Expand Down