Skip to content

Commit c390f51

Browse files
First pass at implementing email update confirmation workflow
1 parent b1aa87a commit c390f51

File tree

6 files changed

+208
-10
lines changed

6 files changed

+208
-10
lines changed

main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ async def lifespan(app: FastAPI):
3333
# Optional shutdown logic
3434

3535

36-
app = FastAPI(lifespan=lifespan)
36+
app: FastAPI = FastAPI(lifespan=lifespan)
3737

3838
# Mount static files (e.g., CSS, JS)
3939
app.mount("/static", StaticFiles(directory="static"), name="static")

routers/authentication.py

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from sqlmodel import Session, select
99
from utils.models import User, UserPassword
1010
from utils.auth import (
11+
AuthenticationError,
1112
get_session,
1213
get_user_from_reset_token,
1314
create_password_validator,
@@ -18,7 +19,10 @@
1819
create_access_token,
1920
create_refresh_token,
2021
validate_token,
21-
send_reset_email
22+
send_reset_email,
23+
send_email_update_confirmation,
24+
get_user_from_email_update_token,
25+
get_authenticated_user
2226
)
2327

2428
logger = getLogger("uvicorn.error")
@@ -102,6 +106,17 @@ async def as_form(
102106
new_password=new_password, confirm_new_password=confirm_new_password)
103107

104108

109+
class UpdateEmail(BaseModel):
110+
new_email: EmailStr
111+
112+
@classmethod
113+
async def as_form(
114+
cls,
115+
new_email: EmailStr = Form(...)
116+
):
117+
return cls(new_email=new_email)
118+
119+
105120
# --- DB Request and Response Models ---
106121

107122

@@ -289,3 +304,72 @@ def logout():
289304
response.delete_cookie("access_token")
290305
response.delete_cookie("refresh_token")
291306
return response
307+
308+
309+
@router.post("/update_email")
310+
async def request_email_update(
311+
update: UpdateEmail = Depends(UpdateEmail.as_form),
312+
user: User = Depends(get_authenticated_user),
313+
session: Session = Depends(get_session)
314+
):
315+
# Check if the new email is already registered
316+
existing_user = session.exec(
317+
select(User).where(User.email == update.new_email)
318+
).first()
319+
320+
if existing_user:
321+
raise HTTPException(
322+
status_code=400,
323+
detail="This email is already registered"
324+
)
325+
326+
if not user or not user.id:
327+
raise HTTPException(
328+
status_code=400,
329+
detail="User not found"
330+
)
331+
332+
# Send confirmation email
333+
send_email_update_confirmation(
334+
current_email=user.email,
335+
new_email=update.new_email,
336+
user_id=user.id,
337+
session=session
338+
)
339+
340+
return RedirectResponse(
341+
url="/settings?email_update_requested=true",
342+
status_code=303
343+
)
344+
345+
346+
@router.get("/confirm_email_update")
347+
async def confirm_email_update(
348+
user_id: int,
349+
token: str,
350+
new_email: str,
351+
session: Session = Depends(get_session)
352+
):
353+
user, update_token = get_user_from_email_update_token(
354+
user_id, token, session
355+
)
356+
357+
if not user or not update_token:
358+
raise AuthenticationError()
359+
360+
# Get the new email from the most recent unconfirmed token
361+
if update_token.is_expired():
362+
raise HTTPException(
363+
status_code=400,
364+
detail="Token has expired"
365+
)
366+
367+
# Update email and mark token as used
368+
user.email = new_email
369+
update_token.used = True
370+
session.commit()
371+
372+
return RedirectResponse(
373+
url="/settings?email_updated=true",
374+
status_code=303
375+
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{% extends "emails/base_email.html" %}
2+
3+
{% block email_title %}Email Update Request{% endblock %}
4+
5+
{% block email_content %}
6+
<h1>Confirm Your New Email Address</h1>
7+
<p>You have requested to change your email address from {{ current_email }} to {{ new_email }}.</p>
8+
<p>Click the link below to confirm this change:</p>
9+
<p><a href="{{ confirmation_url }}">Confirm Email Update</a></p>
10+
<p>If you did not request this change, please ignore this email.</p>
11+
<p>This link will expire in 1 hour.</p>
12+
{% endblock %}

tests/test_auth.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
)
1818

1919

20-
def test_convert_python_regex_to_html():
20+
def test_convert_python_regex_to_html() -> None:
2121
PYTHON_SPECIAL_CHARS = r"(?=.*[\[\]\\@$!%*?&{}<>.,'#\-_=+\(\):;|~/\^])"
2222
HTML_EQUIVALENT = r"(?=.*[\[\]\\@$!%*?&\{\}\<\>\.\,\\'#\-_=\+\(\):;\|~\/\^])"
2323

@@ -26,14 +26,14 @@ def test_convert_python_regex_to_html():
2626
assert PYTHON_SPECIAL_CHARS == HTML_EQUIVALENT
2727

2828

29-
def test_password_hashing():
29+
def test_password_hashing() -> None:
3030
password = "Test123!@#"
3131
hashed = get_password_hash(password)
3232
assert verify_password(password, hashed)
3333
assert not verify_password("wrong_password", hashed)
3434

3535

36-
def test_token_creation_and_validation():
36+
def test_token_creation_and_validation() -> None:
3737
data = {"sub": "[email protected]"}
3838

3939
# Test access token
@@ -51,21 +51,21 @@ def test_token_creation_and_validation():
5151
assert decoded["type"] == "refresh"
5252

5353

