From da71ceafd8e8362ab00cde861661500f615f8249 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 5 Sep 2025 09:24:49 -0400 Subject: [PATCH 1/5] Remove role-tracking feature no longer used --- ansible_base/rbac/models/role.py | 5 - ansible_base/rbac/permission_registry.py | 13 -- ansible_base/rbac/triggers.py | 85 ----------- docs/apps/rbac/for_app_developers.md | 21 --- test_app/migrations/0001_initial.py | 3 - .../migrations/0006_team_admins_team_users.py | 14 +- test_app/models.py | 40 ----- test_app/tests/conftest.py | 4 +- .../tests/rbac/api/test_user_permissions.py | 140 ------------------ .../tests/rbac/features/test_role_tracking.py | 73 --------- .../test_assignment_visibility_security.py | 125 ++++++++++++++++ 11 files changed, 129 insertions(+), 394 deletions(-) delete mode 100644 test_app/tests/rbac/features/test_role_tracking.py create mode 100644 test_app/tests/rbac/test_assignment_visibility_security.py diff --git a/ansible_base/rbac/models/role.py b/ansible_base/rbac/models/role.py index ee90a1446..6d131b09b 100644 --- a/ansible_base/rbac/models/role.py +++ b/ansible_base/rbac/models/role.py @@ -302,11 +302,6 @@ def give_or_remove_permission(self, actor, content_object, giving=True, sync_act update_after_assignment(update_teams, to_update) - if not sync_action and self.name in permission_registry._trackers: - tracker = permission_registry._trackers[self.name] - with tracker.sync_active(): - tracker.sync_relationship(actor, content_object, giving=giving) - return assignment @classmethod diff --git a/ansible_base/rbac/permission_registry.py b/ansible_base/rbac/permission_registry.py index 134eb2377..0af451cf3 100644 --- a/ansible_base/rbac/permission_registry.py +++ b/ansible_base/rbac/permission_registry.py @@ -32,8 +32,6 @@ def __init__(self): self._parent_fields = dict() self._managed_roles = dict() # code-defined role definitions, managed=True self.apps_ready = False - self._tracked_relationships = set() - self._trackers = dict() def register(self, *args: Type[Model], parent_field_name: Optional[str] = 'organization'): if self.apps_ready: @@ -50,9 +48,6 @@ def register(self, *args: Type[Model], parent_field_name: Optional[str] = 'organ else: logger.debug(f'Model {cls._meta.model_name} registered to permission registry more than once') - def track_relationship(self, cls, relationship, role_name): - self._tracked_relationships.add((cls, relationship, role_name)) - def get_parent_model(self, model) -> Optional[type]: model = self._name_to_model[model._meta.model_name] parent_field_name = self.get_parent_fd_name(model) @@ -158,14 +153,6 @@ def call_when_apps_ready(self, apps, app_config) -> None: triggers.connect_rbac_signals(cls) connect_rbac_methods(cls) - for cls, relationship, role_name in self._tracked_relationships: - if role_name in self._trackers: - tracker = self._trackers[role_name] - else: - tracker = triggers.TrackedRelationship(cls, role_name) - self._trackers[role_name] = tracker - tracker.initialize(relationship) - self.register_managed_role_constructors() @property diff --git a/ansible_base/rbac/triggers.py b/ansible_base/rbac/triggers.py index 12e806bc8..91e57b6ec 100644 --- a/ansible_base/rbac/triggers.py +++ b/ansible_base/rbac/triggers.py @@ -1,5 +1,4 @@ import logging -from contextlib import contextmanager from typing import Optional, Union from uuid import UUID @@ -303,90 +302,6 @@ def post_migration_rbac_setup(sender, *args, **kwargs): compute_object_role_permissions() -class TrackedRelationship: - def __init__(self, cls, role_name): - self.cls = cls - self.role_name = role_name - self.user_relationship = None - self.team_relationship = None - self._active_sync_flag = False - - def initialize(self, relationship): - manager = getattr(self.cls, relationship) - related_model_name = manager.field.related_model._meta.model_name - if related_model_name == permission_registry.team_model._meta.model_name: - self.team_relationship = relationship - m2m_changed.connect(self.sync_team_to_role, sender=manager.through) - elif related_model_name == permission_registry.user_model._meta.model_name: - self.user_relationship = relationship - m2m_changed.connect(self.sync_user_to_role, sender=manager.through) - else: - raise RuntimeError(f'Can only register user or team relationships, obtained {related_model_name}') - - @contextmanager - def sync_active(self): - try: - self._active_sync_flag = True - yield - finally: - self._active_sync_flag = False - - def sync_relationship(self, actor, content_object, giving=True): - # Exit if role does not apply for the intended model type, for example - # if user is given "team-member" role to organization, do not add user to the team members - if content_object._meta.model_name != self.cls._meta.model_name: - return - - if actor._meta.model_name == permission_registry.team_model._meta.model_name: - if self.team_relationship is None: - return - manager = getattr(content_object, self.team_relationship) - elif actor._meta.model_name == permission_registry.user_model._meta.model_name: - if self.user_relationship is None: - return - manager = getattr(content_object, self.user_relationship) - - if giving: - manager.add(actor) - else: - manager.remove(actor) - - def _sync_actor_to_role(self, actor_model: type, instance: Model, action: str, pk_set: Optional[set[int]]): - if self._active_sync_flag: - return - if action.startswith('pre_'): - return - rd = RoleDefinition.objects.get(name=self.role_name) - - if action in ('post_add', 'post_remove'): - actor_set = pk_set - elif action == 'post_clear': - ct = permission_registry.content_type_model.objects.get_for_model(instance) - role = ObjectRole.objects.get(object_id=instance.pk, content_type=ct, role_definition=rd) - if actor_model._meta.model_name == 'team': - actor_set = set(role.teams.values_list('id', flat=True)) - else: - actor_set = set(role.users.values_list('id', flat=True)) - - giving = bool(action == 'post_add') - for actor in actor_model.objects.filter(pk__in=actor_set): - rd.give_or_remove_permission(actor, instance, giving=giving, sync_action=True) - - def sync_team_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): - if not reverse: - self._sync_actor_to_role(permission_registry.team_model, instance, action, pk_set) - else: - for pk in pk_set: - self._sync_actor_to_role(permission_registry.team_model, model(pk=pk), action, {instance.pk}) - - def sync_user_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): - if not reverse: - self._sync_actor_to_role(permission_registry.user_model, instance, action, pk_set) - else: - for pk in pk_set: - self._sync_actor_to_role(permission_registry.user_model, model(pk=pk), action, {instance.pk}) - - def connect_rbac_signals(cls): if cls._meta.model_name == permission_registry.team_model._meta.model_name: pre_delete.connect(team_pre_delete, sender=cls, dispatch_uid='stash-team-roles-before-delete') diff --git a/docs/apps/rbac/for_app_developers.md b/docs/apps/rbac/for_app_developers.md index 8ae79cabc..242dd20cf 100644 --- a/docs/apps/rbac/for_app_developers.md +++ b/docs/apps/rbac/for_app_developers.md @@ -344,27 +344,6 @@ If you create (in code) a role definition that sets `managed` to True, then thes rules will be disregarded for that particular role definition. Managed role definitions can not be created through the API, but can be created in code like migration scripts. -### Tracked Relationships - -Let's say that you are introducing RBAC, and you have already set up your API -with some relationship, like members of a team, and parents of a team -(to get nested teams). -This sub-feature will use signals to do bidirectional syncing of memberships of -that relationship with memberships of their corresponding role. - -``` -permission_registry.track_relationship(Team, 'users', 'Team Member') -permission_registry.track_relationship(Team, 'team_parents', 'Team Member') -``` - -This only works with our 2 "actor" types of users and teams. -Adding these lines will synchronize users and teams of team-object-roles with the "team-member" -role definition (by name) to the `Team.users` -and `Team.tracked_parents` ManyToMany relationships, respectively. -So if you have a team object, `team.users.add(user)` will also give that -user _member permission_ to that team, where those permissions are defined by the -role definition with the name "team-member". - ### Role assignment callback diff --git a/test_app/migrations/0001_initial.py b/test_app/migrations/0001_initial.py index 53adf83b5..9ac5469c8 100644 --- a/test_app/migrations/0001_initial.py +++ b/test_app/migrations/0001_initial.py @@ -55,8 +55,6 @@ class Migration(migrations.Migration): ('description', models.TextField(blank=True, default='', help_text='The organization description.')), ('created_by', models.ForeignKey(default=None, editable=False, help_text='The user who created this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_created+', to=settings.AUTH_USER_MODEL)), ('modified_by', models.ForeignKey(default=None, editable=False, help_text='The user who last modified this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_modified+', to=settings.AUTH_USER_MODEL)), - ('admins', models.ManyToManyField(blank=True, help_text='The list of admins for this organization', related_name='admin_of_organizations', to=settings.AUTH_USER_MODEL)), - ('users', models.ManyToManyField(blank=True, help_text='The list of users on this organization', related_name='member_of_organizations', to=settings.AUTH_USER_MODEL)) ], options={'ordering': ['id'], 'permissions': [('member_organization', 'User is member of this organization')]}, ), @@ -85,7 +83,6 @@ class Migration(migrations.Migration): ('created_by', models.ForeignKey(default=None, editable=False, help_text='The user who created this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_created+', to=settings.AUTH_USER_MODEL)), ('modified_by', models.ForeignKey(default=None, editable=False, help_text='The user who last modified this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_modified+', to=settings.AUTH_USER_MODEL)), ('organization', models.ForeignKey(help_text='The organization of this team.', on_delete=django.db.models.deletion.CASCADE, related_name='teams', to=settings.ANSIBLE_BASE_ORGANIZATION_MODEL)), - ('team_parents', models.ManyToManyField(blank=True, related_name='team_children', to=settings.ANSIBLE_BASE_TEAM_MODEL)), ], options={ 'ordering': ('organization__name', 'name'), diff --git a/test_app/migrations/0006_team_admins_team_users.py b/test_app/migrations/0006_team_admins_team_users.py index 3d7a09de5..58abdef9f 100644 --- a/test_app/migrations/0006_team_admins_team_users.py +++ b/test_app/migrations/0006_team_admins_team_users.py @@ -1,7 +1,6 @@ # Generated by Django 4.2.8 on 2024-03-18 12:43 -from django.conf import settings -from django.db import migrations, models +from django.db import migrations class Migration(migrations.Migration): @@ -11,14 +10,5 @@ class Migration(migrations.Migration): ] operations = [ - migrations.AddField( - model_name='team', - name='admins', - field=models.ManyToManyField(blank=True, help_text='The list of admins for this team', related_name='teams_administered', to=settings.AUTH_USER_MODEL), - ), - migrations.AddField( - model_name='team', - name='users', - field=models.ManyToManyField(blank=True, help_text='The list of users on this team', related_name='teams', to=settings.AUTH_USER_MODEL), - ), + # Removed team admins and user relationsips ] diff --git a/test_app/models.py b/test_app/models.py index 0c920154e..097f4585b 100644 --- a/test_app/models.py +++ b/test_app/models.py @@ -30,20 +30,6 @@ class Meta: resource = AnsibleResourceField(primary_key_field="id") - users = models.ManyToManyField( - settings.AUTH_USER_MODEL, - related_name='member_of_organizations', - blank=True, - help_text="The list of users on this organization", - ) - - admins = models.ManyToManyField( - settings.AUTH_USER_MODEL, - related_name='admin_of_organizations', - blank=True, - help_text="The list of admins for this organization", - ) - extra_field = models.CharField(max_length=100, null=True) @@ -73,7 +59,6 @@ class ManagedUser(User): class Team(AbstractTeam): resource = AnsibleResourceField(primary_key_field="id") - team_parents = models.ManyToManyField('Team', related_name='team_children', blank=True) encryptioner = models.ForeignKey('test_app.EncryptionModel', on_delete=models.SET_NULL, null=True) @@ -89,20 +74,6 @@ class Meta: ordering = ('organization__name', 'name') permissions = [('member_team', 'Has all roles assigned to this team')] - users = models.ManyToManyField( - User, - related_name='teams', - blank=True, - help_text="The list of users on this team", - ) - - admins = models.ManyToManyField( - User, - related_name='teams_administered', - blank=True, - help_text="The list of admins for this team", - ) - class ResourceMigrationTestModel(models.Model): name = models.CharField(max_length=255) @@ -383,17 +354,6 @@ class Meta: permission_registry.register(ExtraExtraUUIDModel, parent_field_name='extra_uuid') -# NOTE(cutwater): Using hard coded role names instead of ones defined in ReconcileUser class, -# to avoid circular dependency between models and claims modules. This is a temporary workarond, -# since we plan to drop support of tracked relationships in future. -permission_registry.track_relationship(Team, 'users', 'Team Member') -permission_registry.track_relationship(Team, 'admins', 'Team Admin') -permission_registry.track_relationship(Team, 'team_parents', 'Team Member') - -permission_registry.track_relationship(Organization, 'users', 'Organization Member') -permission_registry.track_relationship(Organization, 'admins', 'Organization Admin') - - class MultipleFieldsModel(NamedCommonModel): class Meta: app_label = "test_app" diff --git a/test_app/tests/conftest.py b/test_app/tests/conftest.py index cc508b6a8..0edb37f89 100644 --- a/test_app/tests/conftest.py +++ b/test_app/tests/conftest.py @@ -674,7 +674,7 @@ def org_member_rd(): @pytest.fixture def member_rd(): - "Member role for a team, place in root conftest because it is needed for the team users tracked relationship" + "Member role for a team" RoleDefinition.objects.managed.clear() yield RoleDefinition.objects.managed.team_member RoleDefinition.objects.managed.clear() @@ -682,7 +682,7 @@ def member_rd(): @pytest.fixture def admin_rd(): - "Member role for a team, place in root conftest because it is needed for the team users tracked relationship" + "Admin role for a team" RoleDefinition.objects.managed.clear() yield RoleDefinition.objects.managed.team_admin RoleDefinition.objects.managed.clear() diff --git a/test_app/tests/rbac/api/test_user_permissions.py b/test_app/tests/rbac/api/test_user_permissions.py index 8b0cda1b2..144eb6780 100644 --- a/test_app/tests/rbac/api/test_user_permissions.py +++ b/test_app/tests/rbac/api/test_user_permissions.py @@ -216,143 +216,3 @@ def test_team_admins_can_add_children(self, user, user_api_client, organization, response = user_api_client.post(url, data=data) assert response.status_code == 201, response.data assert rando.has_obj_perm(inventory, 'change') - - -@pytest.mark.django_db -class TestRelatedUserListView: - def _initial_check(self, url, user_api_client, count=0): - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == count - - def _assign_users(self, role_definition, object): - members = [ - User.objects.create(username='rando1'), - User.objects.create(username='rando2'), - User.objects.create(username='rando3'), - ] - role_definition.give_permission(members[0], object) - role_definition.give_permission(members[1], object) - - def test_org_admin_list_org_members(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - org_admin_rd.give_permission(user, organization) - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 2 - - def test_org_member_list_org_members(self, user, user_api_client, organization, org_member_rd): - org_member_rd.give_permission(user, organization) - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client, 1) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 3 - - def test_superadmin_list_org_members(self, user, user_api_client, organization, org_member_rd): - user.is_superuser = True - user.save() - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 2 - - -@pytest.mark.django_db -class TestRelationshipBasedAssignment: - """Tests permissions via tracked_relationship feature, duplicated functionality with TestRoleBasedAssignment - - Philosophically, this should perform the same actions and make the same assertions as TestRoleBasedAssignment - because under the hood, signals are used to make these memberships exactly the same - as the corresponding role assignments - """ - - def test_parent_object_view_permission(self, user, user_api_client, organization, org_member_rd): - url = get_relative_url('organization-list') - response = user_api_client.get(url) - assert response.data['count'] == 0 - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - response = user_api_client.get(url) - assert response.status_code == 404, response.data - - org_member_rd.give_permission(user, organization) - response = user_api_client.get(url) - assert response.status_code == 200, response.data - assert user.username in set(item['username'] for item in response.data['results']) - - def test_org_admins_can_add_members(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - rando = User.objects.create(username='rando') - url = get_relative_url('organization-users-associate', kwargs={'pk': organization.pk}) - - org_member_rd.give_permission(user, organization) - - data = {'instances': [rando.id]} - - response = user_api_client.post(url, data=data) - assert response.status_code == 403, response.data - assert not rando.has_obj_perm(organization, 'member') # sanity, verify atomicity - - org_admin_rd.give_permission(user, organization) - - response = user_api_client.post(url, data=data) - assert response.status_code == 204, response.data - assert rando.has_obj_perm(organization, 'member') - - @override_settings(ORG_ADMINS_CAN_SEE_ALL_USERS=False) - def test_associate_needs_view_permission(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - "Need view permission to user to associate to an organization" - org_admin_rd.give_permission(user, organization) - rando = User.objects.create(username='rando') - assert not visible_users(user).filter(pk=rando.id).exists() # sanity - url = get_relative_url('organization-admins-associate', kwargs={'pk': organization.pk}) - - data = {'instances': [rando.id]} - - # user can not see other user rando, should get related object does not exist error - response = user_api_client.post(url, data=data) - assert response.status_code == 400, response.data - assert not rando.has_obj_perm(organization, 'change') - - org_member_rd.give_permission(rando, organization) - assert visible_users(user).filter(pk=rando.id).exists() - - # user can see other user rando, and can make rando an organization admin - response = user_api_client.post(url, data=data) - assert response.status_code == 204, response.data - assert rando.has_obj_perm(organization, 'change') # action took full effect - - def test_sublist_visibility(self, user, user_api_client, team, member_rd, admin_rd): - url = get_relative_url('team-users-list', kwargs={'pk': team.pk}) - - # with no permissions, user should not be able to make a GET to members list - response = user_api_client.get(url) - assert response.status_code == 404 - - rando = User.objects.create(username='rando') - member_rd.give_permission(rando, team) - admin_rd.give_permission(user, team) - assert not visible_users(user).filter(pk=rando.id).exists() # sanity - - # even though user can not see rando, they still show in members list - # this would be important if user wanted to remove rando permissions - response = user_api_client.get(url) - assert response.status_code == 200 - assert 'rando' in set(item['username'] for item in response.data['results']) - - @override_settings(ORG_ADMINS_CAN_SEE_ALL_USERS=False) - def test_visible_users_flags(self, admin_user, user, team, member_rd): - assert set(visible_users(user).values_list('id', flat=True)) == {admin_user.id, user.id} - assert set(visible_users(user, always_show_superusers=False).values_list('id', flat=True)) == {user.id} - assert set(visible_users(user, always_show_self=False).values_list('id', flat=True)) == {admin_user.id} diff --git a/test_app/tests/rbac/features/test_role_tracking.py b/test_app/tests/rbac/features/test_role_tracking.py deleted file mode 100644 index 65128cef8..000000000 --- a/test_app/tests/rbac/features/test_role_tracking.py +++ /dev/null @@ -1,73 +0,0 @@ -import pytest - -from ansible_base.rbac import permission_registry - - -@pytest.mark.django_db -def test_add_user_to_team_relationship(team, rando, inventory, inv_rd, member_rd): - inv_rd.give_permission(team, inventory) - assert not rando.has_obj_perm(team, 'member_team') - assert not rando.has_obj_perm(inventory, 'change_inventory') - - team.users.add(rando) - assert rando.has_obj_perm(team, 'member_team') - assert rando.has_obj_perm(inventory, 'change_inventory') - - team.users.clear() - assert not rando.has_obj_perm(team, 'member_team') - assert not rando.has_obj_perm(inventory, 'change_inventory') - - -@pytest.mark.django_db -def test_add_user_to_tracked_role(team, rando, member_rd): - assert not rando.has_obj_perm(team, 'member_team') - - member_rd.give_permission(rando, team) - assert rando in team.users.all() - - member_rd.remove_permission(rando, team) - assert rando not in team.users.all() - - -@pytest.mark.django_db -def test_add_team_to_tracked_relationship(rando, organization, member_rd): - child_team = permission_registry.team_model.objects.create(name='child-team', organization=organization) - parent_team = permission_registry.team_model.objects.create(name='parent-team', organization=organization) - member_rd.give_permission(rando, parent_team) - assert not rando.has_obj_perm(child_team, 'member') - - child_team.team_parents.add(parent_team) - assert rando.has_obj_perm(child_team, 'member') - - child_team.team_parents.clear() - assert not rando.has_obj_perm(child_team, 'member') - - -@pytest.mark.django_db -def test_add_team_to_tracked_role(rando, organization, member_rd): - child_team = permission_registry.team_model.objects.create(name='child-team', organization=organization) - parent_team = permission_registry.team_model.objects.create(name='parent-team', organization=organization) - member_rd.give_permission(rando, parent_team) - assert not rando.has_obj_perm(child_team, 'member') - - member_rd.give_permission(parent_team, child_team) - assert parent_team in child_team.team_parents.all() - - member_rd.remove_permission(parent_team, child_team) - assert parent_team not in child_team.team_parents.all() - - -@pytest.mark.django_db -@pytest.mark.parametrize("reverse", [True, False]) -def test_add_organization_member_to_relationship(rando, organization, org_member_rd, reverse): - assert not rando.has_obj_perm(organization, 'member') - - if reverse: - rando.member_of_organizations.add(organization) - else: - organization.users.add(rando) - - assert org_member_rd.object_roles.count() == 1 - object_role = org_member_rd.object_roles.first() - assert rando in object_role.users.all() - assert rando.has_obj_perm(organization, 'member') diff --git a/test_app/tests/rbac/test_assignment_visibility_security.py b/test_app/tests/rbac/test_assignment_visibility_security.py new file mode 100644 index 000000000..366155a41 --- /dev/null +++ b/test_app/tests/rbac/test_assignment_visibility_security.py @@ -0,0 +1,125 @@ +# Generated by Claude Sonnet 4 +""" +Test to verify that RBAC does not allow permission assignment to users that +the requesting user cannot view through the visible_users policy. +""" +import pytest +from django.contrib.auth import get_user_model +from rest_framework.test import APIClient + +from ansible_base.rbac.models import RoleUserAssignment +from ansible_base.rbac.policies import visible_users + +User = get_user_model() + + +@pytest.mark.django_db +class TestAssignmentVisibilitySecurity: + """Test that users cannot assign permissions to users they cannot see""" + + def test_cannot_assign_permission_to_invisible_user(self, inventory, inv_rd, user, organization): # This is the authenticated user from the fixture + """ + Test that a user with admin permission to an inventory cannot assign + permissions to a user they cannot see via the visible_users method. + + This is a security test to ensure RBAC doesn't allow privilege escalation + by granting permissions to users the requester shouldn't be able to see. + """ + # Create a second user that the authenticated user should NOT be able to see + invisible_user = User.objects.create(username='invisible_user', email='invisible@example.com') + + # Verify that the invisible user is not in the visible users for the authenticated user + visible_qs = visible_users(user) + assert invisible_user not in visible_qs, "Test setup failed: invisible_user should not be visible to authenticated user" + + # Give the authenticated user admin permission to the inventory + RoleUserAssignment.objects.create(user=user, role_definition=inv_rd, content_object=inventory) + + # Create authenticated API client for the user + client = APIClient() + client.force_authenticate(user=user) + + # Attempt to assign the inventory role to the invisible user + assignment_data = {'user': invisible_user.pk, 'role_definition': inv_rd.pk, 'object_id': inventory.pk} + + response = client.post('/api/v1/role_user_assignments/', assignment_data, format='json') + + # The assignment should be rejected due to security policy + # Either with 400 (validation error) or 404 (user not found/accessible) + assert response.status_code in [400, 404], ( + f"Expected 400 or 404 when trying to assign permission to invisible user, " f"got {response.status_code}: {response.data}" + ) + + # Verify no assignment was actually created + from ansible_base.rbac.models import DABContentType + + assignment_exists = RoleUserAssignment.objects.filter( + user=invisible_user, role_definition=inv_rd, content_type=DABContentType.objects.get_for_model(inventory.__class__), object_id=inventory.pk + ).exists() + + assert not assignment_exists, "Assignment should not have been created for invisible user" + + def test_both_users_can_see_each_other_when_superuser(self, inventory, inv_rd, user, organization): # authenticated user + """ + Simplified control test: verify that a superuser can assign to any user. + """ + # Make the authenticated user a superuser + user.is_superuser = True + user.save() + + # Create a second user (superuser should be able to see and assign to any user) + visible_user = User.objects.create(username='visible_user', email='visible@example.com') + + # Give the authenticated user admin permission to the inventory + RoleUserAssignment.objects.create(user=user, role_definition=inv_rd, content_object=inventory) + + # Create authenticated API client for the user + client = APIClient() + client.force_authenticate(user=user) + + # Attempt to assign the inventory role to the visible user + assignment_data = {'user': visible_user.pk, 'role_definition': inv_rd.pk, 'object_id': inventory.pk} + + response = client.post('/api/v1/role_user_assignments/', assignment_data, format='json') + + # This should succeed for superuser + assert response.status_code == 201, f"Expected 201 when superuser assigns permission to user, " f"got {response.status_code}: {response.data}" + + # Verify the assignment was created + from ansible_base.rbac.models import DABContentType + + assignment_exists = RoleUserAssignment.objects.filter( + user=visible_user, role_definition=inv_rd, content_type=DABContentType.objects.get_for_model(inventory.__class__), object_id=inventory.pk + ).exists() + + assert assignment_exists, "Assignment should have been created for visible user" + + +@pytest.mark.django_db +def test_org_admin_access_to_role_user_access_endpoint(user_api_client, user, org_admin_rd): + """ + Test that an organization admin can access the role_user_access endpoint for their organization. + + This tests for a bug where org admins get a 403 error when accessing: + GET /api/gateway/v1/role_user_access/shared.organization/2/ + """ + from ansible_base.lib.utils.response import get_relative_url + from test_app.models import Organization + + # Create an organization + organization = Organization.objects.create(name='test-org') + + # Give the user org admin permissions for this organization + org_admin_rd.give_permission(user, organization) + + # Construct the URL for the role_user_access endpoint + url = get_relative_url('role-user-access', kwargs={'pk': organization.pk, 'model_name': 'shared.organization'}) + + # Make the request as the org admin user + response = user_api_client.get(url) + + # This should NOT return 403 - org admins should be able to access this endpoint + assert response.status_code != 403, f"Organization admin got 403 error accessing role_user_access endpoint: {response.data}" + + # It should return 200 OK + assert response.status_code == 200, f"Expected 200 but got {response.status_code}: {response.data}" From ceea564adece3f0a30166e3414d9e857f39e8a39 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 5 Sep 2025 09:25:59 -0400 Subject: [PATCH 2/5] Delete accidental file --- .../test_assignment_visibility_security.py | 125 ------------------ 1 file changed, 125 deletions(-) delete mode 100644 test_app/tests/rbac/test_assignment_visibility_security.py diff --git a/test_app/tests/rbac/test_assignment_visibility_security.py b/test_app/tests/rbac/test_assignment_visibility_security.py deleted file mode 100644 index 366155a41..000000000 --- a/test_app/tests/rbac/test_assignment_visibility_security.py +++ /dev/null @@ -1,125 +0,0 @@ -# Generated by Claude Sonnet 4 -""" -Test to verify that RBAC does not allow permission assignment to users that -the requesting user cannot view through the visible_users policy. -""" -import pytest -from django.contrib.auth import get_user_model -from rest_framework.test import APIClient - -from ansible_base.rbac.models import RoleUserAssignment -from ansible_base.rbac.policies import visible_users - -User = get_user_model() - - -@pytest.mark.django_db -class TestAssignmentVisibilitySecurity: - """Test that users cannot assign permissions to users they cannot see""" - - def test_cannot_assign_permission_to_invisible_user(self, inventory, inv_rd, user, organization): # This is the authenticated user from the fixture - """ - Test that a user with admin permission to an inventory cannot assign - permissions to a user they cannot see via the visible_users method. - - This is a security test to ensure RBAC doesn't allow privilege escalation - by granting permissions to users the requester shouldn't be able to see. - """ - # Create a second user that the authenticated user should NOT be able to see - invisible_user = User.objects.create(username='invisible_user', email='invisible@example.com') - - # Verify that the invisible user is not in the visible users for the authenticated user - visible_qs = visible_users(user) - assert invisible_user not in visible_qs, "Test setup failed: invisible_user should not be visible to authenticated user" - - # Give the authenticated user admin permission to the inventory - RoleUserAssignment.objects.create(user=user, role_definition=inv_rd, content_object=inventory) - - # Create authenticated API client for the user - client = APIClient() - client.force_authenticate(user=user) - - # Attempt to assign the inventory role to the invisible user - assignment_data = {'user': invisible_user.pk, 'role_definition': inv_rd.pk, 'object_id': inventory.pk} - - response = client.post('/api/v1/role_user_assignments/', assignment_data, format='json') - - # The assignment should be rejected due to security policy - # Either with 400 (validation error) or 404 (user not found/accessible) - assert response.status_code in [400, 404], ( - f"Expected 400 or 404 when trying to assign permission to invisible user, " f"got {response.status_code}: {response.data}" - ) - - # Verify no assignment was actually created - from ansible_base.rbac.models import DABContentType - - assignment_exists = RoleUserAssignment.objects.filter( - user=invisible_user, role_definition=inv_rd, content_type=DABContentType.objects.get_for_model(inventory.__class__), object_id=inventory.pk - ).exists() - - assert not assignment_exists, "Assignment should not have been created for invisible user" - - def test_both_users_can_see_each_other_when_superuser(self, inventory, inv_rd, user, organization): # authenticated user - """ - Simplified control test: verify that a superuser can assign to any user. - """ - # Make the authenticated user a superuser - user.is_superuser = True - user.save() - - # Create a second user (superuser should be able to see and assign to any user) - visible_user = User.objects.create(username='visible_user', email='visible@example.com') - - # Give the authenticated user admin permission to the inventory - RoleUserAssignment.objects.create(user=user, role_definition=inv_rd, content_object=inventory) - - # Create authenticated API client for the user - client = APIClient() - client.force_authenticate(user=user) - - # Attempt to assign the inventory role to the visible user - assignment_data = {'user': visible_user.pk, 'role_definition': inv_rd.pk, 'object_id': inventory.pk} - - response = client.post('/api/v1/role_user_assignments/', assignment_data, format='json') - - # This should succeed for superuser - assert response.status_code == 201, f"Expected 201 when superuser assigns permission to user, " f"got {response.status_code}: {response.data}" - - # Verify the assignment was created - from ansible_base.rbac.models import DABContentType - - assignment_exists = RoleUserAssignment.objects.filter( - user=visible_user, role_definition=inv_rd, content_type=DABContentType.objects.get_for_model(inventory.__class__), object_id=inventory.pk - ).exists() - - assert assignment_exists, "Assignment should have been created for visible user" - - -@pytest.mark.django_db -def test_org_admin_access_to_role_user_access_endpoint(user_api_client, user, org_admin_rd): - """ - Test that an organization admin can access the role_user_access endpoint for their organization. - - This tests for a bug where org admins get a 403 error when accessing: - GET /api/gateway/v1/role_user_access/shared.organization/2/ - """ - from ansible_base.lib.utils.response import get_relative_url - from test_app.models import Organization - - # Create an organization - organization = Organization.objects.create(name='test-org') - - # Give the user org admin permissions for this organization - org_admin_rd.give_permission(user, organization) - - # Construct the URL for the role_user_access endpoint - url = get_relative_url('role-user-access', kwargs={'pk': organization.pk, 'model_name': 'shared.organization'}) - - # Make the request as the org admin user - response = user_api_client.get(url) - - # This should NOT return 403 - org admins should be able to access this endpoint - assert response.status_code != 403, f"Organization admin got 403 error accessing role_user_access endpoint: {response.data}" - - # It should return 200 OK - assert response.status_code == 200, f"Expected 200 but got {response.status_code}: {response.data}" From 2ca1ad31a4cc7a827354729750b0da11880758c1 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 5 Sep 2025 09:30:58 -0400 Subject: [PATCH 3/5] flake8 fixes --- ansible_base/rbac/triggers.py | 2 +- test_app/models.py | 1 - test_app/tests/rbac/api/test_user_permissions.py | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/ansible_base/rbac/triggers.py b/ansible_base/rbac/triggers.py index 91e57b6ec..5aa03bbee 100644 --- a/ansible_base/rbac/triggers.py +++ b/ansible_base/rbac/triggers.py @@ -1,5 +1,5 @@ import logging -from typing import Optional, Union +from typing import Union from uuid import UUID from django.db.models import Model, Q diff --git a/test_app/models.py b/test_app/models.py index 097f4585b..04f01fa3c 100644 --- a/test_app/models.py +++ b/test_app/models.py @@ -1,6 +1,5 @@ import uuid -from django.conf import settings from django.db import models from django.db.models import JSONField from rest_framework.exceptions import PermissionDenied as DRFPermissionDenied diff --git a/test_app/tests/rbac/api/test_user_permissions.py b/test_app/tests/rbac/api/test_user_permissions.py index 144eb6780..72cdb50d9 100644 --- a/test_app/tests/rbac/api/test_user_permissions.py +++ b/test_app/tests/rbac/api/test_user_permissions.py @@ -2,7 +2,6 @@ from django.test import override_settings from ansible_base.lib.utils.response import get_relative_url -from ansible_base.rbac.policies import visible_users from test_app.models import Organization, Team, User From 73526a4958f7bfcd61c102fd543f2837ee188ebd Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Fri, 5 Sep 2025 09:50:13 -0400 Subject: [PATCH 4/5] Delete test relying on role tracking --- .../tests/lib/abstract_models/test_common.py | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/test_app/tests/lib/abstract_models/test_common.py b/test_app/tests/lib/abstract_models/test_common.py index 1d209b2fc..8d6996c72 100644 --- a/test_app/tests/lib/abstract_models/test_common.py +++ b/test_app/tests/lib/abstract_models/test_common.py @@ -240,27 +240,6 @@ def test_related_view_log_message(debug_mode, not_called, expected_log): model.related_fields(request) -@pytest.mark.parametrize( - "ignore_relation", - [ - True, - False, - ], -) -@pytest.mark.django_db -def test_related_view_ignore_m2m_relations(ignore_relation, admin_user): - rf = RequestFactory() - request = rf.get('/') - with patch('ansible_base.lib.abstract_models.common.get_relative_url', return_value='https://www.example.com/user'): - if ignore_relation: - admin_user.ignore_relations = ['member_of_organizations'] - else: - admin_user.ignore_relations = [] - - related = admin_user.related_fields(request) - assert ('member_of_organizations' not in related) is ignore_relation - - def test_jsonfield_can_be_encrypted(admin_user, local_authenticator): extra_data = {} From ad7001f9d795581ede377693c3157052113b2120 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Thu, 18 Sep 2025 11:00:11 -0400 Subject: [PATCH 5/5] Spelling --- test_app/migrations/0006_team_admins_team_users.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test_app/migrations/0006_team_admins_team_users.py b/test_app/migrations/0006_team_admins_team_users.py index 58abdef9f..7708c80ee 100644 --- a/test_app/migrations/0006_team_admins_team_users.py +++ b/test_app/migrations/0006_team_admins_team_users.py @@ -10,5 +10,5 @@ class Migration(migrations.Migration): ] operations = [ - # Removed team admins and user relationsips + # Removed team admins and user relationships ]