Skip to content

Database enforces that role names are unique for the organization #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions routers/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi.responses import RedirectResponse
from sqlmodel import Session, select, col
from sqlalchemy.orm import selectinload
from sqlalchemy.exc import IntegrityError
from utils.db import get_session
from utils.dependencies import get_authenticated_user
from utils.models import Role, Permission, ValidPermissions, utc_time, User, DataIntegrityError
Expand Down Expand Up @@ -54,7 +55,13 @@ def create_role(
db_role.permissions.extend(db_permissions)

# Commit transaction
session.commit()
try:
session.commit()
except IntegrityError:
session.rollback()
# This likely means the unique constraint (org_id, name) was violated,
# potentially due to a race condition if the pre-check passed.
raise RoleAlreadyExistsError()

return RedirectResponse(
url=organization_router.url_path_for("read_organization", org_id=organization_id),
Expand Down Expand Up @@ -123,7 +130,14 @@ def update_role(
db_role.name = name
db_role.updated_at = utc_time()

session.commit()
try:
session.commit()
except IntegrityError:
session.rollback()
# This likely means the unique constraint (org_id, name) was violated,
# potentially due to a race condition if the pre-check passed.
raise RoleAlreadyExistsError()

session.refresh(db_role)
return RedirectResponse(
url=organization_router.url_path_for("read_organization", org_id=organization_id),
Expand Down
192 changes: 148 additions & 44 deletions tests/routers/test_organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,37 @@ def test_update_organization_success(
auth_client: TestClient, session: Session, test_organization: Organization, test_user: User
):
"""Test successful organization update"""
# Set up test user as owner with edit permission
# Ensure test_user has the EDIT_ORGANIZATION permission via the Owner role (already created by fixture)
if test_organization.id is None:
raise SetupError("Test organization ID is None")

owner_role = session.exec(
select(Role).where(
Role.organization_id == test_organization.id,
Role.name == "Owner"
)
).first()

if owner_role is None:
raise SetupError("Owner role not found for test organization.")

owner_role = Role(name="Owner", organization_id=test_organization.id)
owner_role.permissions = [
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
]
owner_role.users.append(test_user)
session.add(owner_role)
# Ensure the permission is present (it should be by default)
edit_permission = session.exec(
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
).first()
if edit_permission is None:
raise SetupError("EDIT_ORGANIZATION permission not found.")

if edit_permission not in owner_role.permissions:
owner_role.permissions.append(edit_permission) # Add just in case

# Ensure the user is assigned to the role (it should be by default for the creating user)
if test_user not in owner_role.users:
owner_role.users.append(test_user)

session.commit()
session.refresh(owner_role)
session.refresh(test_user)

new_name = "Updated Organization Name"
response = auth_client.post(
Expand Down Expand Up @@ -168,19 +188,38 @@ def test_update_organization_unauthorized(auth_client, session, test_organizatio
assert "permission" in response.text.lower()

def test_update_organization_duplicate_name(auth_client, session, test_organization, test_user):
"""Test organization update with duplicate name"""
# Create another organization with the target name
existing_org = Organization(name="Existing Org")
session.add(existing_org)

# Set up permissions
owner_role = Role(name="Owner", organization_id=test_organization.id)
owner_role.permissions = [
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
]
owner_role.users.append(test_user)
session.add(owner_role)
# Ensure test_user has EDIT_ORGANIZATION permission via the Owner role
if test_organization.id is None:
raise SetupError("Test organization ID is None")

owner_role = session.exec(
select(Role).where(
Role.organization_id == test_organization.id,
Role.name == "Owner"
)
).first()

if owner_role is None:
raise SetupError("Owner role not found for test organization.")

edit_permission = session.exec(
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
).first()
if edit_permission is None:
raise SetupError("EDIT_ORGANIZATION permission not found.")

if edit_permission not in owner_role.permissions:
owner_role.permissions.append(edit_permission)

if test_user not in owner_role.users:
owner_role.users.append(test_user)

session.commit()
session.refresh(owner_role)
session.refresh(test_user)

response = auth_client.post(
app.url_path_for("update_organization", org_id=test_organization.id),
Expand All @@ -196,14 +235,35 @@ def test_update_organization_duplicate_name(auth_client, session, test_organizat

def test_update_organization_empty_name(auth_client, session, test_organization, test_user):
"""Test organization update with empty name"""
# Set up permissions
owner_role = Role(name="Owner", organization_id=test_organization.id)
owner_role.permissions = [
Permission(name=ValidPermissions.EDIT_ORGANIZATION)
]
owner_role.users.append(test_user)
session.add(owner_role)
# Ensure test_user has EDIT_ORGANIZATION permission via the Owner role
if test_organization.id is None:
raise SetupError("Test organization ID is None")

owner_role = session.exec(
select(Role).where(
Role.organization_id == test_organization.id,
Role.name == "Owner"
)
).first()

if owner_role is None:
raise SetupError("Owner role not found for test organization.")

edit_permission = session.exec(
select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION)
).first()
if edit_permission is None:
raise SetupError("EDIT_ORGANIZATION permission not found.")

if edit_permission not in owner_role.permissions:
owner_role.permissions.append(edit_permission)

if test_user not in owner_role.users:
owner_role.users.append(test_user)

session.commit()
session.refresh(owner_role)
session.refresh(test_user)

response = auth_client.post(
app.url_path_for("update_organization", org_id=test_organization.id),
Expand Down Expand Up @@ -235,15 +295,35 @@ def test_delete_organization_success(auth_client, session, test_organization, te
"""Test successful organization deletion"""
# Store the organization ID for later verification
org_id = test_organization.id
if org_id is None: # Add check for None
raise SetupError("Test organization ID is None")

# Set up test user as owner with delete permission
owner_role = Role(name="Owner", organization_id=org_id)
owner_role.permissions = [
Permission(name=ValidPermissions.DELETE_ORGANIZATION)
]
owner_role.users.append(test_user)
session.add(owner_role)
session.commit()
# Ensure test_user has DELETE_ORGANIZATION permission via the Owner role
owner_role = session.exec(
select(Role).where(
Role.organization_id == org_id,
Role.name == "Owner"
)
).first()

if owner_role is None:
raise SetupError("Owner role not found for test organization.")

delete_permission = session.exec(
select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION)
).first()
if delete_permission is None:
raise SetupError("DELETE_ORGANIZATION permission not found.")

if delete_permission not in owner_role.permissions:
owner_role.permissions.append(delete_permission)

if test_user not in owner_role.users:
owner_role.users.append(test_user)

session.commit() # Commit permission/user assignment changes
session.refresh(owner_role)
session.refresh(test_user)

response = auth_client.post(
app.url_path_for("delete_organization", org_id=org_id),
Expand Down Expand Up @@ -304,20 +384,44 @@ def test_delete_organization_cascade(auth_client, session, test_organization, te
"""Test that deleting organization cascades to roles"""
# Store the organization ID for later verification
org_id = test_organization.id
if org_id is None: # Add check for None
raise SetupError("Test organization ID is None")

# Set up test user as owner with delete permission
owner_role = Role(name="Owner", organization_id=org_id)
owner_role.permissions = [
Permission(name=ValidPermissions.DELETE_ORGANIZATION)
]
owner_role.users.append(test_user)

# Add another role to verify cascade
member_role = Role(name="Member", organization_id=org_id)

session.add(owner_role)
session.add(member_role)
session.commit()
# Ensure test_user has DELETE_ORGANIZATION permission via the Owner role
owner_role = session.exec(
select(Role).where(
Role.organization_id == org_id,
Role.name == "Owner"
)
).first()
if owner_role is None:
raise SetupError("Owner role not found for test organization.")

delete_permission = session.exec(
select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION)
).first()
if delete_permission is None:
raise SetupError("DELETE_ORGANIZATION permission not found.")

if delete_permission not in owner_role.permissions:
owner_role.permissions.append(delete_permission)

if test_user not in owner_role.users:
owner_role.users.append(test_user)

# Verify the Member role exists (created by fixture)
member_role = session.exec(
select(Role).where(
Role.organization_id == org_id,
Role.name == "Member"
)
).first()
if member_role is None:
raise SetupError("Member role not found for test organization. Fixture might have changed.")

session.commit() # Commit permission/user assignment changes
session.refresh(owner_role)
session.refresh(test_user)

response = auth_client.post(
app.url_path_for("delete_organization", org_id=org_id),
Expand Down
69 changes: 69 additions & 0 deletions tests/utils/test_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import timedelta, datetime, UTC
from typing import Optional
from sqlmodel import select, Session
from sqlalchemy.exc import IntegrityError
import pytest
from utils.models import (
Permission,
Role,
Expand Down Expand Up @@ -29,6 +31,10 @@ def test_permissions_persist_after_role_deletion(session: Session):
session.add(organization)
session.commit()
session.refresh(organization)

# Ensure organization ID is not None (for type checking)
if organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role and link two specific permissions
role = Role(name="Test Role", organization_id=organization.id)
Expand Down Expand Up @@ -70,6 +76,10 @@ def test_user_organizations_property(session: Session, test_user: User, test_org
Test that User.organizations property correctly returns all organizations
the user belongs to via their roles.
"""
# Ensure organization ID is not None (for type checking)
if test_organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role in the test organization
role = Role(name="Test Role", organization_id=test_organization.id)
session.add(role)
Expand All @@ -91,6 +101,10 @@ def test_organization_users_property(session: Session, test_user: User, test_org
Test that Organization.users property correctly returns all users
in the organization via their roles.
"""
# Ensure organization ID is not None (for type checking)
if test_organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role in the test organization
role = Role(name="Test Role", organization_id=test_organization.id)
session.add(role)
Expand All @@ -116,6 +130,10 @@ def test_cascade_delete_organization(session: Session, test_user: User, test_org
- Deletes role-user links
- Does not delete users
"""
# Ensure organization ID is not None (for type checking)
if test_organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role in the test organization
role = Role(name="Test Role", organization_id=test_organization.id)
session.add(role)
Expand Down Expand Up @@ -193,6 +211,10 @@ def test_user_has_permission(session: Session, test_user: User, test_organizatio
Test that User.has_permission method correctly checks if a user has a specific
permission for a given organization.
"""
# Ensure organization ID is not None (for type checking)
if test_organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role with specific permissions in the test organization
role = Role(name="Test Role", organization_id=test_organization.id)
session.add(role)
Expand Down Expand Up @@ -244,3 +266,50 @@ def test_cascade_delete_account_deletes_user(session: Session, test_account: Acc

# Verify the user was cascade deleted
assert session.exec(select(User).where(User.account_id == test_account.id)).first() is None


def test_role_name_unique_per_organization(session: Session, test_organization: Organization, second_test_organization: Organization):
"""
Test that role names must be unique within the same organization,
but can be duplicated across different organizations.
"""
role_name = "UniqueRoleTest"

# Ensure organization IDs are not None (for type checking)
if test_organization.id is None or second_test_organization.id is None:
pytest.fail("Organization ID is None, test setup failed.")

# Create a role in the first organization
role1 = Role(name=role_name, organization_id=test_organization.id)
session.add(role1)
session.commit()
session.refresh(role1)
assert role1.id is not None

# Attempt to create another role with the same name in the same organization
role2_duplicate = Role(name=role_name, organization_id=test_organization.id)
session.add(role2_duplicate)
with pytest.raises(IntegrityError):
session.commit()

# Rollback the session after the expected error
session.rollback()

# Create a role with the same name in the second organization (should succeed)
role3_different_org = Role(name=role_name, organization_id=second_test_organization.id)
session.add(role3_different_org)
session.commit()
session.refresh(role3_different_org)
assert role3_different_org.id is not None

# Verify the final state
roles_org1 = session.exec(select(Role).where(Role.organization_id == test_organization.id)).all()
roles_org2 = session.exec(select(Role).where(Role.organization_id == second_test_organization.id)).all()

# Org 1 should have exactly one role with the test name after the failed attempt
count_org1_roles_with_name = sum(1 for role in roles_org1 if role.name == role_name)
assert count_org1_roles_with_name == 1

# Org 2 should only have the one role we added
assert len(roles_org2) == 1
assert roles_org2[0].name == role_name
Loading