Skip to content

Commit a873b62

Browse files
committed
Do some refactoring
1 parent fc4477d commit a873b62

File tree

3 files changed

+534
-52
lines changed

3 files changed

+534
-52
lines changed

ansible_base/rbac/claims.py

Lines changed: 274 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,114 +1,336 @@
11
import hashlib
22
import json
3-
from typing import Union
3+
from typing import Type, Union
44

55
from django.apps import apps
66
from django.conf import settings
7-
from django.db.models import F, Model, OuterRef
7+
from django.db.models import F, Model, OuterRef, QuerySet
88

99
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
1010

1111
from .models.content_type import DABContentType
1212
from .models.role import RoleDefinition
1313

1414

15-
def get_user_object_roles(user: Model) -> list[tuple[str, str, int]]:
16-
"""Returns a list of tuples giving (role name, ansible_id, content_type_id)
15+
def _get_resource_model() -> Type[Model]:
16+
"""Get the Resource model class from the resource registry.
17+
18+
Returns:
19+
The Resource model class used for ansible_id lookups
20+
"""
21+
return apps.get_model('dab_resource_registry', 'Resource')
22+
23+
24+
def _build_resource_subquery() -> QuerySet:
25+
"""Build a subquery to retrieve ansible_id values for resources.
26+
27+
This subquery is used to annotate role assignments with their corresponding
28+
ansible_id values from the resource registry.
29+
30+
Returns:
31+
QuerySet that filters resources by object_id and content_type from outer query
32+
and returns ansible_id values
33+
"""
34+
resource_cls = _get_resource_model()
35+
return resource_cls.objects.filter(object_id=OuterRef('object_id'), content_type=OuterRef('content_type')).values('ansible_id')
36+
37+
38+
def _build_role_assignments_queryset(user: Model, resource_subquery: QuerySet) -> QuerySet:
39+
"""Build queryset of role assignments with ansible_id and role name annotations.
40+
41+
Args:
42+
user: The user whose role assignments to query
43+
resource_subquery: Subquery to get ansible_id values
1744
18-
This data is the role name joined with data about the resource"""
19-
resource_cls = apps.get_model('dab_resource_registry', 'Resource')
20-
resource_qs = resource_cls.objects.filter(object_id=OuterRef('object_id'), content_type=OuterRef('content_type')).values('ansible_id')
21-
assignment_qs = (
45+
Returns:
46+
QuerySet of role assignments filtered to:
47+
- Object-scoped assignments only (content_type is not null)
48+
- JWT-managed roles only (as defined in settings)
49+
Annotated with:
50+
- aid: ansible_id from the resource registry
51+
- rd_name: role definition name
52+
"""
53+
return (
2254
user.role_assignments.filter(content_type__isnull=False)
23-
.annotate(aid=resource_qs, rd_name=F('role_definition__name'))
55+
.annotate(aid=resource_subquery, rd_name=F('role_definition__name'))
2456
.filter(rd_name__in=settings.ANSIBLE_BASE_JWT_MANAGED_ROLES)
2557
)
26-
return [(ra.rd_name, str(ra.aid), ra.content_type_id) for ra in assignment_qs]
2758

2859

