Skip to content

Commit 59cba5f

Browse files
committed
Clear permissions for unused models before creating types
Reduce tests some consolidate migration logic Improve logging and code cleanup
1 parent 964c531 commit 59cba5f

File tree

3 files changed

+184
-59
lines changed

3 files changed

+184
-59
lines changed
Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,8 @@
11
# Generated by Django 4.2.23 on 2025-06-30 12:48
22

3-
import logging
4-
53
from django.db import migrations
64

7-
8-
logger = logging.getLogger(__name__)
9-
10-
11-
def create_types_if_needed(apps, schema_editor):
12-
"""Before we can migrate to the new DABContentType, entries in that table must be created.
13-
14-
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
15-
Only needed in the upgrade case, otherwise better to run at true post-migrate.
16-
"""
17-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
18-
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
19-
if permission_cls.objects.exists() or rd_cls.objects.exists():
20-
logger.info('Running DABContentType creation script as part of 0005 migration')
21-
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
22-
23-
create_DAB_contenttypes(apps=apps)
24-
25-
26-
def migrate_content_type(apps, schema_editor):
27-
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
28-
ct_cls.objects.clear_cache()
29-
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
30-
cls = apps.get_model('dab_rbac', model_name)
31-
update_ct = 0
32-
for obj in cls.objects.all():
33-
old_ct = obj.content_type
34-
if old_ct:
35-
try:
36-
# NOTE: could give duplicate normally, but that is impossible in migration path
37-
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
38-
except Exception as e:
39-
raise RuntimeError(
40-
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
41-
) from e
42-
obj.save()
43-
update_ct += 1
44-
if update_ct:
45-
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
46-
for model_name in ('roleevaluation', 'roleevaluationuuid'):
47-
cls = apps.get_model('dab_rbac', model_name)
48-
cls.objects.all().delete()
49-
50-
# DABPermission model had api_slug added in last migration
51-
# if records existed before this point, it needs to be filled in
52-
mod_ct = 0
53-
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
54-
for permission in permission_cls.objects.all():
55-
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
56-
permission.save()
57-
mod_ct += 1
58-
if mod_ct:
59-
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
5+
from . import _utils
606

617

628
class Migration(migrations.Migration):
@@ -66,6 +12,6 @@ class Migration(migrations.Migration):
6612
]
6713

6814
operations = [
69-
migrations.RunPython(create_types_if_needed, migrations.RunPython.noop),
70-
migrations.RunPython(migrate_content_type, migrations.RunPython.noop),
15+
migrations.RunPython(_utils.create_types_if_needed, migrations.RunPython.noop),
16+
migrations.RunPython(_utils.migrate_content_type, migrations.RunPython.noop),
7117
]

ansible_base/rbac/migrations/_utils.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
# Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
2+
import logging
3+
14
from django.db import models
25

6+
logger = logging.getLogger(__name__)
7+
38
# This method has moved, and this is put here temporarily to make branch management easier
49
from ansible_base.rbac.management import create_dab_permissions as create_custom_permissions # noqa
510

@@ -45,3 +50,127 @@ def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_
4550
for team_id in teams
4651
]
4752
RoleTeamAssignment.objects.bulk_create(team_assignments, ignore_conflicts=True)
53+
54+
55+
def cleanup_orphaned_permissions(apps):
56+
"""
57+
Delete orphaned DABPermission objects for models no longer in the permission registry.
58+
59+
This is used during migrations to clean up permissions for any previously-registered model
60+
that are no longer tracked by RBAC, but only if they are not referenced by any RoleDefinition.
61+
62+
Args:
63+
apps: Django apps registry (from migration context)
64+
65+
Returns:
66+
int: Number of permissions deleted
67+
"""
68+
# Get model classes from apps registry
69+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
70+
role_definition_cls = apps.get_model('dab_rbac', 'RoleDefinition')
71+
72+
# Get permission registry to check which models are registered
73+
from ansible_base.rbac import permission_registry
74+
registered_model_keys = set()
75+
for model in permission_registry._registry:
76+
registered_model_keys.add((model._meta.app_label, model._meta.model_name))
77+
78+
# Find orphaned permissions
79+
orphaned_permissions = []
80+
for permission in permission_cls.objects.all():
81+
if permission.content_type:
82+
model_key = (permission.content_type.app_label, permission.content_type.model)
83+
if model_key not in registered_model_keys:
84+
# Check if this permission is referenced by any RoleDefinition
85+
referencing_roles = role_definition_cls.objects.filter(permissions=permission)
86+
if referencing_roles.exists():
87+
# Log warning for unregistered model still referenced by role definitions
88+
role_names = list(referencing_roles.values_list('name', flat=True))
89+
logger.warning(
90+
f'Permission {permission.codename} for unregistered model '
91+
f'{permission.content_type.app_label}.{permission.content_type.model} '
92+
f'is still referenced by role definitions: {role_names}'
93+
)
94+
else:
95+
logger.info(f'Deleting orphaned permission {permission.codename} for unregistered model {model_key}')
96+
orphaned_permissions.append(permission)
97+
98+
# Delete orphaned permissions
99+
deleted_count = 0
100+
if orphaned_permissions:
101+
deleted_count = len(orphaned_permissions)
102+
permission_cls.objects.filter(id__in=[p.id for p in orphaned_permissions]).delete()
103+
logger.info(f'Deleted {deleted_count} orphaned DABPermission objects for unregistered models')
104+
105+
return deleted_count
106+
107+
108+
def migrate_content_type(apps, schema_editor):
109+
"""
110+
Migrate content type references from Django ContentType to DABContentType.
111+
112+
This function handles the migration of content type references across all RBAC models
113+
from the old Django ContentType to the new DABContentType system.
114+
115+
Args:
116+
apps: Django apps registry (from migration context)
117+
schema_editor: Django schema editor (unused but required for migration signature)
118+
"""
119+
# Pre-check: Delete orphaned DABPermission objects before migration
120+
cleanup_orphaned_permissions(apps)
121+
122+
ct_cls = apps.get_model('dab_rbac', 'DABContentType')
123+
ct_cls.objects.clear_cache()
124+
125+
for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'):
126+
cls = apps.get_model('dab_rbac', model_name)
127+
update_ct = 0
128+
for obj in cls.objects.all():
129+
old_ct = obj.content_type
130+
if old_ct:
131+
try:
132+
# NOTE: could give duplicate normally, but that is impossible in migration path
133+
obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model)
134+
except Exception as e:
135+
raise RuntimeError(
136+
f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}"
137+
) from e
138+
obj.save()
139+
update_ct += 1
140+
if update_ct:
141+
logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries')
142+
for model_name in ('roleevaluation', 'roleevaluationuuid'):
143+
cls = apps.get_model('dab_rbac', model_name)
144+
cls.objects.all().delete()
145+
146+
# DABPermission model had api_slug added in last migration
147+
# if records existed before this point, it needs to be filled in
148+
mod_ct = 0
149+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
150+
for permission in permission_cls.objects.all():
151+
permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}'
152+
permission.save()
153+
mod_ct += 1
154+
if mod_ct:
155+
logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions')
156+
157+
158+
def create_types_if_needed(apps, schema_editor):
159+
"""
160+
Create DABContentType entries if needed before migration.
161+
162+
Before we can migrate to the new DABContentType, entries in that table must be created.
163+
This method runs what is ordinarily the post_migrate logic, but in the migration case here.
164+
Only needed in the upgrade case, otherwise better to run at true post-migrate.
165+
166+
Args:
167+
apps: Django apps registry (from migration context)
168+
schema_editor: Django schema editor (unused but required for migration signature)
169+
"""
170+
permission_cls = apps.get_model('dab_rbac', 'DABPermission')
171+
rd_cls = apps.get_model('dab_rbac', 'RoleDefinition')
172+
if permission_cls.objects.exists() or rd_cls.objects.exists():
173+
logger.info('Running DABContentType creation script as part of 0005 migration')
174+
from ansible_base.rbac.management.create_types import create_DAB_contenttypes
175+
176+
create_DAB_contenttypes(apps=apps)

