diff --git a/ansible_base/rbac/models/role.py b/ansible_base/rbac/models/role.py index ee90a1446..6d131b09b 100644 --- a/ansible_base/rbac/models/role.py +++ b/ansible_base/rbac/models/role.py @@ -302,11 +302,6 @@ def give_or_remove_permission(self, actor, content_object, giving=True, sync_act update_after_assignment(update_teams, to_update) - if not sync_action and self.name in permission_registry._trackers: - tracker = permission_registry._trackers[self.name] - with tracker.sync_active(): - tracker.sync_relationship(actor, content_object, giving=giving) - return assignment @classmethod diff --git a/ansible_base/rbac/permission_registry.py b/ansible_base/rbac/permission_registry.py index 134eb2377..0af451cf3 100644 --- a/ansible_base/rbac/permission_registry.py +++ b/ansible_base/rbac/permission_registry.py @@ -32,8 +32,6 @@ def __init__(self): self._parent_fields = dict() self._managed_roles = dict() # code-defined role definitions, managed=True self.apps_ready = False - self._tracked_relationships = set() - self._trackers = dict() def register(self, *args: Type[Model], parent_field_name: Optional[str] = 'organization'): if self.apps_ready: @@ -50,9 +48,6 @@ def register(self, *args: Type[Model], parent_field_name: Optional[str] = 'organ else: logger.debug(f'Model {cls._meta.model_name} registered to permission registry more than once') - def track_relationship(self, cls, relationship, role_name): - self._tracked_relationships.add((cls, relationship, role_name)) - def get_parent_model(self, model) -> Optional[type]: model = self._name_to_model[model._meta.model_name] parent_field_name = self.get_parent_fd_name(model) @@ -158,14 +153,6 @@ def call_when_apps_ready(self, apps, app_config) -> None: triggers.connect_rbac_signals(cls) connect_rbac_methods(cls) - for cls, relationship, role_name in self._tracked_relationships: - if role_name in self._trackers: - tracker = self._trackers[role_name] - else: - tracker = triggers.TrackedRelationship(cls, role_name) - self._trackers[role_name] = tracker - tracker.initialize(relationship) - self.register_managed_role_constructors() @property diff --git a/ansible_base/rbac/triggers.py b/ansible_base/rbac/triggers.py index 12e806bc8..5aa03bbee 100644 --- a/ansible_base/rbac/triggers.py +++ b/ansible_base/rbac/triggers.py @@ -1,6 +1,5 @@ import logging -from contextlib import contextmanager -from typing import Optional, Union +from typing import Union from uuid import UUID from django.db.models import Model, Q @@ -303,90 +302,6 @@ def post_migration_rbac_setup(sender, *args, **kwargs): compute_object_role_permissions() -class TrackedRelationship: - def __init__(self, cls, role_name): - self.cls = cls - self.role_name = role_name - self.user_relationship = None - self.team_relationship = None - self._active_sync_flag = False - - def initialize(self, relationship): - manager = getattr(self.cls, relationship) - related_model_name = manager.field.related_model._meta.model_name - if related_model_name == permission_registry.team_model._meta.model_name: - self.team_relationship = relationship - m2m_changed.connect(self.sync_team_to_role, sender=manager.through) - elif related_model_name == permission_registry.user_model._meta.model_name: - self.user_relationship = relationship - m2m_changed.connect(self.sync_user_to_role, sender=manager.through) - else: - raise RuntimeError(f'Can only register user or team relationships, obtained {related_model_name}') - - @contextmanager - def sync_active(self): - try: - self._active_sync_flag = True - yield - finally: - self._active_sync_flag = False - - def sync_relationship(self, actor, content_object, giving=True): - # Exit if role does not apply for the intended model type, for example - # if user is given "team-member" role to organization, do not add user to the team members - if content_object._meta.model_name != self.cls._meta.model_name: - return - - if actor._meta.model_name == permission_registry.team_model._meta.model_name: - if self.team_relationship is None: - return - manager = getattr(content_object, self.team_relationship) - elif actor._meta.model_name == permission_registry.user_model._meta.model_name: - if self.user_relationship is None: - return - manager = getattr(content_object, self.user_relationship) - - if giving: - manager.add(actor) - else: - manager.remove(actor) - - def _sync_actor_to_role(self, actor_model: type, instance: Model, action: str, pk_set: Optional[set[int]]): - if self._active_sync_flag: - return - if action.startswith('pre_'): - return - rd = RoleDefinition.objects.get(name=self.role_name) - - if action in ('post_add', 'post_remove'): - actor_set = pk_set - elif action == 'post_clear': - ct = permission_registry.content_type_model.objects.get_for_model(instance) - role = ObjectRole.objects.get(object_id=instance.pk, content_type=ct, role_definition=rd) - if actor_model._meta.model_name == 'team': - actor_set = set(role.teams.values_list('id', flat=True)) - else: - actor_set = set(role.users.values_list('id', flat=True)) - - giving = bool(action == 'post_add') - for actor in actor_model.objects.filter(pk__in=actor_set): - rd.give_or_remove_permission(actor, instance, giving=giving, sync_action=True) - - def sync_team_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): - if not reverse: - self._sync_actor_to_role(permission_registry.team_model, instance, action, pk_set) - else: - for pk in pk_set: - self._sync_actor_to_role(permission_registry.team_model, model(pk=pk), action, {instance.pk}) - - def sync_user_to_role(self, instance: Model, action: str, model: type, pk_set: Optional[set[int]], reverse: bool, **kwargs): - if not reverse: - self._sync_actor_to_role(permission_registry.user_model, instance, action, pk_set) - else: - for pk in pk_set: - self._sync_actor_to_role(permission_registry.user_model, model(pk=pk), action, {instance.pk}) - - def connect_rbac_signals(cls): if cls._meta.model_name == permission_registry.team_model._meta.model_name: pre_delete.connect(team_pre_delete, sender=cls, dispatch_uid='stash-team-roles-before-delete') diff --git a/docs/apps/rbac/for_app_developers.md b/docs/apps/rbac/for_app_developers.md index 8ae79cabc..242dd20cf 100644 --- a/docs/apps/rbac/for_app_developers.md +++ b/docs/apps/rbac/for_app_developers.md @@ -344,27 +344,6 @@ If you create (in code) a role definition that sets `managed` to True, then thes rules will be disregarded for that particular role definition. Managed role definitions can not be created through the API, but can be created in code like migration scripts. -### Tracked Relationships - -Let's say that you are introducing RBAC, and you have already set up your API -with some relationship, like members of a team, and parents of a team -(to get nested teams). -This sub-feature will use signals to do bidirectional syncing of memberships of -that relationship with memberships of their corresponding role. - -``` -permission_registry.track_relationship(Team, 'users', 'Team Member') -permission_registry.track_relationship(Team, 'team_parents', 'Team Member') -``` - -This only works with our 2 "actor" types of users and teams. -Adding these lines will synchronize users and teams of team-object-roles with the "team-member" -role definition (by name) to the `Team.users` -and `Team.tracked_parents` ManyToMany relationships, respectively. -So if you have a team object, `team.users.add(user)` will also give that -user _member permission_ to that team, where those permissions are defined by the -role definition with the name "team-member". - ### Role assignment callback diff --git a/test_app/migrations/0001_initial.py b/test_app/migrations/0001_initial.py index 53adf83b5..9ac5469c8 100644 --- a/test_app/migrations/0001_initial.py +++ b/test_app/migrations/0001_initial.py @@ -55,8 +55,6 @@ class Migration(migrations.Migration): ('description', models.TextField(blank=True, default='', help_text='The organization description.')), ('created_by', models.ForeignKey(default=None, editable=False, help_text='The user who created this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_created+', to=settings.AUTH_USER_MODEL)), ('modified_by', models.ForeignKey(default=None, editable=False, help_text='The user who last modified this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_modified+', to=settings.AUTH_USER_MODEL)), - ('admins', models.ManyToManyField(blank=True, help_text='The list of admins for this organization', related_name='admin_of_organizations', to=settings.AUTH_USER_MODEL)), - ('users', models.ManyToManyField(blank=True, help_text='The list of users on this organization', related_name='member_of_organizations', to=settings.AUTH_USER_MODEL)) ], options={'ordering': ['id'], 'permissions': [('member_organization', 'User is member of this organization')]}, ), @@ -85,7 +83,6 @@ class Migration(migrations.Migration): ('created_by', models.ForeignKey(default=None, editable=False, help_text='The user who created this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_created+', to=settings.AUTH_USER_MODEL)), ('modified_by', models.ForeignKey(default=None, editable=False, help_text='The user who last modified this resource', null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='%(app_label)s_%(class)s_modified+', to=settings.AUTH_USER_MODEL)), ('organization', models.ForeignKey(help_text='The organization of this team.', on_delete=django.db.models.deletion.CASCADE, related_name='teams', to=settings.ANSIBLE_BASE_ORGANIZATION_MODEL)), - ('team_parents', models.ManyToManyField(blank=True, related_name='team_children', to=settings.ANSIBLE_BASE_TEAM_MODEL)), ], options={ 'ordering': ('organization__name', 'name'), diff --git a/test_app/migrations/0006_team_admins_team_users.py b/test_app/migrations/0006_team_admins_team_users.py index 3d7a09de5..7708c80ee 100644 --- a/test_app/migrations/0006_team_admins_team_users.py +++ b/test_app/migrations/0006_team_admins_team_users.py @@ -1,7 +1,6 @@ # Generated by Django 4.2.8 on 2024-03-18 12:43 -from django.conf import settings -from django.db import migrations, models +from django.db import migrations class Migration(migrations.Migration): @@ -11,14 +10,5 @@ class Migration(migrations.Migration): ] operations = [ - migrations.AddField( - model_name='team', - name='admins', - field=models.ManyToManyField(blank=True, help_text='The list of admins for this team', related_name='teams_administered', to=settings.AUTH_USER_MODEL), - ), - migrations.AddField( - model_name='team', - name='users', - field=models.ManyToManyField(blank=True, help_text='The list of users on this team', related_name='teams', to=settings.AUTH_USER_MODEL), - ), + # Removed team admins and user relationships ] diff --git a/test_app/models.py b/test_app/models.py index 0c920154e..04f01fa3c 100644 --- a/test_app/models.py +++ b/test_app/models.py @@ -1,6 +1,5 @@ import uuid -from django.conf import settings from django.db import models from django.db.models import JSONField from rest_framework.exceptions import PermissionDenied as DRFPermissionDenied @@ -30,20 +29,6 @@ class Meta: resource = AnsibleResourceField(primary_key_field="id") - users = models.ManyToManyField( - settings.AUTH_USER_MODEL, - related_name='member_of_organizations', - blank=True, - help_text="The list of users on this organization", - ) - - admins = models.ManyToManyField( - settings.AUTH_USER_MODEL, - related_name='admin_of_organizations', - blank=True, - help_text="The list of admins for this organization", - ) - extra_field = models.CharField(max_length=100, null=True) @@ -73,7 +58,6 @@ class ManagedUser(User): class Team(AbstractTeam): resource = AnsibleResourceField(primary_key_field="id") - team_parents = models.ManyToManyField('Team', related_name='team_children', blank=True) encryptioner = models.ForeignKey('test_app.EncryptionModel', on_delete=models.SET_NULL, null=True) @@ -89,20 +73,6 @@ class Meta: ordering = ('organization__name', 'name') permissions = [('member_team', 'Has all roles assigned to this team')] - users = models.ManyToManyField( - User, - related_name='teams', - blank=True, - help_text="The list of users on this team", - ) - - admins = models.ManyToManyField( - User, - related_name='teams_administered', - blank=True, - help_text="The list of admins for this team", - ) - class ResourceMigrationTestModel(models.Model): name = models.CharField(max_length=255) @@ -383,17 +353,6 @@ class Meta: permission_registry.register(ExtraExtraUUIDModel, parent_field_name='extra_uuid') -# NOTE(cutwater): Using hard coded role names instead of ones defined in ReconcileUser class, -# to avoid circular dependency between models and claims modules. This is a temporary workarond, -# since we plan to drop support of tracked relationships in future. -permission_registry.track_relationship(Team, 'users', 'Team Member') -permission_registry.track_relationship(Team, 'admins', 'Team Admin') -permission_registry.track_relationship(Team, 'team_parents', 'Team Member') - -permission_registry.track_relationship(Organization, 'users', 'Organization Member') -permission_registry.track_relationship(Organization, 'admins', 'Organization Admin') - - class MultipleFieldsModel(NamedCommonModel): class Meta: app_label = "test_app" diff --git a/test_app/tests/conftest.py b/test_app/tests/conftest.py index cc508b6a8..0edb37f89 100644 --- a/test_app/tests/conftest.py +++ b/test_app/tests/conftest.py @@ -674,7 +674,7 @@ def org_member_rd(): @pytest.fixture def member_rd(): - "Member role for a team, place in root conftest because it is needed for the team users tracked relationship" + "Member role for a team" RoleDefinition.objects.managed.clear() yield RoleDefinition.objects.managed.team_member RoleDefinition.objects.managed.clear() @@ -682,7 +682,7 @@ def member_rd(): @pytest.fixture def admin_rd(): - "Member role for a team, place in root conftest because it is needed for the team users tracked relationship" + "Admin role for a team" RoleDefinition.objects.managed.clear() yield RoleDefinition.objects.managed.team_admin RoleDefinition.objects.managed.clear() diff --git a/test_app/tests/lib/abstract_models/test_common.py b/test_app/tests/lib/abstract_models/test_common.py index 1d209b2fc..8d6996c72 100644 --- a/test_app/tests/lib/abstract_models/test_common.py +++ b/test_app/tests/lib/abstract_models/test_common.py @@ -240,27 +240,6 @@ def test_related_view_log_message(debug_mode, not_called, expected_log): model.related_fields(request) -@pytest.mark.parametrize( - "ignore_relation", - [ - True, - False, - ], -) -@pytest.mark.django_db -def test_related_view_ignore_m2m_relations(ignore_relation, admin_user): - rf = RequestFactory() - request = rf.get('/') - with patch('ansible_base.lib.abstract_models.common.get_relative_url', return_value='https://www.example.com/user'): - if ignore_relation: - admin_user.ignore_relations = ['member_of_organizations'] - else: - admin_user.ignore_relations = [] - - related = admin_user.related_fields(request) - assert ('member_of_organizations' not in related) is ignore_relation - - def test_jsonfield_can_be_encrypted(admin_user, local_authenticator): extra_data = {} diff --git a/test_app/tests/rbac/api/test_user_permissions.py b/test_app/tests/rbac/api/test_user_permissions.py index 8b0cda1b2..72cdb50d9 100644 --- a/test_app/tests/rbac/api/test_user_permissions.py +++ b/test_app/tests/rbac/api/test_user_permissions.py @@ -2,7 +2,6 @@ from django.test import override_settings from ansible_base.lib.utils.response import get_relative_url -from ansible_base.rbac.policies import visible_users from test_app.models import Organization, Team, User @@ -216,143 +215,3 @@ def test_team_admins_can_add_children(self, user, user_api_client, organization, response = user_api_client.post(url, data=data) assert response.status_code == 201, response.data assert rando.has_obj_perm(inventory, 'change') - - -@pytest.mark.django_db -class TestRelatedUserListView: - def _initial_check(self, url, user_api_client, count=0): - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == count - - def _assign_users(self, role_definition, object): - members = [ - User.objects.create(username='rando1'), - User.objects.create(username='rando2'), - User.objects.create(username='rando3'), - ] - role_definition.give_permission(members[0], object) - role_definition.give_permission(members[1], object) - - def test_org_admin_list_org_members(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - org_admin_rd.give_permission(user, organization) - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 2 - - def test_org_member_list_org_members(self, user, user_api_client, organization, org_member_rd): - org_member_rd.give_permission(user, organization) - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client, 1) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 3 - - def test_superadmin_list_org_members(self, user, user_api_client, organization, org_member_rd): - user.is_superuser = True - user.save() - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - self._initial_check(url, user_api_client) - self._assign_users(org_member_rd, organization) - - response = user_api_client.get(url) - assert response.status_code == 200 - assert response.data['count'] == 2 - - -@pytest.mark.django_db -class TestRelationshipBasedAssignment: - """Tests permissions via tracked_relationship feature, duplicated functionality with TestRoleBasedAssignment - - Philosophically, this should perform the same actions and make the same assertions as TestRoleBasedAssignment - because under the hood, signals are used to make these memberships exactly the same - as the corresponding role assignments - """ - - def test_parent_object_view_permission(self, user, user_api_client, organization, org_member_rd): - url = get_relative_url('organization-list') - response = user_api_client.get(url) - assert response.data['count'] == 0 - - url = get_relative_url('organization-users-list', kwargs={'pk': organization.pk}) - response = user_api_client.get(url) - assert response.status_code == 404, response.data - - org_member_rd.give_permission(user, organization) - response = user_api_client.get(url) - assert response.status_code == 200, response.data - assert user.username in set(item['username'] for item in response.data['results']) - - def test_org_admins_can_add_members(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - rando = User.objects.create(username='rando') - url = get_relative_url('organization-users-associate', kwargs={'pk': organization.pk}) - - org_member_rd.give_permission(user, organization) - - data = {'instances': [rando.id]} - - response = user_api_client.post(url, data=data) - assert response.status_code == 403, response.data - assert not rando.has_obj_perm(organization, 'member') # sanity, verify atomicity - - org_admin_rd.give_permission(user, organization) - - response = user_api_client.post(url, data=data) - assert response.status_code == 204, response.data - assert rando.has_obj_perm(organization, 'member') - - @override_settings(ORG_ADMINS_CAN_SEE_ALL_USERS=False) - def test_associate_needs_view_permission(self, user, user_api_client, organization, org_member_rd, org_admin_rd): - "Need view permission to user to associate to an organization" - org_admin_rd.give_permission(user, organization) - rando = User.objects.create(username='rando') - assert not visible_users(user).filter(pk=rando.id).exists() # sanity - url = get_relative_url('organization-admins-associate', kwargs={'pk': organization.pk}) - - data = {'instances': [rando.id]} - - # user can not see other user rando, should get related object does not exist error - response = user_api_client.post(url, data=data) - assert response.status_code == 400, response.data - assert not rando.has_obj_perm(organization, 'change') - - org_member_rd.give_permission(rando, organization) - assert visible_users(user).filter(pk=rando.id).exists() - - # user can see other user rando, and can make rando an organization admin - response = user_api_client.post(url, data=data) - assert response.status_code == 204, response.data - assert rando.has_obj_perm(organization, 'change') # action took full effect - - def test_sublist_visibility(self, user, user_api_client, team, member_rd, admin_rd): - url = get_relative_url('team-users-list', kwargs={'pk': team.pk}) - - # with no permissions, user should not be able to make a GET to members list - response = user_api_client.get(url) - assert response.status_code == 404 - - rando = User.objects.create(username='rando') - member_rd.give_permission(rando, team) - admin_rd.give_permission(user, team) - assert not visible_users(user).filter(pk=rando.id).exists() # sanity - - # even though user can not see rando, they still show in members list - # this would be important if user wanted to remove rando permissions - response = user_api_client.get(url) - assert response.status_code == 200 - assert 'rando' in set(item['username'] for item in response.data['results']) - - @override_settings(ORG_ADMINS_CAN_SEE_ALL_USERS=False) - def test_visible_users_flags(self, admin_user, user, team, member_rd): - assert set(visible_users(user).values_list('id', flat=True)) == {admin_user.id, user.id} - assert set(visible_users(user, always_show_superusers=False).values_list('id', flat=True)) == {user.id} - assert set(visible_users(user, always_show_self=False).values_list('id', flat=True)) == {admin_user.id} diff --git a/test_app/tests/rbac/features/test_role_tracking.py b/test_app/tests/rbac/features/test_role_tracking.py deleted file mode 100644 index 65128cef8..000000000 --- a/test_app/tests/rbac/features/test_role_tracking.py +++ /dev/null @@ -1,73 +0,0 @@ -import pytest - -from ansible_base.rbac import permission_registry - - -@pytest.mark.django_db -def test_add_user_to_team_relationship(team, rando, inventory, inv_rd, member_rd): - inv_rd.give_permission(team, inventory) - assert not rando.has_obj_perm(team, 'member_team') - assert not rando.has_obj_perm(inventory, 'change_inventory') - - team.users.add(rando) - assert rando.has_obj_perm(team, 'member_team') - assert rando.has_obj_perm(inventory, 'change_inventory') - - team.users.clear() - assert not rando.has_obj_perm(team, 'member_team') - assert not rando.has_obj_perm(inventory, 'change_inventory') - - -@pytest.mark.django_db -def test_add_user_to_tracked_role(team, rando, member_rd): - assert not rando.has_obj_perm(team, 'member_team') - - member_rd.give_permission(rando, team) - assert rando in team.users.all() - - member_rd.remove_permission(rando, team) - assert rando not in team.users.all() - - -@pytest.mark.django_db -def test_add_team_to_tracked_relationship(rando, organization, member_rd): - child_team = permission_registry.team_model.objects.create(name='child-team', organization=organization) - parent_team = permission_registry.team_model.objects.create(name='parent-team', organization=organization) - member_rd.give_permission(rando, parent_team) - assert not rando.has_obj_perm(child_team, 'member') - - child_team.team_parents.add(parent_team) - assert rando.has_obj_perm(child_team, 'member') - - child_team.team_parents.clear() - assert not rando.has_obj_perm(child_team, 'member') - - -@pytest.mark.django_db -def test_add_team_to_tracked_role(rando, organization, member_rd): - child_team = permission_registry.team_model.objects.create(name='child-team', organization=organization) - parent_team = permission_registry.team_model.objects.create(name='parent-team', organization=organization) - member_rd.give_permission(rando, parent_team) - assert not rando.has_obj_perm(child_team, 'member') - - member_rd.give_permission(parent_team, child_team) - assert parent_team in child_team.team_parents.all() - - member_rd.remove_permission(parent_team, child_team) - assert parent_team not in child_team.team_parents.all() - - -@pytest.mark.django_db -@pytest.mark.parametrize("reverse", [True, False]) -def test_add_organization_member_to_relationship(rando, organization, org_member_rd, reverse): - assert not rando.has_obj_perm(organization, 'member') - - if reverse: - rando.member_of_organizations.add(organization) - else: - organization.users.add(rando) - - assert org_member_rd.object_roles.count() == 1 - object_role = org_member_rd.object_roles.first() - assert rando in object_role.users.all() - assert rando.has_obj_perm(organization, 'member')