29-
def get_user_claims(user: Model) -> dict[str, Union[list[str], dict[str, Union[str, list[dict[str, str]]]]]]:
30-
claims = {'objects': {}, 'object_roles': {}, 'global_roles': []}
60+
def _format_role_assignment_results(assignment_queryset: QuerySet) -> list[tuple[str, str, int]]:
61+
"""Convert role assignment queryset to list of tuples.
3162
32-
org_cls = get_organization_model()
33-
team_cls = get_team_model()
63+
Args:
64+
assignment_queryset: QuerySet with aid and rd_name annotations
3465
35-
cached_objects_index = {} # Entries like { <content_model>: {<ansible_id>: <array index integer> } } used to resolve ansible_ids to array indexes
36-
cached_content_types = {} # Entries like { <content id integer>: <content_model> } used to resolve a content id to a model type
37-
for content_type in DABContentType.objects.all().values('id', 'model'):
38-
content_type_id = content_type['id']
39-
model = content_type['model']
40-
cached_content_types[content_type_id] = model
41-
cached_objects_index[model] = {}
66+
Returns:
67+
List of tuples containing (role_name, ansible_id, content_type_id)
68+
where:
69+
- role_name: Name of the role definition (e.g., "Organization Admin")
70+
- ansible_id: String representation of the resource's ansible_id
71+
- content_type_id: Integer ID of the content type
72+
"""
73+
return [(ra.rd_name, str(ra.aid), ra.content_type_id) for ra in assignment_queryset]
74+
75+
76+
def get_user_object_roles(user: Model) -> list[tuple[str, str, int]]:
77+
"""Get all object-scoped role assignments for a user with resource metadata.
78+
79+
This function retrieves role assignments that are scoped to specific objects
80+
(not global roles) and joins them with resource registry data to include
81+
ansible_id values. Only JWT-managed roles are included.
82+
83+
Args:
84+
user: Django user model instance
4285
43-
required_data = {} # Entries like { <content_model>: { <ansible_id>|<id>: <required_data> } } Note: required_data could have references to ansible_ids
86+
Returns:
87+
List of tuples containing (role_name, ansible_id, content_type_id):
88+
- role_name: Name of the role definition (e.g., "Organization Admin")
89+
- ansible_id: String representation of the resource's ansible_id
90+
- content_type_id: Integer ID of the content type for the assigned object
91+
92+
Example:
93+
[
94+
("Organization Admin", "uuid-123", 42),
95+
("Team Member", "uuid-456", 43)
96+
]
97+
"""
98+
resource_subquery = _build_resource_subquery()
99+
assignment_queryset = _build_role_assignments_queryset(user, resource_subquery)
100+
return _format_role_assignment_results(assignment_queryset)
101+
102+
103+
def _build_organization_data(org_cls: Type[Model], claims: dict, required_data: dict[str, dict]) -> str:
104+
"""Build organization data for claims processing.
105+
106+
Args:
107+
org_cls: Organization model class
108+
claims: Claims dictionary to populate
109+
required_data: Required data cache to populate
44110
45-
# Populate the required_data for orgs
111+
Returns:
112+
String representing the organization content type model name
113+
"""
46114
org_content_type_model = DABContentType.objects.get_for_model(org_cls).model
47115
required_data[org_content_type_model] = {}
116+
117+
# Populate required_data for organizations
48118
for org in org_cls.objects.all().values('id', 'name', 'resource__ansible_id'):
49119
org_id = org['id']
50120
name = org['name']
51121
ansible_id = str(org['resource__ansible_id'])
52-
required_data[org_content_type_model][org_id] = {'ansible_id': ansible_id, 'name': name}
53-
required_data[org_content_type_model][ansible_id] = required_data[org_content_type_model][org_id]
122+
org_data = {'ansible_id': ansible_id, 'name': name}
123+
124+
# Store by both id and ansible_id for flexible lookup
125+
required_data[org_content_type_model][org_id] = org_data
126+
required_data[org_content_type_model][ansible_id] = org_data
127+
54128
claims['objects'][org_content_type_model] = []
129+
return org_content_type_model
130+
55131

56-
# Populate the required_Data for teams
132+
def _build_team_data(team_cls: Type[Model], claims: dict, required_data: dict[str, dict]) -> str:
133+
"""Build team data for claims processing.
134+
135+
Args:
136+
team_cls: Team model class
137+
claims: Claims dictionary to populate
138+
required_data: Required data cache to populate
139+
140+
Returns:
141+
String representing the team content type model name
142+
"""
57143
team_content_type_model = DABContentType.objects.get_for_model(team_cls).model
58144
required_data[team_content_type_model] = {}
145+
146+
# Populate required_data for teams
59147
for team in team_cls.objects.all().values('id', 'name', 'resource__ansible_id', 'organization__resource__ansible_id'):
60148
team_id = team['id']
61149
team_name = team['name']
62150
ansible_id = str(team['resource__ansible_id'])
63151
related_org_ansible_id = str(team['organization__resource__ansible_id'])
64-
required_data[team_content_type_model][team_id] = {'ansible_id': ansible_id, 'name': team_name, 'org': related_org_ansible_id}
65-
required_data[team_content_type_model][ansible_id] = required_data[team_content_type_model][team_id]
152+
team_data = {'ansible_id': ansible_id, 'name': team_name, 'org': related_org_ansible_id}
153+
154+
# Store by both id and ansible_id for flexible lookup
155+
required_data[team_content_type_model][team_id] = team_data
156+
required_data[team_content_type_model][ansible_id] = team_data
157+
66158
claims['objects'][team_content_type_model] = []
159+
return team_content_type_model
160+
161+
162+
def _process_user_object_roles(
163+
user: Model,
164+
org_content_type_model: str,
165+
team_content_type_model: str,
166+
cached_objects_index: dict[str, dict],
167+
cached_content_types: dict[int, str],
168+
required_data: dict[str, dict],
169+
) -> tuple[dict[str, list], dict[str, dict[str, Union[str, list[int]]]]]:
170+
"""Process user's object-scoped role assignments and return objects and roles data.
171+
172+
Args:
173+
user: User model instance
174+
org_content_type_model: String name of organization content type model
175+
team_content_type_model: String name of team content type model
176+
cached_objects_index: Cache mapping content_model -> ansible_id -> array_index (will be modified)
177+
cached_content_types: Cache mapping content_type_id -> model_name
178+
required_data: Cache containing object data by content_model and ansible_id
179+
180+
Returns:
181+
Tuple containing:
182+
- objects_dict: Dictionary with content_model -> list of objects
183+
- object_roles: Dictionary mapping role names to role data
184+
185+
Example:
186+
(
187+
{'organization': [{'ansible_id': 'uuid1', 'name': 'Org1'}], 'team': []},
188+
{'Organization Admin': {'content_type': 'organization', 'objects': [0]}}
189+
)
190+
"""
191+
# Initialize objects dict with empty arrays
192+
objects_dict = {org_content_type_model: [], team_content_type_model: []}
67193