54-
def test_expired_token():
54+
def test_expired_token() -> None:
5555
data = {"sub": "[email protected]"}
5656
expired_delta = timedelta(minutes=-10)
5757
expired_token = create_access_token(data, expired_delta)
5858
decoded = validate_token(expired_token, "access")
5959
assert decoded is None
6060

6161

62-
def test_invalid_token_type():
62+
def test_invalid_token_type() -> None:
6363
data = {"sub": "[email protected]"}
6464
access_token = create_access_token(data)
6565
decoded = validate_token(access_token, "refresh")
6666
assert decoded is None
6767

68-
def test_password_reset_url_generation():
68+
def test_password_reset_url_generation() -> None:
6969
"""
7070
Tests that the password reset URL is correctly formatted and contains
7171
the required query parameters.
@@ -91,7 +91,7 @@ def test_password_reset_url_generation():
9191
assert query_params["email"][0] == test_email
9292
assert query_params["token"][0] == test_token
9393

94-
def test_password_pattern():
94+
def test_password_pattern() -> None:
9595
"""
9696
Tests that the password pattern is correctly defined. to require at least
9797
one uppercase letter, one lowercase letter, one digit, and one special

utils/auth.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from fastapi.templating import Jinja2Templates
1717
from fastapi import Depends, Cookie, HTTPException, status
1818
from utils.db import get_session
19-
from utils.models import User, Role, PasswordResetToken
19+
from utils.models import User, Role, PasswordResetToken, EmailUpdateToken
2020

2121
load_dotenv()
2222
resend.api_key = os.environ["RESEND_API_KEY"]
@@ -434,3 +434,86 @@ def get_user_with_relations(
434434
).one()
435435

436436
return eager_user
437+
438+
439+
def generate_email_update_url(user_id: int, token: str, new_email: str) -> str:
440+
"""
441+
Generates the email update confirmation URL with proper query parameters.
442+
"""
443+
base_url = os.getenv('BASE_URL')
444+
return f"{base_url}/auth/confirm_email_update?user_id={user_id}&token={token}&new_email={new_email}"
445+
446+
447+
def send_email_update_confirmation(
448+
current_email: str,
449+
new_email: str,
450+
user_id: int,
451+
session: Session
452+
) -> None:
453+
# Check for an existing unexpired token
454+
existing_token = session.exec(
455+
select(EmailUpdateToken)
456+
.where(
457+
EmailUpdateToken.user_id == user_id,
458+
EmailUpdateToken.expires_at > datetime.now(UTC),
459+
EmailUpdateToken.used == False
460+
)
461+
).first()
462+
463+
if existing_token:
464+
logger.debug("An unexpired email update token already exists for this user.")
465+
return
466+
467+
# Generate a new token
468+
token = EmailUpdateToken(user_id=user_id)
469+
session.add(token)
470+
471+
try:
472+
confirmation_url = generate_email_update_url(
473+
user_id, token.token, new_email)
474+
475+
# Render the email template
476+
template = templates.get_template("emails/update_email.html")
477+
html_content = template.render({
478+
"confirmation_url": confirmation_url,
479+
"current_email": current_email,
480+
"new_email": new_email
481+
})
482+
483+
params: resend.Emails.SendParams = {
484+
"from": "[email protected]",
485+
"to": [current_email],
486+
"subject": "Confirm Email Update",
487+
"html": html_content,
488+
}
489+
490+
sent_email: resend.Email = resend.Emails.send(params)
491+
logger.debug(f"Email update confirmation sent: {sent_email.get('id')}")
492+
493+
session.commit()
494+
except Exception as e:
495+
logger.error(f"Failed to send email update confirmation: {e}")
496+
session.rollback()
497+
498+
499+
def get_user_from_email_update_token(
500+
user_id: int,
501+
token: str,
502+
session: Session
503+
) -> tuple[Optional[User], Optional[EmailUpdateToken]]:
504+
result = session.exec(
505+
select(User, EmailUpdateToken)
506+
.where(
507+
User.id == user_id,
508+
EmailUpdateToken.token == token,
509+
EmailUpdateToken.expires_at > datetime.now(UTC),
510+
EmailUpdateToken.used == False,
511+
EmailUpdateToken.user_id == User.id
512+
)
513+
).first()
514+
515+
if not result:
516+
return None, None
517+
518+
user, update_token = result
519+
return user, update_token

utils/models.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,25 @@ def is_expired(self) -> bool:
161161
return datetime.now(UTC) > self.expires_at.replace(tzinfo=UTC)
162162

163163

164+
class EmailUpdateToken(SQLModel, table=True):
165+
id: Optional[int] = Field(default=None, primary_key=True)
166+
user_id: Optional[int] = Field(foreign_key="user.id")
167+
token: str = Field(default_factory=lambda: str(
168+
uuid4()), index=True, unique=True)
169+
expires_at: datetime = Field(
170+
default_factory=lambda: datetime.now(UTC) + timedelta(hours=1))
171+
used: bool = Field(default=False)
172+
173+
user: Mapped[Optional["User"]] = Relationship(
174+
back_populates="email_update_tokens")
175+
176+
def is_expired(self) -> bool:
177+
"""
178+
Check if the token has expired
179+
"""
180+
return datetime.now(UTC) > self.expires_at.replace(tzinfo=UTC)
181+
182+
164183
class UserPassword(SQLModel, table=True):
165184
id: Optional[int] = Field(default=None, primary_key=True)
166185
user_id: Optional[int] = Field(foreign_key="user.id", unique=True)

0 commit comments

Comments
 (0)