Skip to content

Commit 059b42c

Browse files
committed
Clear permissions for unused models before creating types
1 parent 964c531 commit 059b42c

File tree

3 files changed

+241
-47
lines changed

3 files changed

+241
-47
lines changed

ansible_base/rbac/migrations/0005_remote_permissions_data.py

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,59 +4,18 @@
44

55
from django.db import migrations
66

7+
from . import _utils
8+
79

810
logger = logging.getLogger(__name__)
911

1012

1113
def create_types_if_needed(apps, schema_editor):
12-
"""Before we can migrate to the new DABContentType, entries in that table must be created.
13-
14-
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
15-
Only needed in the upgrade case, otherwise better to run at true post-migrate.
16-
"""
17-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
18-
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
19-
if permission_cls.objects.exists() or rd_cls.objects.exists():
20-
logger.info('Running DABContentType creation script as part of 0005 migration')
21-
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
22-
23-
create_DAB_contenttypes(apps=apps)
14+
return _utils.create_types_if_needed(apps, schema_editor, logger)
2415

2516

2617
def migrate_content_type(apps, schema_editor):
27-
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
28-
ct_cls.objects.clear_cache()
29-
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
30-
cls = apps.get_model('dab_rbac', model_name)
31-
update_ct = 0
32-
for obj in cls.objects.all():
33-
old_ct = obj.content_type
34-
if old_ct:
35-
try:
36-
# NOTE: could give duplicate normally, but that is impossible in migration path
37-
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
38-
except Exception as e:
39-
raise RuntimeError(
40-
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
41-
) from e
42-
obj.save()
43-
update_ct += 1
44-
if update_ct:
45-
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
46-
for model_name in ('roleevaluation', 'roleevaluationuuid'):
47-
cls = apps.get_model('dab_rbac', model_name)
48-
cls.objects.all().delete()
49-
50-
# DABPermission model had api_slug added in last migration
51-
# if records existed before this point, it needs to be filled in
52-
mod_ct = 0
53-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
54-
for permission in permission_cls.objects.all():
55-
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
56-
permission.save()
57-
mod_ct += 1
58-
if mod_ct:
59-
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
18+
return _utils.migrate_content_type(apps, schema_editor, logger)
6019

6120

6221
class Migration(migrations.Migration):

ansible_base/rbac/migrations/_utils.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
12
from django.db import models
23

