diff --git a/routers/role.py b/routers/role.py index 1a89f2d..e94a99a 100644 --- a/routers/role.py +++ b/routers/role.py @@ -85,16 +85,6 @@ class RoleUpdate(BaseModel): organization_id: int permissions: List[ValidPermissions] - @field_validator("id") - @classmethod - def validate_role_exists(cls, id: int, info): - session = info.context.get("session") - if session: - role = session.get(Role, id) - if not role or not role.id: - raise RoleNotFoundError() - return id - @classmethod async def as_form( cls, diff --git a/tests/test_organization.py b/tests/test_organization.py index 978c019..bff6df2 100644 --- a/tests/test_organization.py +++ b/tests/test_organization.py @@ -1,6 +1,4 @@ -# test_organization.py - -from utils.models import Organization, Role +from utils.models import Organization, Role, Permission, ValidPermissions from sqlmodel import select def test_create_organization_success(auth_client, session, test_user): @@ -66,3 +64,207 @@ def test_create_organization_unauthenticated(unauth_client): ) assert response.status_code == 303 # Unauthorized + +def test_update_organization_success(auth_client, session, test_organization, test_user): + """Test successful organization update""" + # Set up test user as owner with edit permission + owner_role = Role(name="Owner", organization_id=test_organization.id) + owner_role.permissions = [ + Permission(name=ValidPermissions.EDIT_ORGANIZATION) + ] + owner_role.users.append(test_user) + session.add(owner_role) + session.commit() + + new_name = "Updated Organization Name" + response = auth_client.post( + f"/organizations/update/{test_organization.id}", + data={"id": test_organization.id, "name": new_name}, + follow_redirects=False + ) + + assert response.status_code == 303 # Redirect status code + assert "/profile" in response.headers["location"] + + # Verify database update + updated_org = session.get(Organization, test_organization.id) + assert updated_org.name == new_name + +def test_update_organization_unauthorized(auth_client, session, test_organization, test_user): + """Test organization update without proper permissions""" + # Add user to organization but without edit permission + basic_role = Role(name="Basic", organization_id=test_organization.id) + basic_role.users.append(test_user) + session.add(basic_role) + session.commit() + + response = auth_client.post( + f"/organizations/update/{test_organization.id}", + data={ + "id": test_organization.id, + "name": "Unauthorized Update" + }, + follow_redirects=False + ) + + assert response.status_code == 403 + assert "permission" in response.text.lower() + +def test_update_organization_duplicate_name(auth_client, session, test_organization, test_user): + """Test organization update with duplicate name""" + # Create another organization with the target name + existing_org = Organization(name="Existing Org") + session.add(existing_org) + + # Set up permissions + owner_role = Role(name="Owner", organization_id=test_organization.id) + owner_role.permissions = [ + Permission(name=ValidPermissions.EDIT_ORGANIZATION) + ] + owner_role.users.append(test_user) + session.add(owner_role) + session.commit() + + response = auth_client.post( + f"/organizations/update/{test_organization.id}", + data={ + "id": test_organization.id, + "name": "Existing Org" + }, + follow_redirects=False + ) + + assert response.status_code == 400 + assert "organization name already taken" in response.text.lower() + +def test_update_organization_empty_name(auth_client, session, test_organization, test_user): + """Test organization update with empty name""" + # Set up permissions + owner_role = Role(name="Owner", organization_id=test_organization.id) + owner_role.permissions = [ + Permission(name=ValidPermissions.EDIT_ORGANIZATION) + ] + owner_role.users.append(test_user) + session.add(owner_role) + session.commit() + + response = auth_client.post( + f"/organizations/update/{test_organization.id}", + data={ + "id": test_organization.id, + "name": " " + }, + follow_redirects=False + ) + + assert response.status_code == 400 + assert "organization name cannot be empty" in response.text.lower() + +def test_update_organization_unauthenticated(unauth_client, test_organization): + """Test organization update without authentication""" + response = unauth_client.post( + f"/organizations/update/{test_organization.id}", + data={ + "id": test_organization.id, + "name": "Unauthorized Update" + }, + follow_redirects=False + ) + + assert response.status_code == 303 # Redirect to login + +def test_delete_organization_success(auth_client, session, test_organization, test_user): + """Test successful organization deletion""" + # Set up test user as owner with delete permission + owner_role = Role(name="Owner", organization_id=test_organization.id) + owner_role.permissions = [ + Permission(name=ValidPermissions.DELETE_ORGANIZATION) + ] + owner_role.users.append(test_user) + session.add(owner_role) + session.commit() + + response = auth_client.post( + f"/organizations/delete/{test_organization.id}", + follow_redirects=False + ) + + assert response.status_code == 303 # Redirect status code + assert "/profile" in response.headers["location"] + + # Verify organization was deleted + deleted_org = session.get(Organization, test_organization.id) + assert deleted_org is None + +def test_delete_organization_unauthorized(auth_client, session, test_organization, test_user): + """Test organization deletion without proper permissions""" + # Add user to organization but without delete permission + basic_role = Role(name="Owner", organization_id=test_organization.id) + basic_role.users.append(test_user) + session.add(basic_role) + session.commit() + + response = auth_client.post( + f"/organizations/delete/{test_organization.id}", + follow_redirects=False + ) + + assert response.status_code == 403 + assert "permission" in response.text.lower() + + # Verify organization still exists + org = session.get(Organization, test_organization.id) + assert org is not None + +def test_delete_organization_not_member(auth_client, session, test_organization, test_user): + """Test organization deletion by non-member""" + response = auth_client.post( + f"/organizations/delete/{test_organization.id}", + follow_redirects=False + ) + + assert response.status_code == 403 + assert "permission" in response.text.lower() + + # Verify organization still exists + org = session.get(Organization, test_organization.id) + assert org is not None + +def test_delete_organization_unauthenticated(unauth_client, test_organization): + """Test organization deletion without authentication""" + response = unauth_client.post( + f"/organizations/delete/{test_organization.id}", + follow_redirects=False + ) + + assert response.status_code == 303 # Redirect to login + +def test_delete_organization_cascade(auth_client, session, test_organization, test_user): + """Test that deleting organization cascades to roles""" + # Set up test user as owner with delete permission + owner_role = Role(name="Owner", organization_id=test_organization.id) + owner_role.permissions = [ + Permission(name=ValidPermissions.DELETE_ORGANIZATION) + ] + owner_role.users.append(test_user) + + # Add another role to verify cascade + member_role = Role(name="Member", organization_id=test_organization.id) + + session.add(owner_role) + session.add(member_role) + session.commit() + + response = auth_client.post( + f"/organizations/delete/{test_organization.id}", + follow_redirects=False + ) + + assert response.status_code == 303 + + # Verify roles were also deleted + roles = session.exec( + select(Role) + .where(Role.organization_id == test_organization.id) + ).all() + assert len(roles) == 0 diff --git a/tests/test_role.py b/tests/test_role.py index e44421b..3d6e488 100644 --- a/tests/test_role.py +++ b/tests/test_role.py @@ -1,6 +1,7 @@ # test_role.py import pytest +from .conftest import SetupError from utils.models import Role, Permission, ValidPermissions, User from sqlmodel import Session, select @@ -109,3 +110,347 @@ def test_create_role_unauthenticated(unauth_client, test_organization): ) assert response.status_code == 303 + + +@pytest.fixture +def editor_user(session: Session, test_user: User, test_organization): + """ + Creates a user who has EDIT_ROLE permission assigned via a role in the + specified organization. + """ + editor_role = Role( + name="Editor Role", + organization_id=test_organization.id + ) + edit_permission = session.exec( + select(Permission).where(Permission.name == ValidPermissions.EDIT_ROLE) + ).first() + if not edit_permission: + raise ValueError("EDIT_ROLE permission not found in the Permission table. Check seeds/setup.") + + editor_role.permissions.append(edit_permission) + session.add(editor_role) + + # Assign the newly created 'Editor Role' to our test user + test_user.roles.append(editor_role) + session.commit() + return test_user + + +def test_update_role_success(auth_client, editor_user, test_organization, session: Session): + """ + Test successfully updating a role's name and permissions. + Ensures a user with EDIT_ROLE permission can update the role. + """ + # Create a role we will update + existing_role = Role( + name="Old Role Name", + organization_id=test_organization.id + ) + session.add(existing_role) + session.commit() + session.refresh(existing_role) + + # Add an existing permission to the role so we can test it being removed + perm_create = session.exec( + select(Permission).where(Permission.name == ValidPermissions.CREATE_ROLE) + ).first() + if not perm_create: + raise SetupError("Test setup failed; CREATE_ROLE permission not found.") + + existing_role.permissions.append(perm_create) + session.commit() + + # Verify setup + assert existing_role.id is not None + original_id = existing_role.id + + # Update the role using the /roles/update endpoint + response = auth_client.post( + "/roles/update", + data={ + "id": existing_role.id, + "name": "New Role Name", + "organization_id": test_organization.id, + "permissions": [ValidPermissions.EDIT_ROLE.value] # remove CREATE_ROLE, add EDIT_ROLE + }, + follow_redirects=False + ) + + assert response.status_code == 303 + + # Check that the role was updated in the database + updated_role = session.exec( + select(Role).where(Role.id == original_id) + ).first() + assert updated_role is not None + assert updated_role.name == "New Role Name" + perm_names = [p.name for p in updated_role.permissions] + assert ValidPermissions.CREATE_ROLE not in perm_names + assert ValidPermissions.EDIT_ROLE in perm_names + + +def test_update_role_unauthorized(auth_client, test_user, test_organization, session: Session): + """ + Test that a user without EDIT_ROLE permission cannot update a role. + A 403 (InsufficientPermissionsError) is expected. + """ + # Create a role in the same organization that we try to update + some_role = Role( + name="Role Without Permission", + organization_id=test_organization.id + ) + session.add(some_role) + session.commit() + session.refresh(some_role) + + response = auth_client.post( + "/roles/update", + data={ + "id": some_role.id, + "name": "Attempted Update", + "organization_id": test_organization.id, + "permissions": [ValidPermissions.EDIT_ROLE.value] + }, + follow_redirects=True + ) + # Because the user has no EDIT_ROLE permission, the endpoint should raise 403 + assert response.status_code == 403 + + +def test_update_role_nonexistent(auth_client, editor_user, test_organization): + """ + Test attempting to update a role that does not exist. + A 404 (RoleNotFoundError) is expected. + """ + response = auth_client.post( + "/roles/update", + data={ + "id": 9999999, # A role ID that doesn't exist + "name": "Nonexistent Role", + "organization_id": test_organization.id, + "permissions": [ValidPermissions.EDIT_ROLE.value] + }, + follow_redirects=True + ) + assert response.status_code == 404 + + +def test_update_role_duplicate_name(auth_client, editor_user, test_organization, session: Session): + """ + Test that updating a role to a name that already exists in the same organization + fails with 400 (RoleAlreadyExistsError). + """ + # Create two roles in the same organization + role1 = Role(name="Original Role", organization_id=test_organization.id) + role2 = Role(name="Conflict Role", organization_id=test_organization.id) + session.add(role1) + session.add(role2) + session.commit() + + # Try to update 'role1' to have the same name as 'role2' + response = auth_client.post( + "/roles/update", + data={ + "id": role1.id, + "name": "Conflict Role", + "organization_id": test_organization.id, + "permissions": [ValidPermissions.EDIT_ROLE.value] + }, + follow_redirects=True + ) + + assert response.status_code == 400 + + +def test_update_role_invalid_permission(auth_client, editor_user, test_organization, session: Session): + """ + Test attempting to update a role with an invalid permission + that is not in the ValidPermissions enum. Expects a 400 status. + """ + role_to_update = Role( + name="Role With Bad Permission", + organization_id=test_organization.id + ) + session.add(role_to_update) + session.commit() + session.refresh(role_to_update) + + # Provide an invalid permission string + response = auth_client.post( + "/roles/update", + data={ + "id": role_to_update.id, + "name": "Invalid Permission Test", + "organization_id": test_organization.id, + "permissions": ["NOT_A_VALID_PERMISSION"] + }, + follow_redirects=True + ) + + assert response.status_code == 422 + + +def test_update_role_unauthenticated(unauth_client, test_organization, session: Session): + """ + Test that an unauthenticated user (no valid tokens) will not have access + to update a role. By default, the router requires login, so it should + redirect. + """ + # Create a role + some_role = Role( + name="Role For Unauth Test", + organization_id=test_organization.id + ) + session.add(some_role) + session.commit() + session.refresh(some_role) + + response = unauth_client.post( + "/roles/update", + data={ + "id": some_role.id, + "name": "Should Not Succeed", + "organization_id": test_organization.id, + "permissions": [ValidPermissions.EDIT_ROLE.value] + }, + follow_redirects=False + ) + assert response.status_code == 303 + + +@pytest.fixture +def delete_role_user(session: Session, test_user: User, test_organization): + """Create a user with DELETE_ROLE permission""" + delete_role = Role( + name="Delete Role Permission", + organization_id=test_organization.id + ) + + delete_permission: Permission | None = session.exec( + select(Permission).where(Permission.name == ValidPermissions.DELETE_ROLE) + ).first() + + if delete_permission is None: + raise ValueError("Error during test setup: DELETE_ROLE permission not found") + + delete_role.permissions.append(delete_permission) + session.add(delete_role) + + test_user.roles.append(delete_role) + session.commit() + + return test_user + + +def test_delete_role_success(auth_client, delete_role_user, test_organization, session: Session): + """Test successful role deletion""" + # Create a role to delete + role_to_delete = Role( + name="Role To Delete", + organization_id=test_organization.id + ) + session.add(role_to_delete) + session.commit() + session.refresh(role_to_delete) + + response = auth_client.post( + "/roles/delete", + data={ + "id": role_to_delete.id, + "organization_id": test_organization.id + }, + follow_redirects=False + ) + + assert response.status_code == 303 + + # Verify role was deleted from database + deleted_role = session.exec( + select(Role).where(Role.id == role_to_delete.id) + ).first() + assert deleted_role is None + + +def test_delete_role_unauthorized(auth_client, test_user, test_organization, session: Session): + """Test role deletion without proper permissions""" + # Create a role to attempt to delete + role = Role( + name="Unauthorized Delete", + organization_id=test_organization.id + ) + session.add(role) + session.commit() + + response = auth_client.post( + "/roles/delete", + data={ + "id": role.id, + "organization_id": test_organization.id + }, + follow_redirects=False + ) + + assert response.status_code == 403 + + +def test_delete_nonexistent_role(auth_client, delete_role_user, test_organization): + """Test attempting to delete a role that doesn't exist""" + response = auth_client.post( + "/roles/delete", + data={ + "id": 99999, # Non-existent role ID + "organization_id": test_organization.id + }, + follow_redirects=False + ) + + assert response.status_code == 404 + + +def test_delete_role_with_users(auth_client, delete_role_user, test_organization, session: Session): + """Test attempting to delete a role that has users assigned""" + # Create a role and assign it to a user + role_with_users = Role( + name="Role With Users", + organization_id=test_organization.id + ) + session.add(role_with_users) + session.commit() + + # Assign the role to our test user + delete_role_user.roles.append(role_with_users) + session.commit() + + response = auth_client.post( + "/roles/delete", + data={ + "id": role_with_users.id, + "organization_id": test_organization.id + }, + follow_redirects=False + ) + + assert response.status_code == 400 + + +def test_delete_role_unauthenticated(unauth_client, test_organization, session: Session): + """Test role deletion without authentication""" + # Create a role to attempt to delete + role = Role( + name="Unauthenticated Delete", + organization_id=test_organization.id + ) + session.add(role) + session.commit() + + response = unauth_client.post( + "/roles/delete", + data={ + "id": role.id, + "organization_id": test_organization.id + }, + follow_redirects=False + ) + + assert response.status_code == 303 # Redirects to login page