Skip to content

Commit 48005cd

Browse files
authored
AAP-54064 Delete DABPermission entries if model is not registered and not used (#851)
This is specifically needed for a prior behavior of `Team` model auto-registration. This was deleted a few months ago, but old versions may still create permissions for the team model. Re-stating, that leaves a scenario where an app might _enable RBAC_ but _register no models_ and then fail to upgrade. This is written specifically to address that situation. Previously, we just threw an error out of an abundance of caution (which was correct, because we didn't have a specific action clarified). For this specific case, the remediation action is clear - delete the permission entries. We still have to be really specific to avoid deleting unintended things. The criteria should be well-formed in this patch.
1 parent 964c531 commit 48005cd

File tree

3 files changed

+184
-59
lines changed

3 files changed

+184
-59
lines changed
Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,8 @@
11
# Generated by Django 4.2.23 on 2025-06-30 12:48
22

3-
import logging
4-
53
from django.db import migrations
64

7-
8-
logger = logging.getLogger(__name__)
9-
10-
11-
def create_types_if_needed(apps, schema_editor):
12-
"""Before we can migrate to the new DABContentType, entries in that table must be created.
13-
14-
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
15-
Only needed in the upgrade case, otherwise better to run at true post-migrate.
16-
"""
17-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
18-
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
19-
if permission_cls.objects.exists() or rd_cls.objects.exists():
20-
logger.info('Running DABContentType creation script as part of 0005 migration')
21-
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
22-
23-
create_DAB_contenttypes(apps=apps)
24-
25-
26-
def migrate_content_type(apps, schema_editor):
27-
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
28-
ct_cls.objects.clear_cache()
29-
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
30-
cls = apps.get_model('dab_rbac', model_name)
31-
update_ct = 0
32-
for obj in cls.objects.all():
33-
old_ct = obj.content_type
34-
if old_ct:
35-
try:
36-
# NOTE: could give duplicate normally, but that is impossible in migration path
37-
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
38-
except Exception as e:
39-
raise RuntimeError(
40-
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
41-
) from e
42-
obj.save()
43-
update_ct += 1
44-
if update_ct:
45-
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
46-
for model_name in ('roleevaluation', 'roleevaluationuuid'):
47-
cls = apps.get_model('dab_rbac', model_name)
48-
cls.objects.all().delete()
49-
50-
# DABPermission model had api_slug added in last migration
51-
# if records existed before this point, it needs to be filled in
52-
mod_ct = 0
53-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
54-
for permission in permission_cls.objects.all():
55-
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
56-
permission.save()
57-
mod_ct += 1
58-
if mod_ct:
59-
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
5+
from . import _utils
606

617

628
class Migration(migrations.Migration):
@@ -66,6 +12,6 @@ class Migration(migrations.Migration):
6612
]
6713

6814
operations = [
69-
migrations.RunPython(create_types_if_needed, migrations.RunPython.noop),
70-
migrations.RunPython(migrate_content_type, migrations.RunPython.noop),
15+
migrations.RunPython(_utils.create_types_if_needed, migrations.RunPython.noop),
16+
migrations.RunPython(_utils.migrate_content_type, migrations.RunPython.noop),
7117
]

ansible_base/rbac/migrations/_utils.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
# Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
2+
import logging
3+
14
from django.db import models
25

6+
logger = logging.getLogger(__name__)
7+
38
# This method has moved, and this is put here temporarily to make branch management easier
49
from ansible_base.rbac.management import create_dab_permissions as create_custom_permissions # noqa
510

@@ -45,3 +50,127 @@ def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_
4550
for team_id in teams
4651
]
4752
RoleTeamAssignment.objects.bulk_create(team_assignments, ignore_conflicts=True)
53+
54+
55+
def cleanup_orphaned_permissions(apps):
56+
"""
57+
Delete orphaned DABPermission objects for models no longer in the permission registry.
58+
59+
This is used during migrations to clean up permissions for any previously-registered model
60+
that are no longer tracked by RBAC, but only if they are not referenced by any RoleDefinition.
61+
62+
Args:
63+
apps: Django apps registry (from migration context)
64+
65+
Returns:
66+
int: Number of permissions deleted
67+
"""
68+
# Get model classes from apps registry
69+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
70+
role_definition_cls = apps.get_model('dab_rbac', 'RoleDefinition')
71+
72+
# Get permission registry to check which models are registered
73+
from ansible_base.rbac import permission_registry
74+
registered_model_keys = set()
75+
for model in permission_registry._registry:
76+
registered_model_keys.add((model._meta.app_label, model._meta.model_name))
77+
78+
# Find orphaned permissions
79+
orphaned_permissions = []
80+
for permission in permission_cls.objects.all():
81+
if permission.content_type:
82+
model_key = (permission.content_type.app_label, permission.content_type.model)
83+
if model_key not in registered_model_keys:
84+
# Check if this permission is referenced by any RoleDefinition
85+
referencing_roles = role_definition_cls.objects.filter(permissions=permission)
86+
if referencing_roles.exists():
87+
# Log warning for unregistered model still referenced by role definitions
88+
role_names = list(referencing_roles.values_list('name', flat=True))
89+
logger.warning(
90+
f'Permission {permission.codename} for unregistered model '
91+
f'{permission.content_type.app_label}.{permission.content_type.model} '
92+
f'is still referenced by role definitions: {role_names}'
93+
)
94+
else:
95+
logger.info(f'Deleting orphaned permission {permission.codename} for unregistered model {model_key}')
96+
orphaned_permissions.append(permission)
97+
98+
# Delete orphaned permissions
99+
deleted_count = 0
100+
if orphaned_permissions:
101+
deleted_count = len(orphaned_permissions)
102+
permission_cls.objects.filter(id__in=[p.id for p in orphaned_permissions]).delete()
103+
logger.info(f'Deleted {deleted_count} orphaned DABPermission objects for unregistered models')
104+
105+
return deleted_count
106+
107+
108+
def migrate_content_type(apps, schema_editor):
109+
"""
110+
Migrate content type references from Django ContentType to DABContentType.
111+
112+
This function handles the migration of content type references across all RBAC models
113+
from the old Django ContentType to the new DABContentType system.
114+
115+
Args:
116+
apps: Django apps registry (from migration context)
117+
schema_editor: Django schema editor (unused but required for migration signature)
118+
"""
119+
# Pre-check: Delete orphaned DABPermission objects before migration
120+
cleanup_orphaned_permissions(apps)
121+
122+
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
123+
ct_cls.objects.clear_cache()
124+
125+
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
126+
cls = apps.get_model('dab_rbac', model_name)
127+
update_ct = 0
128+
for obj in cls.objects.all():
129+
old_ct = obj.content_type
130+
if old_ct:
131+
try:
132+
# NOTE: could give duplicate normally, but that is impossible in migration path
133+
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
134+
except Exception as e:
135+
raise RuntimeError(
136+
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
137+
) from e
138+
obj.save()
139+
update_ct += 1
140+
if update_ct:
141+
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
142+
for model_name in ('roleevaluation', 'roleevaluationuuid'):
143+
cls = apps.get_model('dab_rbac', model_name)
144+
cls.objects.all().delete()
145+
146+
# DABPermission model had api_slug added in last migration
147+
# if records existed before this point, it needs to be filled in
148+
mod_ct = 0
149+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
150+
for permission in permission_cls.objects.all():
151+
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
152+
permission.save()
153+
mod_ct += 1
154+
if mod_ct:
155+
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
156+
157+
158+
def create_types_if_needed(apps, schema_editor):
159+
"""
160+
Create DABContentType entries if needed before migration.
161+
162+
Before we can migrate to the new DABContentType, entries in that table must be created.
163+
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
164+
Only needed in the upgrade case, otherwise better to run at true post-migrate.
165+
166+
Args:
167+
apps: Django apps registry (from migration context)
168+
schema_editor: Django schema editor (unused but required for migration signature)
169+
"""
170+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
171+
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
172+
if permission_cls.objects.exists() or rd_cls.objects.exists():
173+
logger.info('Running DABContentType creation script as part of 0005 migration')
174+
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
175+
176+
create_DAB_contenttypes(apps=apps)