test_app/tests/rbac/test_migrations.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import pytest
22
from django.apps import apps
33

4-
from ansible_base.rbac.migrations._utils import give_permissions
5-
from ansible_base.rbac.models import DABContentType, DABPermission, RoleTeamAssignment, RoleUserAssignment
4+
from ansible_base.rbac.migrations._utils import cleanup_orphaned_permissions, give_permissions
5+
from ansible_base.rbac.models import DABContentType, DABPermission, RoleDefinition, RoleTeamAssignment, RoleUserAssignment
66
from ansible_base.rbac.permission_registry import permission_registry
77
from test_app.models import Team, User
88

@@ -33,3 +33,53 @@ def test_give_permissions_by_id(organization, inventory, inv_rd):
3333
def test_permission_migration():
3434
"These are expected to be created via a post_migrate signal just like auth.Permission"
3535
assert len(DABPermission.objects.order_by('content_type').values_list('content_type').distinct()) == len(permission_registry.all_registered_models)
36+
37+
38+
@pytest.mark.django_db
39+
def test_cleanup_orphaned_permissions_deletes_unregistered_unreferenced():
40+
"""Test that cleanup_orphaned_permissions deletes permissions for unregistered models that are not referenced by any RoleDefinition.
41+
42+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
43+
"""
44+
# Create a DABContentType for a model not in the permission registry
45+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
46+
47+
# Create a permission for this fake model (should be deleted)
48+
orphaned_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
49+
50+
initial_count = DABPermission.objects.count()
51+
52+
# Run cleanup
53+
deleted_count = cleanup_orphaned_permissions(apps)
54+
55+
# Verify the orphaned permission was deleted
56+
assert deleted_count == 1
57+
assert DABPermission.objects.count() == initial_count - 1
58+
assert not DABPermission.objects.filter(id=orphaned_permission.id).exists()
59+
60+
61+
@pytest.mark.django_db
62+
def test_cleanup_orphaned_permissions_preserves_referenced():
63+
"""Test that cleanup_orphaned_permissions preserves permissions for unregistered models that are referenced by RoleDefinitions.
64+
65+
Generated by Claude Sonnet 4 (claude-sonnet-4@20250514)
66+
"""
67+
# Create a DABContentType for a model not in the permission registry
68+
dab_fake_ct = DABContentType.objects.create(app_label='fake_app', model='fakemodel', service='local')
69+
70+
# Create a permission for this fake model
71+
referenced_permission = DABPermission.objects.create(name='Can view fake model', content_type=dab_fake_ct, codename='view_fakemodel')
72+
73+
# Create a RoleDefinition that references this permission
74+
role_def = RoleDefinition.objects.create(name='Fake Role', content_type=dab_fake_ct)
75+
role_def.permissions.add(referenced_permission)
76+
77+
initial_count = DABPermission.objects.count()
78+
79+
# Run cleanup
80+
deleted_count = cleanup_orphaned_permissions(apps)
81+
82+
# Verify the referenced permission was NOT deleted
83+
assert deleted_count == 0
84+
assert DABPermission.objects.count() == initial_count
85+
assert DABPermission.objects.filter(id=referenced_permission.id).exists()

0 commit comments

Comments
 (0)