Skip to content

Commit cb78b3f

Browse files
Passing tests for email invite flow
1 parent 918fb51 commit cb78b3f

File tree

3 files changed

+364
-28
lines changed

3 files changed

+364
-28
lines changed

routers/invitation.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
InvitationEmailMismatchError,
2222
)
2323
from exceptions.exceptions import EmailSendFailedError
24+
# Import the account router to generate URLs for login/register
25+
from routers.account import router as account_router
26+
from routers.organization import router as org_router # Already imported, check usage
2427

2528
# Setup logger
2629
logger = getLogger("uvicorn.error")
@@ -160,7 +163,7 @@ async def accept_invitation(
160163
process_invitation(invitation, current_user, session)
161164
session.commit()
162165
# Redirect to the organization page
163-
redirect_url = router.url_path_for("read_organization", org_id=invitation.organization_id)
166+
redirect_url = org_router.url_path_for("read_organization", org_id=invitation.organization_id)
164167
return RedirectResponse(url=str(redirect_url), status_code=status.HTTP_303_SEE_OTHER)
165168
except Exception as e:
166169
logger.error(
@@ -176,15 +179,15 @@ async def accept_invitation(
176179
logger.info(
177180
f"Invitation {invitation.id} requires login for {invitation.invitee_email}. Redirecting."
178181
)
179-
login_url = router.url_path_for("read_login") # Assuming 'read_login' is the name of the GET /login route
182+
login_url = account_router.url_path_for("read_login")
180183
redirect_url_with_token = f"{login_url}?invitation_token={invitation.token}"
181184
return RedirectResponse(url=redirect_url_with_token, status_code=status.HTTP_303_SEE_OTHER)
182185
else:
183186
# Account does not exist - redirect to registration
184187
logger.info(
185188
f"Invitation {invitation.id} requires registration for {invitation.invitee_email}. Redirecting."
186189
)
187-
register_url = router.url_path_for("read_register") # Assuming 'read_register' is the name of the GET /register route
190+
register_url = account_router.url_path_for("read_register")
188191
redirect_url_with_params = (
189192
f"{register_url}?email={invitation.invitee_email}&invitation_token={invitation.token}"
190193
)
Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
import pytest
2+
from fastapi.testclient import TestClient
3+
from sqlmodel import Session, select
4+
from sqlalchemy.orm import joinedload
5+
from urllib.parse import urlparse, parse_qs
6+
7+
from main import app
8+
from utils.models import User, Account, Invitation
9+
10+
# --- Test Scenarios ---
11+
12+
# 1. Success: New User Registration Flow
13+
def test_accept_invitation_new_user_get_redirects_to_register(
14+
unauth_client: TestClient,
15+
test_invitation: Invitation
16+
):
17+
"""GET /accept with valid token for non-existent account redirects to register."""
18+
response = unauth_client.get(
19+
app.url_path_for("accept_invitation"),
20+
params={"token": test_invitation.token},
21+
follow_redirects=False
22+
)
23+
assert response.status_code == 303
24+
redirect_location = response.headers["location"]
25+
parsed_url = urlparse(redirect_location)
26+
query_params = parse_qs(parsed_url.query)
27+
28+
assert parsed_url.path == app.url_path_for("read_register")
29+
assert query_params.get("invitation_token") == [test_invitation.token]
30+
assert query_params.get("email") == [test_invitation.invitee_email]
31+
32+
def test_accept_invitation_new_user_post_registers_and_accepts(
33+
unauth_client: TestClient,
34+
session: Session,
35+
test_invitation: Invitation,
36+
test_organization: Invitation # Fetch org for redirect check
37+
):
38+
"""POST /register with valid token creates user, accepts invite, redirects to org."""
39+
register_data = {
40+
"name": "New Invitee",
41+
"email": test_invitation.invitee_email, # Must match invitation
42+
"password": "NewInvitee123!@#",
43+
"confirm_password": "NewInvitee123!@#",
44+
"invitation_token": test_invitation.token
45+
}
46+
response = unauth_client.post(
47+
app.url_path_for("register"),
48+
data=register_data,
49+
follow_redirects=False
50+
)
51+
52+
assert response.status_code == 303
53+
# Check redirect URL matches the organization page
54+
expected_redirect_url = app.url_path_for("read_organization", org_id=test_organization.id)
55+
assert response.headers["location"] == expected_redirect_url
56+
57+
# Check cookies are set
58+
assert "access_token" in response.cookies
59+
assert "refresh_token" in response.cookies
60+
61+
# Verify database state
62+
# 1. Account created
63+
account = session.exec(select(Account).where(Account.email == test_invitation.invitee_email)).first()
64+
assert account is not None
65+
66+
# 2. User created and linked
67+
user: User = session.exec(
68+
select(User).where(User.account_id == account.id).options(joinedload(User.roles))
69+
).unique().one()
70+
assert user is not None
71+
assert user.name == "New Invitee"
72+
73+
# 3. User added to the correct Role
74+
assert any(role.id == test_invitation.role_id for role in user.roles)
75+
76+
# 4. Invitation marked as used
77+
session.refresh(test_invitation)
78+
assert test_invitation.used is True
79+
assert test_invitation.accepted_by_user_id == user.id
80+
assert test_invitation.accepted_at is not None
81+
82+
# 2. Success: Existing User Login Flow
83+
def test_accept_invitation_existing_user_logged_out_get_redirects_to_login(
84+
unauth_client: TestClient,
85+
test_invitation: Invitation,
86+
existing_invitee_account: Account # Ensure account exists
87+
):
88+
"""GET /accept with valid token for existing account redirects to login."""
89+
response = unauth_client.get(
90+
app.url_path_for("accept_invitation"),
91+
params={"token": test_invitation.token},
92+
follow_redirects=False
93+
)
94+
assert response.status_code == 303
95+
redirect_location = response.headers["location"]
96+
parsed_url = urlparse(redirect_location)
97+
query_params = parse_qs(parsed_url.query)
98+
99+
assert parsed_url.path == app.url_path_for("read_login")
100+
assert query_params.get("invitation_token") == [test_invitation.token]
101+
102+
def test_accept_invitation_existing_user_post_logs_in_and_accepts(
103+
unauth_client: TestClient,
104+
session: Session,
105+
test_invitation: Invitation,
106+
existing_invitee_account: Account,
107+
existing_invitee_user: User, # To verify role assignment
108+
test_organization: Invitation # Fetch org for redirect check
109+
):
110+
"""POST /login with valid token logs in user, accepts invite, redirects to org."""
111+
login_data = {
112+
"email": existing_invitee_account.email,
113+
"password": "Invitee123!@#", # Password from fixture
114+
"invitation_token": test_invitation.token
115+
}
116+
response = unauth_client.post(
117+
app.url_path_for("login"),
118+
data=login_data,
119+
follow_redirects=False
120+
)
121+
122+
assert response.status_code == 303
123+
# Check redirect URL matches the organization page
124+
expected_redirect_url = app.url_path_for("read_organization", org_id=test_organization.id)
125+
assert response.headers["location"] == expected_redirect_url
126+
127+
# Check cookies are set
128+
assert "access_token" in response.cookies
129+
assert "refresh_token" in response.cookies
130+
131+
# Verify database state
132+
# 1. User added to the correct Role (load roles eagerly)
133+
session.refresh(existing_invitee_user, attribute_names=['roles'])
134+
assert any(role.id == test_invitation.role_id for role in existing_invitee_user.roles)
135+
136+
# 2. Invitation marked as used
137+
session.refresh(test_invitation)
138+
assert test_invitation.used is True
139+
assert test_invitation.accepted_by_user_id == existing_invitee_user.id
140+
assert test_invitation.accepted_at is not None
141+
142+
# 3. Success: Logged-in Correct User Flow
143+
def test_accept_invitation_logged_in_correct_user_get_accepts_and_redirects(
144+
auth_client_invitee: TestClient,
145+
session: Session,
146+
test_invitation: Invitation,
147+
existing_invitee_user: User,
148+
test_organization: Invitation # Fetch org for redirect check
149+
):
150+
"""GET /accept with valid token when logged in as correct user accepts directly."""
151+
response = auth_client_invitee.get(
152+
app.url_path_for("accept_invitation"),
153+
params={"token": test_invitation.token},
154+
follow_redirects=False
155+
)
156+
157+
assert response.status_code == 303
158+
# Check redirect URL matches the organization page
159+
expected_redirect_url = app.url_path_for("read_organization", org_id=test_organization.id)
160+
assert response.headers["location"] == expected_redirect_url
161+
162+
# Verify database state
163+
# 1. User added to the correct Role (load roles eagerly)
164+
session.refresh(existing_invitee_user, attribute_names=['roles'])
165+
assert any(role.id == test_invitation.role_id for role in existing_invitee_user.roles)
166+
167+
# 2. Invitation marked as used
168+
session.refresh(test_invitation)
169+
assert test_invitation.used is True
170+
assert test_invitation.accepted_by_user_id == existing_invitee_user.id
171+
assert test_invitation.accepted_at is not None
172+
173+
# 4. Failure: Invalid/Expired/Used Token
174+
@pytest.mark.parametrize("token_type", [
175+
"invalid",
176+
"expired",
177+
"used",
178+
])
179+
def test_accept_invitation_get_invalid_token_fails(
180+
unauth_client: TestClient,
181+
token_type: str,
182+
request # Required by getfixturevalue
183+
):
184+
"""GET /accept with invalid, expired, or used token fails with 404."""
185+
token_value = "invalid-token-string"
186+
if token_type == "expired":
187+
expired_invite: Invitation = request.getfixturevalue("expired_invitation")
188+
token_value = expired_invite.token
189+
elif token_type == "used":
190+
used_invite: Invitation = request.getfixturevalue("used_invitation")
191+
token_value = used_invite.token
192+
193+
response = unauth_client.get(
194+
app.url_path_for("accept_invitation"),
195+
params={"token": token_value},
196+
follow_redirects=False
197+
)
198+
assert response.status_code == 404 # InvalidInvitationTokenError maps to 404
199+
200+
@pytest.mark.parametrize("token_type", [
201+
"invalid",
202+
"expired",
203+
"used",
204+
])
205+
def test_accept_invitation_register_post_invalid_token_fails(
206+
unauth_client: TestClient,
207+
token_type: str,
208+
request # Required by getfixturevalue
209+
):
210+
"""POST /register with invalid token fails with 404."""
211+
token_value = "invalid-token-string"
212+
invitee_email = "[email protected]"
213+
if token_type == "expired":
214+
expired_invite: Invitation = request.getfixturevalue("expired_invitation")
215+
token_value = expired_invite.token
216+
invitee_email = expired_invite.invitee_email
217+
elif token_type == "used":
218+
used_invite: Invitation = request.getfixturevalue("used_invitation")
219+
token_value = used_invite.token
220+
invitee_email = used_invite.invitee_email
221+
222+
register_data = {
223+
"name": "Invalid Token User",
224+
"email": invitee_email,
225+
"password": "Password123!@#",
226+
"confirm_password": "Password123!@#",
227+
"invitation_token": token_value
228+
}
229+
response = unauth_client.post(
230+
app.url_path_for("register"),
231+
data=register_data,
232+
follow_redirects=False
233+
)
234+
assert response.status_code == 404 # InvalidInvitationTokenError
235+
236+
@pytest.mark.parametrize("token_type", [
237+
"invalid",
238+
"expired",
239+
"used",
240+
])
241+
def test_accept_invitation_login_post_invalid_token_fails(
242+
unauth_client: TestClient,
243+
token_type: str,
244+
existing_invitee_account, # Need an account to attempt login
245+
request # Required by getfixturevalue
246+
):
247+
"""POST /login with invalid token fails with 404."""
248+
token_value = "invalid-token-string"
249+
if token_type == "expired":
250+
expired_invite: Invitation = request.getfixturevalue("expired_invitation")
251+
token_value = expired_invite.token
252+
elif token_type == "used":
253+
used_invite: Invitation = request.getfixturevalue("used_invitation")
254+
token_value = used_invite.token
255+
256+
login_data = {
257+
"email": existing_invitee_account.email,
258+
"password": "Invitee123!@#",
259+
"invitation_token": token_value
260+
}
261+
response = unauth_client.post(
262+
app.url_path_for("login"),
263+
data=login_data,
264+
follow_redirects=False
265+
)
266+
assert response.status_code == 404 # InvalidInvitationTokenError
267+
268+
# 5. Failure: Email Mismatch (Registration)
269+
def test_accept_invitation_register_email_mismatch_fails(
270+
unauth_client: TestClient,
271+
test_invitation: Invitation
272+
):
273+
"""POST /register with valid token but different email fails with 403."""
274+
register_data = {
275+
"name": "Mismatch User",
276+
"email": "[email protected]", # Different from invitation
277+
"password": "Password123!@#",
278+
"confirm_password": "Password123!@#",
279+
"invitation_token": test_invitation.token
280+
}
281+
response = unauth_client.post(
282+
app.url_path_for("register"),
283+
data=register_data,
284+
follow_redirects=False
285+
)
286+
assert response.status_code == 403 # InvitationEmailMismatchError
287+
288+
# 6. Failure: Email Mismatch (Login)
289+
def test_accept_invitation_login_email_mismatch_fails(
290+
unauth_client: TestClient,
291+
test_invitation: Invitation,
292+
test_account: Account # Account with email different from invitee
293+
):
294+
"""POST /login with valid token but credentials for different user fails with 403."""
295+
login_data = {
296+
"email": test_account.email, # Different email
297+
"password": "Test123!@#", # Password for test_account
298+
"invitation_token": test_invitation.token # Token for [email protected]
299+
}
300+
response = unauth_client.post(
301+
app.url_path_for("login"),
302+
data=login_data,
303+
follow_redirects=False
304+
)
305+
assert response.status_code == 403 # InvitationEmailMismatchError
306+
307+
# 7. Failure: Logged-in Wrong User
308+
def test_accept_invitation_logged_in_wrong_user_get_redirects_to_login(
309+
auth_client_non_member: TestClient, # Client logged in as user != invitee
310+
test_invitation: Invitation,
311+
existing_invitee_account: Account, # Ensure the invitee's account exists
312+
session: Session
313+
):
314+
"""GET /accept with valid token when logged in as wrong user redirects to login."""
315+
response = auth_client_non_member.get(
316+
app.url_path_for("accept_invitation"),
317+
params={"token": test_invitation.token},
318+
follow_redirects=False
319+
)
320+
assert response.status_code == 303
321+
redirect_location = response.headers["location"]
322+
parsed_url = urlparse(redirect_location)
323+
query_params = parse_qs(parsed_url.query)
324+
325+
# Should redirect back to login, preserving the token
326+
assert parsed_url.path == app.url_path_for("read_login")
327+
assert query_params.get("invitation_token") == [test_invitation.token]
328+
329+
# Verify database state hasn't changed
330+
session.refresh(test_invitation)
331+
assert test_invitation.used is False
332+
assert test_invitation.accepted_by_user_id is None

0 commit comments

Comments
 (0)