diff --git a/.env.example b/.env.example index 54a4502..3bcd885 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,9 @@ # Secret key for JWT SECRET_KEY= +# Base URL +BASE_URL=http://localhost:8000 + # Database DB_USER= DB_PASSWORD= diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5b20293..af360f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -53,6 +53,7 @@ jobs: echo "DB_PORT=5432" >> $GITHUB_ENV echo "DB_NAME=test_db" >> $GITHUB_ENV echo "SECRET_KEY=$(openssl rand -base64 32)" >> $GITHUB_ENV + echo "BASE_URL=http://localhost:8000" >> $GITHUB_ENV - name: Verify environment variables run: | diff --git a/tests/test_authentication.py b/tests/test_authentication.py index a87cb60..eed349e 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -1,7 +1,11 @@ import pytest from fastapi.testclient import TestClient +from starlette.datastructures import URLPath from sqlmodel import Session, select from datetime import timedelta +from unittest.mock import patch +import resend +from urllib.parse import urlparse, parse_qs from main import app from utils.models import User, PasswordResetToken @@ -11,7 +15,8 @@ create_refresh_token, verify_password, get_password_hash, - validate_token + validate_token, + generate_password_reset_url ) @@ -51,6 +56,24 @@ def test_user_fixture(session: Session): return user +# Mock email response fixture +@pytest.fixture +def mock_email_response(): + """ + Returns a mock Email response object + """ + return resend.Email(id="6229f547-f3f6-4eb8-b0dc-82c1b09121b6") + + +@pytest.fixture +def mock_resend_send(mock_email_response): + """ + Patches resend.Emails.send to return a mock response + """ + with patch('resend.Emails.send', return_value=mock_email_response) as mock: + yield mock + + # --- Authentication Helper Function Tests --- @@ -166,40 +189,56 @@ def test_refresh_token_endpoint(client: TestClient, test_user: User): assert decoded["sub"] == test_user.email -# # TODO: Mock email sending -# def test_password_reset_flow(client: TestClient, session: Session, test_user: User): -# # Test forgot password request -# response = client.post( -# "/auth/forgot_password", -# data={"email": test_user.email}, -# follow_redirects=False -# ) -# assert response.status_code == 303 - -# # Verify reset token was created -# reset_token = session.exec(select(PasswordResetToken) -# .where(PasswordResetToken.user_id == test_user.id)).first() -# assert reset_token is not None -# assert not reset_token.used - -# # Test password reset -# response = client.post( -# "/auth/reset_password", -# data={ -# "email": test_user.email, -# "token": reset_token.token, -# "new_password": "NewPass123!@#", -# "confirm_new_password": "NewPass123!@#" -# }, -# follow_redirects=False -# ) -# assert response.status_code == 303 - -# # Verify password was updated and token was marked as used -# session.refresh(test_user) -# session.refresh(reset_token) -# assert verify_password("NewPass123!@#", test_user.hashed_password) -# assert reset_token.used +def test_password_reset_flow(client: TestClient, session: Session, test_user: User, mock_resend_send): + # Test forgot password request + response = client.post( + "/auth/forgot_password", + data={"email": test_user.email}, + follow_redirects=False + ) + assert response.status_code == 303 + + # Verify the email was "sent" with correct parameters + mock_resend_send.assert_called_once() + call_args = mock_resend_send.call_args[0][0] # Get the SendParams argument + + # Verify SendParams structure and required fields + assert isinstance(call_args, dict) + assert isinstance(call_args["from"], str) + assert isinstance(call_args["to"], list) + assert isinstance(call_args["subject"], str) + assert isinstance(call_args["html"], str) + + # Verify content + assert call_args["to"] == [test_user.email] + assert call_args["from"] == "noreply@promptlytechnologies.com" + assert "Password Reset Request" in call_args["subject"] + assert "reset_password" in call_args["html"] + + # Verify reset token was created + reset_token = session.exec(select(PasswordResetToken) + .where(PasswordResetToken.user_id == test_user.id)).first() + assert reset_token is not None + assert not reset_token.used + + # Test password reset + response = client.post( + "/auth/reset_password", + data={ + "email": test_user.email, + "token": reset_token.token, + "new_password": "NewPass123!@#", + "confirm_new_password": "NewPass123!@#" + }, + follow_redirects=False + ) + assert response.status_code == 303 + + # Verify password was updated and token was marked as used + session.refresh(test_user) + session.refresh(reset_token) + assert verify_password("NewPass123!@#", test_user.hashed_password) + assert reset_token.used def test_logout_endpoint(client: TestClient): @@ -256,3 +295,69 @@ def test_password_reset_with_invalid_token(client: TestClient, test_user: User): } ) assert response.status_code == 400 + + +def test_password_reset_url_generation(client: TestClient): + """ + Tests that the password reset URL is correctly formatted and contains + the required query parameters. + """ + test_email = "test@example.com" + test_token = "abc123" + + url = generate_password_reset_url(test_email, test_token) + + # Parse the URL + parsed = urlparse(url) + query_params = parse_qs(parsed.query) + + # Get the actual path from the FastAPI app + reset_password_path: URLPath = app.url_path_for("reset_password") + + # Verify URL path + assert parsed.path == str(reset_password_path) + + # Verify query parameters + assert "email" in query_params + assert "token" in query_params + assert query_params["email"][0] == test_email + assert query_params["token"][0] == test_token + + +def test_password_reset_email_url(client: TestClient, session: Session, test_user: User, mock_resend_send): + """ + Tests that the password reset email contains a properly formatted reset URL. + """ + response = client.post( + "/auth/forgot_password", + data={"email": test_user.email}, + follow_redirects=False + ) + assert response.status_code == 303 + + # Get the reset token from the database + reset_token = session.exec(select(PasswordResetToken) + .where(PasswordResetToken.user_id == test_user.id)).first() + assert reset_token is not None + + # Get the actual path from the FastAPI app + reset_password_path: URLPath = app.url_path_for("reset_password") + + # Verify the email HTML contains the correct URL + mock_resend_send.assert_called_once() + call_args = mock_resend_send.call_args[0][0] + html_content = call_args["html"] + + # Extract URL from HTML + import re + url_match = re.search(r'href=[\'"]([^\'"]*)[\'"]', html_content) + assert url_match is not None + reset_url = url_match.group(1) + + # Parse and verify the URL + parsed = urlparse(reset_url) + query_params = parse_qs(parsed.query) + + assert parsed.path == str(reset_password_path) + assert query_params["email"][0] == test_user.email + assert query_params["token"][0] == reset_token.token diff --git a/utils/auth.py b/utils/auth.py index f73c989..3bf7dac 100644 --- a/utils/auth.py +++ b/utils/auth.py @@ -258,6 +258,21 @@ def __init__(self, user: User, access_token: str, refresh_token: str): self.refresh_token = refresh_token +def generate_password_reset_url(email: str, token: str) -> str: + """ + Generates the password reset URL with proper query parameters. + + Args: + email: User's email address + token: Password reset token + + Returns: + Complete password reset URL + """ + base_url = os.getenv('BASE_URL') + return f"{base_url}/auth/reset_password?email={email}&token={token}" + + def send_reset_email(email: str, session: Session): # Check for an existing unexpired token user = session.exec(select(User).where(User.email == email)).first() @@ -281,12 +296,12 @@ def send_reset_email(email: str, session: Session): session.add(reset_token) try: - # TODO: Use a templating engine + reset_url = generate_password_reset_url(email, token) params: resend.Emails.SendParams = { "from": "noreply@promptlytechnologies.com", "to": [email], "subject": "Password Reset Request", - "html": f"

Click here to reset your password.

", + "html": f"

Click here to reset your password.

", } sent_email: resend.Email = resend.Emails.send(params)