68-
# We will now scan Org and Team roles and get users memberships to them.
69194
user_object_roles = get_user_object_roles(user)
195+
object_roles = {}
196+
70197
for role_name, ansible_id, content_type_id in user_object_roles:
71198
# Get the model for this content_type
72199
content_model_type = cached_content_types[content_type_id]
73200

74201
# If the ansible_id is not in the cached_objects_index
75202
if ansible_id not in cached_objects_index[content_model_type]:
76-
# Cache the index the current len will be the next index when we append)
77-
cached_objects_index[content_model_type][ansible_id] = len(claims['objects'][content_model_type])
78-
# Add the object to the payloads objects
79-
claims['objects'][content_model_type].append(required_data[content_model_type][ansible_id])
203+
# Cache the index (current len will be the next index when we append)
204+
cached_objects_index[content_model_type][ansible_id] = len(objects_dict[content_model_type])
205+
# Add the object to the objects dict
206+
objects_dict[content_model_type].append(required_data[content_model_type][ansible_id])
80207

81-
# Get the index value we want from the cache
208+
# Get the index value from the cache
82209
object_index = cached_objects_index[content_model_type][ansible_id]
83210

84-
# If the role is not in the payload, insert it
85-
if role_name not in claims['object_roles']:
86-
claims['object_roles'][role_name] = {'content_type': content_model_type, 'objects': []}
211+
# If the role is not in object_roles, initialize it
212+
if role_name not in object_roles:
213+
object_roles[role_name] = {'content_type': content_model_type, 'objects': []}
214+
215+
# Add the object index to the role
216+
object_roles[role_name]['objects'].append(object_index)
217+
218+
return objects_dict, object_roles
219+
87220

