Skip to content

Commit 384db27

Browse files
Enforce uniqueness of organization-roles at the database level
1 parent a46446f commit 384db27

File tree

4 files changed

+238
-47
lines changed

4 files changed

+238
-47
lines changed

routers/role.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from fastapi.responses import RedirectResponse
77
from sqlmodel import Session, select, col
88
from sqlalchemy.orm import selectinload
9+
from sqlalchemy.exc import IntegrityError
910
from utils.db import get_session
1011
from utils.dependencies import get_authenticated_user
1112
from utils.models import Role, Permission, ValidPermissions, utc_time, User, DataIntegrityError
@@ -54,7 +55,13 @@ def create_role(
5455
db_role.permissions.extend(db_permissions)
5556

5657
# Commit transaction
57-
session.commit()
58+
try:
59+
session.commit()
60+
except IntegrityError:
61+
session.rollback()
62+
# This likely means the unique constraint (org_id, name) was violated,
63+
# potentially due to a race condition if the pre-check passed.
64+
raise RoleAlreadyExistsError()
5865

5966
return RedirectResponse(
6067
url=organization_router.url_path_for("read_organization", org_id=organization_id),
@@ -123,7 +130,14 @@ def update_role(
123130
db_role.name = name
124131
db_role.updated_at = utc_time()
125132

126-
session.commit()
133+
try:
134+
session.commit()
135+
except IntegrityError:
136+
session.rollback()
137+
# This likely means the unique constraint (org_id, name) was violated,
138+
# potentially due to a race condition if the pre-check passed.
139+
raise RoleAlreadyExistsError()
140+
127141
session.refresh(db_role)
128142
return RedirectResponse(
129143
url=organization_router.url_path_for("read_organization", org_id=organization_id),

tests/routers/test_organization.py

Lines changed: 148 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,37 @@ def test_update_organization_success(
116116
auth_client: TestClient, session: Session, test_organization: Organization, test_user: User
117117
):
118118
"""Test successful organization update"""
119-
# Set up test user as owner with edit permission
119+
# Ensure test_user has the EDIT_ORGANIZATION permission via the Owner role (already created by fixture)
120120
if test_organization.id is None:
121121
raise SetupError("Test organization ID is None")
122+
123+
owner_role = session.exec(
124+
select(Role).where(
125+
Role.organization_id == test_organization.id,
126+
Role.name == "Owner"
127+
)
128+
).first()
129+
130+
if owner_role is None:
131+
raise SetupError("Owner role not found for test organization.")
122132

123-
owner_role = Role(name="Owner", organization_id=test_organization.id)
124-
owner_role.permissions = [
125-
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
126-
]
127-
owner_role.users.append(test_user)
128-
session.add(owner_role)
133+
# Ensure the permission is present (it should be by default)
134+
edit_permission = session.exec(
135+
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
136+
).first()
137+
if edit_permission is None:
138+
raise SetupError("EDIT_ORGANIZATION permission not found.")
139+
140+
if edit_permission not in owner_role.permissions:
141+
owner_role.permissions.append(edit_permission) # Add just in case
142+
143+
# Ensure the user is assigned to the role (it should be by default for the creating user)
144+
if test_user not in owner_role.users:
145+
owner_role.users.append(test_user)
146+
129147
session.commit()
148+
session.refresh(owner_role)
149+
session.refresh(test_user)
130150

131151
new_name = "Updated Organization Name"
132152
response = auth_client.post(
@@ -168,19 +188,38 @@ def test_update_organization_unauthorized(auth_client, session, test_organizatio
168188
assert "permission" in response.text.lower()
169189

170190
def test_update_organization_duplicate_name(auth_client, session, test_organization, test_user):
171-
"""Test organization update with duplicate name"""
172-
# Create another organization with the target name
173191
existing_org = Organization(name="Existing Org")
174192
session.add(existing_org)
175193

176-
# Set up permissions
177-
owner_role = Role(name="Owner", organization_id=test_organization.id)
178-
owner_role.permissions = [
179-
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
180-
]
181-
owner_role.users.append(test_user)
182-
session.add(owner_role)
194+
# Ensure test_user has EDIT_ORGANIZATION permission via the Owner role
195+
if test_organization.id is None:
196+
raise SetupError("Test organization ID is None")
197+
198+
owner_role = session.exec(
199+
select(Role).where(
200+
Role.organization_id == test_organization.id,
201+
Role.name == "Owner"
202+
)
203+
).first()
204+
205+
if owner_role is None:
206+
raise SetupError("Owner role not found for test organization.")
207+
208+
edit_permission = session.exec(
209+
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
210+
).first()
211+
if edit_permission is None:
212+
raise SetupError("EDIT_ORGANIZATION permission not found.")
213+
214+
if edit_permission not in owner_role.permissions:
215+
owner_role.permissions.append(edit_permission)
216+
217+
if test_user not in owner_role.users:
218+
owner_role.users.append(test_user)
219+
183220
session.commit()
221+
session.refresh(owner_role)
222+
session.refresh(test_user)
184223

185224
response = auth_client.post(
186225
app.url_path_for("update_organization", org_id=test_organization.id),
@@ -196,14 +235,35 @@ def test_update_organization_duplicate_name(auth_client, session, test_organizat
196235

197236
def test_update_organization_empty_name(auth_client, session, test_organization, test_user):
198237
"""Test organization update with empty name"""
199-
# Set up permissions
200-
owner_role = Role(name="Owner", organization_id=test_organization.id)
201-
owner_role.permissions = [
202-
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
203-
]
204-
owner_role.users.append(test_user)
205-
session.add(owner_role)
238+
# Ensure test_user has EDIT_ORGANIZATION permission via the Owner role
239+
if test_organization.id is None:
240+
raise SetupError("Test organization ID is None")
241+
242+
owner_role = session.exec(
243+
select(Role).where(
244+
Role.organization_id == test_organization.id,
245+
Role.name == "Owner"
246+
)
247+
).first()
248+
249+
if owner_role is None:
250+
raise SetupError("Owner role not found for test organization.")
251+
252+
edit_permission = session.exec(
253+
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
254+
).first()
255+
if edit_permission is None:
256+
raise SetupError("EDIT_ORGANIZATION permission not found.")
257+
258+
if edit_permission not in owner_role.permissions:
259+
owner_role.permissions.append(edit_permission)
260+
261+
if test_user not in owner_role.users:
262+
owner_role.users.append(test_user)
263+
206264
session.commit()
265+
session.refresh(owner_role)
266+
session.refresh(test_user)
207267

208268
response = auth_client.post(
209269
app.url_path_for("update_organization", org_id=test_organization.id),
@@ -235,15 +295,35 @@ def test_delete_organization_success(auth_client, session, test_organization, te
235295
"""Test successful organization deletion"""
236296
# Store the organization ID for later verification
237297
org_id = test_organization.id
298+
if org_id is None: # Add check for None
299+
raise SetupError("Test organization ID is None")
238300

239-
# Set up test user as owner with delete permission
240-
owner_role = Role(name="Owner", organization_id=org_id)
241-
owner_role.permissions = [
242-
Permission(name=ValidPermissions.DELETE_ORGANIZATION)
243-
]
244-
owner_role.users.append(test_user)
245-
session.add(owner_role)
246-
session.commit()
301+
# Ensure test_user has DELETE_ORGANIZATION permission via the Owner role
302+
owner_role = session.exec(
303+
select(Role).where(
304+
Role.organization_id == org_id,
305+
Role.name == "Owner"
306+
)
307+
).first()
308+
309+
if owner_role is None:
310+
raise SetupError("Owner role not found for test organization.")
311+
312+
delete_permission = session.exec(
313+
select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION)
314+
).first()
315+
if delete_permission is None:
316+
raise SetupError("DELETE_ORGANIZATION permission not found.")
317+
318+
if delete_permission not in owner_role.permissions:
319+
owner_role.permissions.append(delete_permission)
320+
321+
if test_user not in owner_role.users:
322+
owner_role.users.append(test_user)
323+
324+
session.commit() # Commit permission/user assignment changes
325+
session.refresh(owner_role)
326+
session.refresh(test_user)
247327

248328
response = auth_client.post(
249329
app.url_path_for("delete_organization", org_id=org_id),
@@ -304,20 +384,44 @@ def test_delete_organization_cascade(auth_client, session, test_organization, te
304384
"""Test that deleting organization cascades to roles"""
305385
# Store the organization ID for later verification
306386
org_id = test_organization.id
387+
if org_id is None: # Add check for None
388+
raise SetupError("Test organization ID is None")
307389

308-
# Set up test user as owner with delete permission
309-
owner_role = Role(name="Owner", organization_id=org_id)
310-
owner_role.permissions = [
311-
Permission(name=ValidPermissions.DELETE_ORGANIZATION)
312-
]
313-
owner_role.users.append(test_user)
314-
315-
# Add another role to verify cascade
316-
member_role = Role(name="Member", organization_id=org_id)
317-
318-
session.add(owner_role)
319-
session.add(member_role)
320-
session.commit()
390+
# Ensure test_user has DELETE_ORGANIZATION permission via the Owner role
391+
owner_role = session.exec(
392+
select(Role).where(
393+
Role.organization_id == org_id,
394+
Role.name == "Owner"
395+
)
396+
).first()
397+
if owner_role is None:
398+
raise SetupError("Owner role not found for test organization.")
399+
400+
delete_permission = session.exec(
401+
select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION)
402+
).first()
403+
if delete_permission is None:
404+
raise SetupError("DELETE_ORGANIZATION permission not found.")
405+
406+
if delete_permission not in owner_role.permissions:
407+
owner_role.permissions.append(delete_permission)
408+
409+
if test_user not in owner_role.users:
410+
owner_role.users.append(test_user)
411+
412+
# Verify the Member role exists (created by fixture)
413+
member_role = session.exec(
414+
select(Role).where(
415+
Role.organization_id == org_id,
416+
Role.name == "Member"
417+
)
418+
).first()
419+
if member_role is None:
420+
raise SetupError("Member role not found for test organization. Fixture might have changed.")
421+
422+
session.commit() # Commit permission/user assignment changes
423+
session.refresh(owner_role)
424+
session.refresh(test_user)
321425

322426
response = auth_client.post(
323427
app.url_path_for("delete_organization", org_id=org_id),

tests/utils/test_models.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from datetime import timedelta, datetime, UTC
22
from typing import Optional
33
from sqlmodel import select, Session
4+
from sqlalchemy.exc import IntegrityError
5+
import pytest
46
from utils.models import (
57
Permission,
68
Role,
@@ -29,6 +31,10 @@ def test_permissions_persist_after_role_deletion(session: Session):
2931
session.add(organization)
3032
session.commit()
3133
session.refresh(organization)
34+
35+
# Ensure organization ID is not None (for type checking)
36+
if organization.id is None:
37+
pytest.fail("Organization ID is None, test setup failed.")
3238

3339
# Create a role and link two specific permissions
3440
role = Role(name="Test Role", organization_id=organization.id)
@@ -70,6 +76,10 @@ def test_user_organizations_property(session: Session, test_user: User, test_org
7076
Test that User.organizations property correctly returns all organizations
7177
the user belongs to via their roles.
7278
"""
79+
# Ensure organization ID is not None (for type checking)
80+
if test_organization.id is None:
81+
pytest.fail("Organization ID is None, test setup failed.")
82+
7383
# Create a role in the test organization
7484
role = Role(name="Test Role", organization_id=test_organization.id)
7585
session.add(role)
@@ -91,6 +101,10 @@ def test_organization_users_property(session: Session, test_user: User, test_org
91101
Test that Organization.users property correctly returns all users
92102
in the organization via their roles.
93103
"""
104+
# Ensure organization ID is not None (for type checking)
105+
if test_organization.id is None:
106+
pytest.fail("Organization ID is None, test setup failed.")
107+
94108
# Create a role in the test organization
95109
role = Role(name="Test Role", organization_id=test_organization.id)
96110
session.add(role)
@@ -116,6 +130,10 @@ def test_cascade_delete_organization(session: Session, test_user: User, test_org
116130
- Deletes role-user links
117131
- Does not delete users
118132
"""
133+
# Ensure organization ID is not None (for type checking)
134+
if test_organization.id is None:
135+
pytest.fail("Organization ID is None, test setup failed.")
136+
119137
# Create a role in the test organization
120138
role = Role(name="Test Role", organization_id=test_organization.id)
121139
session.add(role)
@@ -193,6 +211,10 @@ def test_user_has_permission(session: Session, test_user: User, test_organizatio
193211
Test that User.has_permission method correctly checks if a user has a specific
194212
permission for a given organization.
195213
"""
214+
# Ensure organization ID is not None (for type checking)
215+
if test_organization.id is None:
216+
pytest.fail("Organization ID is None, test setup failed.")
217+
196218
# Create a role with specific permissions in the test organization
197219
role = Role(name="Test Role", organization_id=test_organization.id)
198220
session.add(role)
@@ -244,3 +266,50 @@ def test_cascade_delete_account_deletes_user(session: Session, test_account: Acc
244266

245267
# Verify the user was cascade deleted
246268
assert session.exec(select(User).where(User.account_id == test_account.id)).first() is None
269+
270+
271+
def test_role_name_unique_per_organization(session: Session, test_organization: Organization, second_test_organization: Organization):
272+
"""
273+
Test that role names must be unique within the same organization,
274+
but can be duplicated across different organizations.
275+
"""
276+
role_name = "UniqueRoleTest"
277+
278+
# Ensure organization IDs are not None (for type checking)
279+
if test_organization.id is None or second_test_organization.id is None:
280+
pytest.fail("Organization ID is None, test setup failed.")
281+
282+
# Create a role in the first organization
283+
role1 = Role(name=role_name, organization_id=test_organization.id)
284+
session.add(role1)
285+
session.commit()
286+
session.refresh(role1)
287+
assert role1.id is not None
288+
289+
# Attempt to create another role with the same name in the same organization
290+
role2_duplicate = Role(name=role_name, organization_id=test_organization.id)
291+
session.add(role2_duplicate)
292+
with pytest.raises(IntegrityError):
293+
session.commit()
294+
295+
# Rollback the session after the expected error
296+
session.rollback()
297+
298+
# Create a role with the same name in the second organization (should succeed)
299+
role3_different_org = Role(name=role_name, organization_id=second_test_organization.id)
300+
session.add(role3_different_org)
301+
session.commit()
302+
session.refresh(role3_different_org)
303+
assert role3_different_org.id is not None
304+
305+
# Verify the final state
306+
roles_org1 = session.exec(select(Role).where(Role.organization_id == test_organization.id)).all()
307+
roles_org2 = session.exec(select(Role).where(Role.organization_id == second_test_organization.id)).all()
308+
309+
# Org 1 should have exactly one role with the test name after the failed attempt
310+
count_org1_roles_with_name = sum(1 for role in roles_org1 if role.name == role_name)
311+
assert count_org1_roles_with_name == 1
312+
313+
# Org 2 should only have the one role we added
314+
assert len(roles_org2) == 1
315+
assert roles_org2[0].name == role_name

0 commit comments

Comments
 (0)