|
3 | 3 | import logging
|
4 | 4 | import re
|
5 | 5 | from enum import Enum, auto
|
6 |
| -from typing import Any, List, Optional, Union |
| 6 | +from typing import Any, Iterable, List, Optional, Union |
7 | 7 | from uuid import uuid4
|
8 | 8 |
|
9 | 9 | from django.conf import settings
|
|
22 | 22 | from ansible_base.lib.utils.auth import get_organization_model, get_team_model
|
23 | 23 | from ansible_base.lib.utils.string import is_empty
|
24 | 24 | from ansible_base.rbac.models import DABContentType
|
| 25 | +from ansible_base.rbac.remote import get_local_resource_prefix |
25 | 26 |
|
26 | 27 | from .trigger_definition import TRIGGER_DEFINITION
|
27 | 28 |
|
@@ -894,22 +895,102 @@ def items(self):
|
894 | 895 | """
|
895 | 896 | return self.cache.items()
|
896 | 897 |
|
897 |
| - def cache_existing(self, role_assignments): |
898 |
| - """Caches given role_assignments associated with one user in form of dict (see method `items()`)""" |
| 898 | + def cache_existing(self, role_assignments: Iterable[models.Model]) -> None: |
| 899 | + """ |
| 900 | + Caches given role_assignments associated with one user in the internal cache dictionary. |
| 901 | +
|
| 902 | + This method processes role assignments and stores them in a nested dictionary structure |
| 903 | + for efficient lookup during permission reconciliation. |
| 904 | +
|
| 905 | + Args: |
| 906 | + role_assignments: An iterable of role assignment model instances (typically from |
| 907 | + user.role_assignments.all() QuerySet) that contain role_definition, |
| 908 | + content_type, content_object, and object_id attributes. |
| 909 | +
|
| 910 | + Cache Structure: |
| 911 | + The internal cache will be populated in the following format: |
| 912 | + { |
| 913 | + "System Auditor": { # role_name (str) |
| 914 | + None: { # content_type_id (None for system roles) |
| 915 | + None: { # object_id (None for system roles) |
| 916 | + 'object': None, # content_object (None for system roles) |
| 917 | + 'status': 'existing' # STATUS_EXISTING constant |
| 918 | + } |
| 919 | + } |
| 920 | + }, |
| 921 | + "Organization Admin": { # role_name (str) |
| 922 | + 15: { # content_type_id (int, e.g., Organization content type) |
| 923 | + 42: { # object_id (int, specific organization ID) |
| 924 | + 'object': <Organization>, # content_object (Organization instance) |
| 925 | + 'status': 'existing' # STATUS_EXISTING constant |
| 926 | + }, |
| 927 | + 43: { # object_id (int, another organization ID) |
| 928 | + 'object': <Organization>, # content_object (Organization instance) |
| 929 | + 'status': 'existing' # STATUS_EXISTING constant |
| 930 | + } |
| 931 | + } |
| 932 | + }, |
| 933 | + "Team Member": { # role_name (str) |
| 934 | + 16: { # content_type_id (int, e.g., Team content type) |
| 935 | + 7: { # object_id (int, specific team ID) |
| 936 | + 'object': <Team>, # content_object (Team instance) |
| 937 | + 'status': 'existing' # STATUS_EXISTING constant |
| 938 | + } |
| 939 | + } |
| 940 | + } |
| 941 | + } |
| 942 | +
|
| 943 | + Notes: |
| 944 | + - Caches both global/system roles and local object role assignments |
| 945 | + - Global/system roles have content_type_id=None and object_id=None |
| 946 | + - Local object roles are cached only if content_type.service is local or "shared" |
| 947 | + - Organization/Team roles have specific content_type_id and object_id values |
| 948 | + - All cached assignments are marked with STATUS_EXISTING status |
| 949 | + - Role definitions are also cached separately in self.role_definitions |
| 950 | + """ |
899 | 951 | for role_assignment in role_assignments:
|
900 | 952 | # Cache role definition
|
901 | 953 | if (role_definition := self._rd_by_id(role_assignment)) is None:
|
902 | 954 | role_definition = role_assignment.role_definition
|
903 | 955 | self.role_definitions[role_definition.name] = role_definition
|
904 | 956 |
|
905 |
| - # Cache Role User Assignment |
| 957 | + # Skip role assignments that should not be cached |
| 958 | + if not ( |
| 959 | + role_assignment.content_type is None # Global/system roles (e.g., System Auditor) |
| 960 | + or role_assignment.content_type.service in [get_local_resource_prefix(), "shared"] |
| 961 | + ): # Local object roles |
| 962 | + continue |
| 963 | + |
| 964 | + # Cache Role User Assignment - only initialize cache key for assignments we're actually caching |
906 | 965 | self._init_cache_key(role_definition.name, content_type_id=role_assignment.content_type_id)
|
907 | 966 |
|
908 |
| - # object_id is TEXT db type |
909 |
| - object_id = int(role_assignment.object_id) if role_assignment.object_id is not None else None |
910 |
| - obj = role_assignment.content_object if object_id else None |
| 967 | + # Cache the role assignment |
| 968 | + self._cache_role_assignment(role_definition, role_assignment) |
| 969 | + |
| 970 | + def _cache_role_assignment(self, role_definition: models.Model, role_assignment: models.Model) -> None: |
| 971 | + """ |
| 972 | + Cache a single role assignment. |
| 973 | +
|
| 974 | + Args: |
| 975 | + role_definition: The role definition associated with this assignment |
| 976 | + role_assignment: The role assignment to cache |
| 977 | + """ |
| 978 | + if role_assignment.content_type is None: |
| 979 | + # Global role - both object_id and content_object are None |
| 980 | + object_id = None |
| 981 | + obj = None |
| 982 | + else: |
| 983 | + # Object role - try to convert object_id to int |
| 984 | + try: |
| 985 | + object_id = int(role_assignment.object_id) if role_assignment.object_id is not None else None |
| 986 | + except (ValueError, TypeError): |
| 987 | + # Intended to catch any int casting errors, since we're assuming object_ids are text values cast-able to integers |
| 988 | + logger.exception(f'Unable to cache object_id {role_assignment.object_id}: Could not cast to type int') |
| 989 | + return # Skip this role assignment if we can't convert the object_id |
| 990 | + |
| 991 | + obj = role_assignment.content_object if object_id is not None else None |
911 | 992 |
|
912 |
| - self.cache[role_definition.name][role_assignment.content_type_id][object_id] = {'object': obj, 'status': self.STATUS_EXISTING} |
| 993 | + self.cache[role_definition.name][role_assignment.content_type_id][object_id] = {'object': obj, 'status': self.STATUS_EXISTING} |
913 | 994 |
|
914 | 995 | def rd_by_name(self, role_name: str) -> Optional[CommonModel]:
|
915 | 996 | """Returns RoleDefinition by its name. Caches it if requested for first time"""
|
|
0 commit comments