88-
# The object is the object cache
89-
claims['object_roles'][role_name]['objects'].append(object_index)
221+
def _fix_team_organization_references(
222+
objects_dict: dict[str, list],
223+
team_content_type_model: str,
224+
org_content_type_model: str,
225+
cached_objects_index: dict[str, dict],
226+
required_data: dict[str, dict],
227+
) -> None:
228+
"""Convert team organization references from ansible_ids to array indexes.
90229
91-
# Now we are going to trim up any team references to organizations with the index instead of the ansible ID
92-
# i.e. we currently have entries like: payload['objects']['team'][0]['org'] = <ansible_id>
93-
# and we are going to convert that to: payload['objects']['team'][0]['org'] = 0
230+
Teams initially reference their organizations by ansible_id. This method
231+
converts those references to array indexes within the objects structure.
94232
95-
for team in claims['objects'][team_content_type_model]:
233+
Args:
234+
objects_dict: Dictionary with content_model -> list of objects (will be modified)
235+
team_content_type_model: String name of team content type model
236+
org_content_type_model: String name of organization content type model
237+
cached_objects_index: Cache mapping content_model -> ansible_id -> array_index (will be modified)
238+
required_data: Cache containing object data by content_model and ansible_id
239+
"""
240+
for team in objects_dict[team_content_type_model]:
96241
org_ansible_id = team['org']
242+
97243
if org_ansible_id in cached_objects_index[org_content_type_model]:
244+
# Organization is already in objects, use its index
98245
team['org'] = cached_objects_index[org_content_type_model][org_ansible_id]
99246
else:
100-
# The user is in a team related to an org but we didn't pull that org in yet
101-
# Cache the index of the org, which is the current len
102-
cached_objects_index[org_content_type_model][org_ansible_id] = len(claims['objects'][org_content_type_model])
247+
# Organization not yet in objects - add it
248+
org_index = len(objects_dict[org_content_type_model])
249+
cached_objects_index[org_content_type_model][org_ansible_id] = org_index
103250
org_data = required_data[org_content_type_model][org_ansible_id]
104-
team['org'] = len(claims['objects'][org_content_type_model])
105-
claims['objects'][org_content_type_model].append(org_data)
251+
team['org'] = org_index
252+
objects_dict[org_content_type_model].append(org_data)
253+
254+
255+
def _get_user_global_roles(user: Model) -> list[str]:
256+
"""Get user's global role assignments.
257+
258+
Args:
259+
user: User model instance
260+
261+
Returns:
262+
List of global role names assigned to the user
263+
264+
Example:
265+
['Platform Auditor', 'System Administrator']
266+
"""
267+
global_roles_query = RoleDefinition.objects.filter(content_type=None, user_assignments__user=user.pk, name__in=settings.ANSIBLE_BASE_JWT_MANAGED_ROLES)
268+
269+
return [role_definition.name for role_definition in global_roles_query]
270+
271+
272+
def get_user_claims(user: Model) -> dict[str, Union[list[str], dict[str, Union[str, list[dict[str, str]]]]]]:
273+
"""Generate comprehensive claims data for a user including roles and object access.
274+
275+
This function builds a complete picture of a user's permissions by gathering:
276+
- Global roles (system-wide permissions)
277+
- Object roles (permissions on specific resources)
278+
- Object metadata (organizations and teams the user has access to)
279+
280+
Args:
281+
user: Django user model instance
282+
283+
Returns:
284+
Dictionary containing:
285+
- objects: Nested dict with arrays of organization/team objects user has access to
286+
- object_roles: Dict mapping role names to content types and object indexes
287+
- global_roles: List of global role names assigned to the user
288+
289+
Example:
290+
{
291+
'objects': {
292+
'organization': [{'ansible_id': 'uuid1', 'name': 'Org1'}],
293+
'team': [{'ansible_id': 'uuid2', 'name': 'Team1', 'org': 0}]
294+
},
295+
'object_roles': {
296+
'Organization Admin': {'content_type': 'organization', 'objects': [0]}
297+
},
298+
'global_roles': ['Platform Auditor']
299+
}
300+
"""
301+
# Initialize caching dictionaries
302+
cached_objects_index = {} # { <content_model>: {<ansible_id>: <array index integer> } }
303+
cached_content_types = {} # { <content id integer>: <content_model> }
304+
required_data = {} # { <content_model>: { <ansible_id>|<id>: <required_data> } }
305+
306+
# Build content type caches
307+
for content_type in DABContentType.objects.all().values('id', 'model'):
308+
content_type_id = content_type['id']
309+
model = content_type['model']
310+
cached_content_types[content_type_id] = model
311+
cached_objects_index[model] = {}
312+
313+
# Get model classes
314+
org_cls = get_organization_model()
315+
team_cls = get_team_model()
316+
317+
# Build organization and team data caches
318+
org_content_type_model = _build_organization_data(org_cls, {'objects': {}}, required_data)
319+
team_content_type_model = _build_team_data(team_cls, {'objects': {}}, required_data)
320+
321+
# Process user's object role assignments
322+
objects_dict, object_roles = _process_user_object_roles(
323+
user, org_content_type_model, team_content_type_model, cached_objects_index, cached_content_types, required_data
324+
)
325+
326+
# Convert team organization references from ansible_ids to indexes
327+
_fix_team_organization_references(objects_dict, team_content_type_model, org_content_type_model, cached_objects_index, required_data)
106328

107-
# See if the user has any global roles
108-
for rd in RoleDefinition.objects.filter(content_type=None, user_assignments__user=user.pk, name__in=settings.ANSIBLE_BASE_JWT_MANAGED_ROLES):
109-
claims['global_roles'].append(rd.name)
329+
# Get global roles
330+
global_roles = _get_user_global_roles(user)
110331

111-
return claims
332+
# Build final claims structure
333+
return {'objects': objects_dict, 'object_roles': object_roles, 'global_roles': global_roles}
112334

113335

114336
def get_user_claims_hashable_form(claims: dict) -> dict[str, Union[list[str], dict[str, list[str]]]]:

0 commit comments

Comments
 (0)