34
# This method has moved, and this is put here temporarily to make branch management easier
@@ -45,3 +46,123 @@ def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_
4546
for team_id in teams
4647
]
4748
RoleTeamAssignment.objects.bulk_create(team_assignments, ignore_conflicts=True)
49+
50+
51+
def cleanup_orphaned_permissions(apps, logger=None):
52+
"""
53+
Delete orphaned DABPermission objects for models no longer in the permission registry.
54+
55+
This is used during migrations to clean up permissions that were auto-created for models
56+
that are no longer tracked by RBAC, but only if they are not referenced by any RoleDefinition.
57+
58+
Args:
59+
apps: Django apps registry (from migration context)
60+
logger: Optional logger for reporting deleted permissions
61+
62+
Returns:
63+
int: Number of permissions deleted
64+
"""
65+
# Get model classes from apps registry
66+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
67+
role_definition_cls = apps.get_model('dab_rbac', 'RoleDefinition')
68+
69+
# Get permission registry to check which models are registered
70+
from ansible_base.rbac import permission_registry
71+
registered_model_keys = set()
72+
for model in permission_registry._registry:
73+
registered_model_keys.add((model._meta.app_label, model._meta.model_name))
74+
75+
# Find orphaned permissions
76+
orphaned_permissions = []
77+
for permission in permission_cls.objects.all():
78+
if permission.content_type:
79+
model_key = (permission.content_type.app_label, permission.content_type.model)
80+
if model_key not in registered_model_keys:
81+
# Check if this permission is referenced by any RoleDefinition
82+
is_referenced = role_definition_cls.objects.filter(permissions=permission).exists()
83+
if not is_referenced:
84+
orphaned_permissions.append(permission)
85+
86+
# Delete orphaned permissions
87+
deleted_count = 0
88+
if orphaned_permissions:
89+
deleted_count = len(orphaned_permissions)
90+
permission_cls.objects.filter(id__in=[p.id for p in orphaned_permissions]).delete()
91+
if logger:
92+
logger.info(f'Deleted {deleted_count} orphaned DABPermission objects for unregistered models')
93+
94+
return deleted_count
95+
96+
97+
def migrate_content_type(apps, schema_editor, logger=None):
98+
"""
99+
Migrate content type references from Django ContentType to DABContentType.
100+
101+
This function handles the migration of content type references across all RBAC models
102+
from the old Django ContentType to the new DABContentType system.
103+
104+
Args:
105+
apps: Django apps registry (from migration context)
106+
schema_editor: Django schema editor (unused but required for migration signature)
107+
logger: Optional logger for reporting migration progress
108+
"""
109+
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
110+
ct_cls.objects.clear_cache()
111+
112+
# Pre-check: Delete orphaned DABPermission objects before migration
113+
cleanup_orphaned_permissions(apps, logger)
114+
115+
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
116+
cls = apps.get_model('dab_rbac', model_name)
117+
update_ct = 0
118+
for obj in cls.objects.all():
119+
old_ct = obj.content_type
120+
if old_ct:
121+
try:
122+
# NOTE: could give duplicate normally, but that is impossible in migration path
123+
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
124+
except Exception as e:
125+
raise RuntimeError(
126+
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
127+
) from e
128+
obj.save()
129+
update_ct += 1
130+
if update_ct and logger:
131+
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
132+
for model_name in ('roleevaluation', 'roleevaluationuuid'):
133+
cls = apps.get_model('dab_rbac', model_name)
134+
cls.objects.all().delete()
135+
136+
# DABPermission model had api_slug added in last migration
137+
# if records existed before this point, it needs to be filled in
138+
mod_ct = 0
139+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
140+
for permission in permission_cls.objects.all():
141+
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
142+
permission.save()
143+
mod_ct += 1
144+
if mod_ct and logger:
145+
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
146+
147+
148+
def create_types_if_needed(apps, schema_editor, logger=None):
149+
"""
150+
Create DABContentType entries if needed before migration.
151+
152+
Before we can migrate to the new DABContentType, entries in that table must be created.
153+
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
154+
Only needed in the upgrade case, otherwise better to run at true post-migrate.
155+
156+
Args:
157+
apps: Django apps registry (from migration context)
158+
schema_editor: Django schema editor (unused but required for migration signature)
159+
logger: Optional logger for reporting creation progress
160+
"""
161+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
162+
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
163+
if permission_cls.objects.exists() or rd_cls.objects.exists():
164+
if logger:
165+
logger.info('Running DABContentType creation script as part of 0005 migration')
166+
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
167+
168+
create_DAB_contenttypes(apps=apps)

test_app/tests/rbac/test_migrations.py

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import pytest
22
from django.apps import apps
33

4-
from ansible_base.rbac.migrations._utils import give_permissions
5-
from ansible_base.rbac.models import DABContentType, DABPermission, RoleTeamAssignment, RoleUserAssignment
4+
from ansible_base.rbac.migrations._utils import cleanup_orphaned_permissions, give_permissions
5+
from ansible_base.rbac.models import DABContentType, DABPermission, RoleDefinition, RoleTeamAssignment, RoleUserAssignment
66
from ansible_base.rbac.permission_registry import permission_registry
77
from test_app.models import Team, User
88

