|
| 1 | +# Generated by Claude Sonnet 4 (claude-sonnet-4@20250514) |
| 2 | +import logging |
| 3 | + |
1 | 4 | from django.db import models
|
2 | 5 |
|
| 6 | +logger = logging.getLogger(__name__) |
| 7 | + |
3 | 8 | # This method has moved, and this is put here temporarily to make branch management easier
|
4 | 9 | from ansible_base.rbac.management import create_dab_permissions as create_custom_permissions # noqa
|
5 | 10 |
|
@@ -45,3 +50,127 @@ def give_permissions(apps, rd, users=(), teams=(), object_id=None, content_type_
|
45 | 50 | for team_id in teams
|
46 | 51 | ]
|
47 | 52 | RoleTeamAssignment.objects.bulk_create(team_assignments, ignore_conflicts=True)
|
| 53 | + |
| 54 | + |
| 55 | +def cleanup_orphaned_permissions(apps): |
| 56 | + """ |
| 57 | + Delete orphaned DABPermission objects for models no longer in the permission registry. |
| 58 | +
|
| 59 | + This is used during migrations to clean up permissions for any previously-registered model |
| 60 | + that are no longer tracked by RBAC, but only if they are not referenced by any RoleDefinition. |
| 61 | +
|
| 62 | + Args: |
| 63 | + apps: Django apps registry (from migration context) |
| 64 | +
|
| 65 | + Returns: |
| 66 | + int: Number of permissions deleted |
| 67 | + """ |
| 68 | + # Get model classes from apps registry |
| 69 | + permission_cls = apps.get_model('dab_rbac', 'DABPermission') |
| 70 | + role_definition_cls = apps.get_model('dab_rbac', 'RoleDefinition') |
| 71 | + |
| 72 | + # Get permission registry to check which models are registered |
| 73 | + from ansible_base.rbac import permission_registry |
| 74 | + registered_model_keys = set() |
| 75 | + for model in permission_registry._registry: |
| 76 | + registered_model_keys.add((model._meta.app_label, model._meta.model_name)) |
| 77 | + |
| 78 | + # Find orphaned permissions |
| 79 | + orphaned_permissions = [] |
| 80 | + for permission in permission_cls.objects.all(): |
| 81 | + if permission.content_type: |
| 82 | + model_key = (permission.content_type.app_label, permission.content_type.model) |
| 83 | + if model_key not in registered_model_keys: |
| 84 | + # Check if this permission is referenced by any RoleDefinition |
| 85 | + referencing_roles = role_definition_cls.objects.filter(permissions=permission) |
| 86 | + if referencing_roles.exists(): |
| 87 | + # Log warning for unregistered model still referenced by role definitions |
| 88 | + role_names = list(referencing_roles.values_list('name', flat=True)) |
| 89 | + logger.warning( |
| 90 | + f'Permission {permission.codename} for unregistered model ' |
| 91 | + f'{permission.content_type.app_label}.{permission.content_type.model} ' |
| 92 | + f'is still referenced by role definitions: {role_names}' |
| 93 | + ) |
| 94 | + else: |
| 95 | + logger.info(f'Deleting orphaned permission {permission.codename} for unregistered model {model_key}') |
| 96 | + orphaned_permissions.append(permission) |
| 97 | + |
| 98 | + # Delete orphaned permissions |
| 99 | + deleted_count = 0 |
| 100 | + if orphaned_permissions: |
| 101 | + deleted_count = len(orphaned_permissions) |
| 102 | + permission_cls.objects.filter(id__in=[p.id for p in orphaned_permissions]).delete() |
| 103 | + logger.info(f'Deleted {deleted_count} orphaned DABPermission objects for unregistered models') |
| 104 | + |
| 105 | + return deleted_count |
| 106 | + |
| 107 | + |
| 108 | +def migrate_content_type(apps, schema_editor): |
| 109 | + """ |
| 110 | + Migrate content type references from Django ContentType to DABContentType. |
| 111 | +
|
| 112 | + This function handles the migration of content type references across all RBAC models |
| 113 | + from the old Django ContentType to the new DABContentType system. |
| 114 | +
|
| 115 | + Args: |
| 116 | + apps: Django apps registry (from migration context) |
| 117 | + schema_editor: Django schema editor (unused but required for migration signature) |
| 118 | + """ |
| 119 | + # Pre-check: Delete orphaned DABPermission objects before migration |
| 120 | + cleanup_orphaned_permissions(apps) |
| 121 | + |
| 122 | + ct_cls = apps.get_model('dab_rbac', 'DABContentType') |
| 123 | + ct_cls.objects.clear_cache() |
| 124 | + |
| 125 | + for model_name in ('dabpermission', 'objectrole', 'roledefinition', 'roleuserassignment', 'roleteamassignment'): |
| 126 | + cls = apps.get_model('dab_rbac', model_name) |
| 127 | + update_ct = 0 |
| 128 | + for obj in cls.objects.all(): |
| 129 | + old_ct = obj.content_type |
| 130 | + if old_ct: |
| 131 | + try: |
| 132 | + # NOTE: could give duplicate normally, but that is impossible in migration path |
| 133 | + obj.new_content_type = ct_cls.objects.get_by_natural_key(old_ct.app_label, old_ct.model) |
| 134 | + except Exception as e: |
| 135 | + raise RuntimeError( |
| 136 | + f"Failed to get new content type for a {model_name} pk={obj.pk}, obj={obj.__dict__}" |
| 137 | + ) from e |
| 138 | + obj.save() |
| 139 | + update_ct += 1 |
| 140 | + if update_ct: |
| 141 | + logger.info(f'Updated content_type reference to new model for {model_name} for {update_ct} entries') |
| 142 | + for model_name in ('roleevaluation', 'roleevaluationuuid'): |
| 143 | + cls = apps.get_model('dab_rbac', model_name) |
| 144 | + cls.objects.all().delete() |
| 145 | + |
| 146 | + # DABPermission model had api_slug added in last migration |
| 147 | + # if records existed before this point, it needs to be filled in |
| 148 | + mod_ct = 0 |
| 149 | + permission_cls = apps.get_model('dab_rbac', 'DABPermission') |
| 150 | + for permission in permission_cls.objects.all(): |
| 151 | + permission.api_slug = f'{permission.new_content_type.service}.{permission.codename}' |
| 152 | + permission.save() |
| 153 | + mod_ct += 1 |
| 154 | + if mod_ct: |
| 155 | + logger.info(f'Set new field DABPermission.api_slug for {mod_ct} existing permissions') |
| 156 | + |
| 157 | + |
| 158 | +def create_types_if_needed(apps, schema_editor): |
| 159 | + """ |
| 160 | + Create DABContentType entries if needed before migration. |
| 161 | +
|
| 162 | + Before we can migrate to the new DABContentType, entries in that table must be created. |
| 163 | + This method runs what is ordinarily the post_migrate logic, but in the migration case here. |
| 164 | + Only needed in the upgrade case, otherwise better to run at true post-migrate. |
| 165 | +
|
| 166 | + Args: |
| 167 | + apps: Django apps registry (from migration context) |
| 168 | + schema_editor: Django schema editor (unused but required for migration signature) |
| 169 | + """ |
| 170 | + permission_cls = apps.get_model('dab_rbac', 'DABPermission') |
| 171 | + rd_cls = apps.get_model('dab_rbac', 'RoleDefinition') |
| 172 | + if permission_cls.objects.exists() or rd_cls.objects.exists(): |
| 173 | + logger.info('Running DABContentType creation script as part of 0005 migration') |
| 174 | + from ansible_base.rbac.management.create_types import create_DAB_contenttypes |
| 175 | + |
| 176 | + create_DAB_contenttypes(apps=apps) |
0 commit comments