test_app/tests/rbac/test_migrations.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import pytest
22
from django.apps import apps
33

4-
from ansible_base.rbac.migrations._utils import give_permissions
5-
from ansible_base.rbac.models import DABContentType, DABPermission, RoleTeamAssignment, RoleUserAssignment
4+
from ansible_base.rbac.migrations._utils import cleanup_orphaned_permissions, give_permissions
5+
from ansible_base.rbac.models import DABContentType, DABPermission, RoleDefinition, RoleTeamAssignment, RoleUserAssignment
66
from ansible_base.rbac.permission_registry import permission_registry
77
from test_app.models import Team, User
88

@@ -33,3 +33,53 @@ def test_give_permissions_by_id(organization, inventory, inv_rd):
3333
def test_permission_migration():
3434
"These are expected to be created via a post_migrate signal just like auth.Permission"
3535
assert len(DABPermission.objects.order_by('content_type').values_list('content_type').distinct()) == len(permission_registry.all_registered_models)
36+
37+
38+
@pytest.mark.django_db
39+
def test_cleanup_orphaned_permissions_deletes_unregistered_unreferenced():
40+
"""Test that cleanup_orphaned_permissions deletes permissions for unregistered models that are not referenced by any RoleDefinition.
41+
42+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
43+
"""
44+
# Create a DABContentType for a model not in the permission registry
45+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
46+
47+
# Create a permission for this fake model (should be deleted)
48+
orphaned_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
49+
50+
initial_count = DABPermission.objects.count()
51+
52+
# Run cleanup
53+
deleted_count = cleanup_orphaned_permissions(apps)
54+
55+
# Verify the orphaned permission was deleted
56+
assert deleted_count == 1
57+
assert DABPermission.objects.count() == initial_count - 1
58+
assert not DABPermission.objects.filter(id=orphaned_permission.id).exists()
59+
60+
61+
@pytest.mark.django_db
62+
def test_cleanup_orphaned_permissions_preserves_referenced():
63+
"""Test that cleanup_orphaned_permissions preserves permissions for unregistered models that are referenced by RoleDefinitions.
64+
65+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
66+
"""
67+
# Create a DABContentType for a model not in the permission registry
68+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
69+
70+
# Create a permission for this fake model
71+
referenced_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
72+
73+
# Create a RoleDefinition that references this permission
74+
role_def = RoleDefinition.objects.create(name='Fake Role', content_type=dab_fake_ct)
75+
role_def.permissions.add(referenced_permission)
76+
77+
initial_count = DABPermission.objects.count()
78+
79+
# Run cleanup
80+
deleted_count = cleanup_orphaned_permissions(apps)
81+
82+
# Verify the referenced permission was NOT deleted
83+
assert deleted_count == 0
84+
assert DABPermission.objects.count() == initial_count
85+
assert DABPermission.objects.filter(id=referenced_permission.id).exists()

0 commit comments

Comments
 (0)