@@ -33,3 +33,117 @@ def test_give_permissions_by_id(organization, inventory, inv_rd):
3333
def test_permission_migration():
3434
"These are expected to be created via a post_migrate signal just like auth.Permission"
3535
assert len(DABPermission.objects.order_by('content_type').values_list('content_type').distinct()) == len(permission_registry.all_registered_models)
36+
37+
38+
@pytest.mark.django_db
39+
def test_cleanup_orphaned_permissions_deletes_unregistered_unreferenced():
40+
"""Test that cleanup_orphaned_permissions deletes permissions for unregistered models that are not referenced by any RoleDefinition.
41+
42+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
43+
"""
44+
# Create a DABContentType for a model not in the permission registry
45+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
46+
47+
# Create a permission for this fake model
48+
orphaned_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
49+
50+
# For registered model, let's use an existing DABContentType from the test setup
51+
# Get any existing DABContentType that represents a registered model
52+
existing_registered_ct = DABContentType.objects.filter(app_label='test_app').first()
53+
54+
if not existing_registered_ct:
55+
# Create one if none exist
56+
existing_registered_ct = DABContentType.objects.create(app_label='test_app', model='organization', service='shared')
57+
58+
valid_permission = DABPermission.objects.create(name='Can view registered model', content_type=existing_registered_ct, codename='view_registered')
59+
60+
initial_count = DABPermission.objects.count()
61+
62+
# Run cleanup
63+
deleted_count = cleanup_orphaned_permissions(apps)
64+
65+
# Verify the orphaned permission was deleted
66+
assert deleted_count == 1
67+
assert DABPermission.objects.count() == initial_count - 1
68+
assert not DABPermission.objects.filter(id=orphaned_permission.id).exists()
69+
assert DABPermission.objects.filter(id=valid_permission.id).exists()
70+
71+
72+
@pytest.mark.django_db
73+
def test_cleanup_orphaned_permissions_preserves_referenced():
74+
"""Test that cleanup_orphaned_permissions preserves permissions for unregistered models that are referenced by RoleDefinitions.
75+
76+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
77+
"""
78+
# Create a DABContentType for a model not in the permission registry
79+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
80+
81+
# Create a permission for this fake model
82+
referenced_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
83+
84+
# Create a RoleDefinition that references this permission
85+
role_def = RoleDefinition.objects.create(name='Fake Role', content_type=dab_fake_ct)
86+
role_def.permissions.add(referenced_permission)
87+
88+
initial_count = DABPermission.objects.count()
89+
90+
# Run cleanup
91+
deleted_count = cleanup_orphaned_permissions(apps)
92+
93+
# Verify the referenced permission was NOT deleted
94+
assert deleted_count == 0
95+
assert DABPermission.objects.count() == initial_count
96+
assert DABPermission.objects.filter(id=referenced_permission.id).exists()
97+
98+
99+
@pytest.mark.django_db
100+
def test_cleanup_orphaned_permissions_preserves_registered():
101+
"""Test that cleanup_orphaned_permissions preserves all permissions for registered models.
102+
103+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
104+
"""
105+
initial_count = DABPermission.objects.count()
106+
107+
# Run cleanup - should not delete any existing permissions since they're for registered models
108+
deleted_count = cleanup_orphaned_permissions(apps)
109+
110+
# Verify no permissions were deleted
111+
assert deleted_count == 0
112+
assert DABPermission.objects.count() == initial_count
113+
114+
115+
@pytest.mark.django_db
116+
def test_cleanup_orphaned_permissions_mixed_scenario():
117+
"""Test cleanup with a mix of registered, unregistered referenced, and unregistered unreferenced permissions.
118+
119+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
120+
"""
121+
# Create DABContentTypes for models not in the permission registry
122+
dab_fake_ct1 = DABContentType.objects.create(app_label='fake_app', model='orphanedmodel', service='local')
123+
dab_fake_ct2 = DABContentType.objects.create(app_label='fake_app', model='referencedmodel', service='local')
124+
125+
# Create orphaned permission (should be deleted)
126+
orphaned_permission = DABPermission.objects.create(name='Can view orphaned model', content_type=dab_fake_ct1, codename='view_orphanedmodel')
127+
128+
# Create referenced permission (should be preserved)
129+
referenced_permission = DABPermission.objects.create(name='Can view referenced model', content_type=dab_fake_ct2, codename='view_referencedmodel')
130+
131+
# Create a RoleDefinition that references the second permission
132+
role_def = RoleDefinition.objects.create(name='Referenced Role', content_type=dab_fake_ct2)
133+
role_def.permissions.add(referenced_permission)
134+
135+
# Get count of valid permissions (for registered models)
136+
valid_perms_count = DABPermission.objects.exclude(id__in=[orphaned_permission.id, referenced_permission.id]).count()
137+
138+
initial_count = DABPermission.objects.count()
139+
140+
# Run cleanup
141+
deleted_count = cleanup_orphaned_permissions(apps)
142+
143+
# Verify only the orphaned permission was deleted
144+
assert deleted_count == 1
145+
assert DABPermission.objects.count() == initial_count - 1
146+
assert not DABPermission.objects.filter(id=orphaned_permission.id).exists()
147+
assert DABPermission.objects.filter(id=referenced_permission.id).exists()
148+
# All other permissions should still exist
149+
assert DABPermission.objects.exclude(id=referenced_permission.id).count() == valid_perms_count

0 commit comments

Comments
 (0)