diff --git a/routers/role.py b/routers/role.py index 81770b1..95c5b1e 100644 --- a/routers/role.py +++ b/routers/role.py @@ -6,6 +6,7 @@ from fastapi.responses import RedirectResponse from sqlmodel import Session, select, col from sqlalchemy.orm import selectinload +from sqlalchemy.exc import IntegrityError from utils.db import get_session from utils.dependencies import get_authenticated_user from utils.models import Role, Permission, ValidPermissions, utc_time, User, DataIntegrityError @@ -54,7 +55,13 @@ def create_role( db_role.permissions.extend(db_permissions) # Commit transaction - session.commit() + try: + session.commit() + except IntegrityError: + session.rollback() + # This likely means the unique constraint (org_id, name) was violated, + # potentially due to a race condition if the pre-check passed. + raise RoleAlreadyExistsError() return RedirectResponse( url=organization_router.url_path_for("read_organization", org_id=organization_id), @@ -123,7 +130,14 @@ def update_role( db_role.name = name db_role.updated_at = utc_time() - session.commit() + try: + session.commit() + except IntegrityError: + session.rollback() + # This likely means the unique constraint (org_id, name) was violated, + # potentially due to a race condition if the pre-check passed. + raise RoleAlreadyExistsError() + session.refresh(db_role) return RedirectResponse( url=organization_router.url_path_for("read_organization", org_id=organization_id), diff --git a/tests/routers/test_organization.py b/tests/routers/test_organization.py index 8984945..a70b303 100644 --- a/tests/routers/test_organization.py +++ b/tests/routers/test_organization.py @@ -116,17 +116,37 @@ def test_update_organization_success( auth_client: TestClient, session: Session, test_organization: Organization, test_user: User ): """Test successful organization update""" - # Set up test user as owner with edit permission + # Ensure test_user has the EDIT_ORGANIZATION permission via the Owner role (already created by fixture) if test_organization.id is None: raise SetupError("Test organization ID is None") + + owner_role = session.exec( + select(Role).where( + Role.organization_id == test_organization.id, + Role.name == "Owner" + ) + ).first() + + if owner_role is None: + raise SetupError("Owner role not found for test organization.") - owner_role = Role(name="Owner", organization_id=test_organization.id) - owner_role.permissions = [ - Permission(name=ValidPermissions.EDIT_ORGANIZATION) - ] - owner_role.users.append(test_user) - session.add(owner_role) + # Ensure the permission is present (it should be by default) + edit_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION) + ).first() + if edit_permission is None: + raise SetupError("EDIT_ORGANIZATION permission not found.") + + if edit_permission not in owner_role.permissions: + owner_role.permissions.append(edit_permission) # Add just in case + + # Ensure the user is assigned to the role (it should be by default for the creating user) + if test_user not in owner_role.users: + owner_role.users.append(test_user) + session.commit() + session.refresh(owner_role) + session.refresh(test_user) new_name = "Updated Organization Name" response = auth_client.post( @@ -168,19 +188,38 @@ def test_update_organization_unauthorized(auth_client, session, test_organizatio assert "permission" in response.text.lower() def test_update_organization_duplicate_name(auth_client, session, test_organization, test_user): - """Test organization update with duplicate name""" - # Create another organization with the target name existing_org = Organization(name="Existing Org") session.add(existing_org) - # Set up permissions - owner_role = Role(name="Owner", organization_id=test_organization.id) - owner_role.permissions = [ - Permission(name=ValidPermissions.EDIT_ORGANIZATION) - ] - owner_role.users.append(test_user) - session.add(owner_role) + # Ensure test_user has EDIT_ORGANIZATION permission via the Owner role + if test_organization.id is None: + raise SetupError("Test organization ID is None") + + owner_role = session.exec( + select(Role).where( + Role.organization_id == test_organization.id, + Role.name == "Owner" + ) + ).first() + + if owner_role is None: + raise SetupError("Owner role not found for test organization.") + + edit_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION) + ).first() + if edit_permission is None: + raise SetupError("EDIT_ORGANIZATION permission not found.") + + if edit_permission not in owner_role.permissions: + owner_role.permissions.append(edit_permission) + + if test_user not in owner_role.users: + owner_role.users.append(test_user) + session.commit() + session.refresh(owner_role) + session.refresh(test_user) response = auth_client.post( app.url_path_for("update_organization", org_id=test_organization.id), @@ -196,14 +235,35 @@ def test_update_organization_duplicate_name(auth_client, session, test_organizat def test_update_organization_empty_name(auth_client, session, test_organization, test_user): """Test organization update with empty name""" - # Set up permissions - owner_role = Role(name="Owner", organization_id=test_organization.id) - owner_role.permissions = [ - Permission(name=ValidPermissions.EDIT_ORGANIZATION) - ] - owner_role.users.append(test_user) - session.add(owner_role) + # Ensure test_user has EDIT_ORGANIZATION permission via the Owner role + if test_organization.id is None: + raise SetupError("Test organization ID is None") + + owner_role = session.exec( + select(Role).where( + Role.organization_id == test_organization.id, + Role.name == "Owner" + ) + ).first() + + if owner_role is None: + raise SetupError("Owner role not found for test organization.") + + edit_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.EDIT_ORGANIZATION) + ).first() + if edit_permission is None: + raise SetupError("EDIT_ORGANIZATION permission not found.") + + if edit_permission not in owner_role.permissions: + owner_role.permissions.append(edit_permission) + + if test_user not in owner_role.users: + owner_role.users.append(test_user) + session.commit() + session.refresh(owner_role) + session.refresh(test_user) response = auth_client.post( app.url_path_for("update_organization", org_id=test_organization.id), @@ -235,15 +295,35 @@ def test_delete_organization_success(auth_client, session, test_organization, te """Test successful organization deletion""" # Store the organization ID for later verification org_id = test_organization.id + if org_id is None: # Add check for None + raise SetupError("Test organization ID is None") - # Set up test user as owner with delete permission - owner_role = Role(name="Owner", organization_id=org_id) - owner_role.permissions = [ - Permission(name=ValidPermissions.DELETE_ORGANIZATION) - ] - owner_role.users.append(test_user) - session.add(owner_role) - session.commit() + # Ensure test_user has DELETE_ORGANIZATION permission via the Owner role + owner_role = session.exec( + select(Role).where( + Role.organization_id == org_id, + Role.name == "Owner" + ) + ).first() + + if owner_role is None: + raise SetupError("Owner role not found for test organization.") + + delete_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION) + ).first() + if delete_permission is None: + raise SetupError("DELETE_ORGANIZATION permission not found.") + + if delete_permission not in owner_role.permissions: + owner_role.permissions.append(delete_permission) + + if test_user not in owner_role.users: + owner_role.users.append(test_user) + + session.commit() # Commit permission/user assignment changes + session.refresh(owner_role) + session.refresh(test_user) response = auth_client.post( app.url_path_for("delete_organization", org_id=org_id), @@ -304,20 +384,44 @@ def test_delete_organization_cascade(auth_client, session, test_organization, te """Test that deleting organization cascades to roles""" # Store the organization ID for later verification org_id = test_organization.id + if org_id is None: # Add check for None + raise SetupError("Test organization ID is None") - # Set up test user as owner with delete permission - owner_role = Role(name="Owner", organization_id=org_id) - owner_role.permissions = [ - Permission(name=ValidPermissions.DELETE_ORGANIZATION) - ] - owner_role.users.append(test_user) - - # Add another role to verify cascade - member_role = Role(name="Member", organization_id=org_id) - - session.add(owner_role) - session.add(member_role) - session.commit() + # Ensure test_user has DELETE_ORGANIZATION permission via the Owner role + owner_role = session.exec( + select(Role).where( + Role.organization_id == org_id, + Role.name == "Owner" + ) + ).first() + if owner_role is None: + raise SetupError("Owner role not found for test organization.") + + delete_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.DELETE_ORGANIZATION) + ).first() + if delete_permission is None: + raise SetupError("DELETE_ORGANIZATION permission not found.") + + if delete_permission not in owner_role.permissions: + owner_role.permissions.append(delete_permission) + + if test_user not in owner_role.users: + owner_role.users.append(test_user) + + # Verify the Member role exists (created by fixture) + member_role = session.exec( + select(Role).where( + Role.organization_id == org_id, + Role.name == "Member" + ) + ).first() + if member_role is None: + raise SetupError("Member role not found for test organization. Fixture might have changed.") + + session.commit() # Commit permission/user assignment changes + session.refresh(owner_role) + session.refresh(test_user) response = auth_client.post( app.url_path_for("delete_organization", org_id=org_id), diff --git a/tests/utils/test_models.py b/tests/utils/test_models.py index 13ba815..39a069c 100644 --- a/tests/utils/test_models.py +++ b/tests/utils/test_models.py @@ -1,6 +1,8 @@ from datetime import timedelta, datetime, UTC from typing import Optional from sqlmodel import select, Session +from sqlalchemy.exc import IntegrityError +import pytest from utils.models import ( Permission, Role, @@ -29,6 +31,10 @@ def test_permissions_persist_after_role_deletion(session: Session): session.add(organization) session.commit() session.refresh(organization) + + # Ensure organization ID is not None (for type checking) + if organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") # Create a role and link two specific permissions role = Role(name="Test Role", organization_id=organization.id) @@ -70,6 +76,10 @@ def test_user_organizations_property(session: Session, test_user: User, test_org Test that User.organizations property correctly returns all organizations the user belongs to via their roles. """ + # Ensure organization ID is not None (for type checking) + if test_organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") + # Create a role in the test organization role = Role(name="Test Role", organization_id=test_organization.id) session.add(role) @@ -91,6 +101,10 @@ def test_organization_users_property(session: Session, test_user: User, test_org Test that Organization.users property correctly returns all users in the organization via their roles. """ + # Ensure organization ID is not None (for type checking) + if test_organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") + # Create a role in the test organization role = Role(name="Test Role", organization_id=test_organization.id) session.add(role) @@ -116,6 +130,10 @@ def test_cascade_delete_organization(session: Session, test_user: User, test_org - Deletes role-user links - Does not delete users """ + # Ensure organization ID is not None (for type checking) + if test_organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") + # Create a role in the test organization role = Role(name="Test Role", organization_id=test_organization.id) session.add(role) @@ -193,6 +211,10 @@ def test_user_has_permission(session: Session, test_user: User, test_organizatio Test that User.has_permission method correctly checks if a user has a specific permission for a given organization. """ + # Ensure organization ID is not None (for type checking) + if test_organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") + # Create a role with specific permissions in the test organization role = Role(name="Test Role", organization_id=test_organization.id) session.add(role) @@ -244,3 +266,50 @@ def test_cascade_delete_account_deletes_user(session: Session, test_account: Acc # Verify the user was cascade deleted assert session.exec(select(User).where(User.account_id == test_account.id)).first() is None + + +def test_role_name_unique_per_organization(session: Session, test_organization: Organization, second_test_organization: Organization): + """ + Test that role names must be unique within the same organization, + but can be duplicated across different organizations. + """ + role_name = "UniqueRoleTest" + + # Ensure organization IDs are not None (for type checking) + if test_organization.id is None or second_test_organization.id is None: + pytest.fail("Organization ID is None, test setup failed.") + + # Create a role in the first organization + role1 = Role(name=role_name, organization_id=test_organization.id) + session.add(role1) + session.commit() + session.refresh(role1) + assert role1.id is not None + + # Attempt to create another role with the same name in the same organization + role2_duplicate = Role(name=role_name, organization_id=test_organization.id) + session.add(role2_duplicate) + with pytest.raises(IntegrityError): + session.commit() + + # Rollback the session after the expected error + session.rollback() + + # Create a role with the same name in the second organization (should succeed) + role3_different_org = Role(name=role_name, organization_id=second_test_organization.id) + session.add(role3_different_org) + session.commit() + session.refresh(role3_different_org) + assert role3_different_org.id is not None + + # Verify the final state + roles_org1 = session.exec(select(Role).where(Role.organization_id == test_organization.id)).all() + roles_org2 = session.exec(select(Role).where(Role.organization_id == second_test_organization.id)).all() + + # Org 1 should have exactly one role with the test name after the failed attempt + count_org1_roles_with_name = sum(1 for role in roles_org1 if role.name == role_name) + assert count_org1_roles_with_name == 1 + + # Org 2 should only have the one role we added + assert len(roles_org2) == 1 + assert roles_org2[0].name == role_name diff --git a/utils/models.py b/utils/models.py index 769f562..5f1b0f0 100644 --- a/utils/models.py +++ b/utils/models.py @@ -4,7 +4,7 @@ from typing import Optional, List, Union from pydantic import EmailStr from sqlmodel import SQLModel, Field, Relationship -from sqlalchemy import Column, Enum as SQLAlchemyEnum, LargeBinary +from sqlalchemy import Column, Enum as SQLAlchemyEnum, LargeBinary, UniqueConstraint from sqlalchemy.orm import Mapped from utils.enums import ValidPermissions from exceptions.http_exceptions import DataIntegrityError @@ -220,6 +220,10 @@ class Role(SQLModel, table=True): back_populates="roles", link_model=RolePermissionLink ) + + __table_args__ = ( + UniqueConstraint("organization_id", "name", name="uq_role_organization_name"), + ) class Permission(SQLModel, table=True): """