Skip to content

Commit 391664a

Browse files
committed
Start on moving claims to DAB
1 parent ed32cac commit 391664a

File tree

2 files changed

+188
-0
lines changed

2 files changed

+188
-0
lines changed

ansible_base/rbac/claims.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
from typing import Union
2+
3+
from django.apps import apps
4+
from django.conf import settings
5+
from django.db.models import F, Model, OuterRef
6+
7+
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
8+
9+
from .models.content_type import DABContentType
10+
from .models.role import RoleDefinition
11+
12+
13+
def get_user_object_roles(user: Model) -> list[tuple[str, str, int]]:
14+
"""Returns a list of tuples giving (role name, ansible_id, content_type_id)
15+
16+
This data is the role name joined with data about the resource"""
17+
resource_cls = apps.get_model('dab_resource_registry', 'Resource')
18+
resource_qs = resource_cls.objects.filter(object_id=OuterRef('object_id'), content_type=OuterRef('content_type')).values('ansible_id')
19+
assignment_qs = (
20+
user.role_assignments.filter(content_type__isnull=False)
21+
.annotate(aid=resource_qs, rd_name=F('role_definition__name'))
22+
.filter(rd_name__in=settings.ANSIBLE_BASE_JWT_MANAGED_ROLES)
23+
)
24+
return [(ra.rd_name, str(ra.aid), ra.content_type_id) for ra in assignment_qs]
25+
26+
27+
def get_user_claims(user: Model) -> dict[str, Union[list[str], dict[str, Union[str, list[dict[str, str]]]]]]:
28+
claims = {'objects': {}, 'object_roles': {}, 'global_roles': []}
29+
30+
org_cls = get_organization_model()
31+
team_cls = get_team_model()
32+
33+
cached_objects_index = {} # Entries like { <content_model>: {<ansible_id>: <array index integer> } } used to resolve ansible_ids to array indexes
34+
cached_content_types = {} # Entries like { <content id integer>: <content_model> } used to resolve a content id to a model type
35+
for content_type in DABContentType.objects.all().values('id', 'model'):
36+
content_type_id = content_type['id']
37+
model = content_type['model']
38+
cached_content_types[content_type_id] = model
39+
cached_objects_index[model] = {}
40+
41+
required_data = {} # Entries like { <content_model>: { <ansible_id>|<id>: <required_data> } } Note: required_data could have references to ansible_ids
42+
43+
# Populate the required_data for orgs
44+
org_content_type_model = DABContentType.objects.get_for_model(org_cls).model
45+
required_data[org_content_type_model] = {}
46+
for org in org_cls.objects.all().values('id', 'name', 'resource__ansible_id'):
47+
org_id = org['id']
48+
name = org['name']
49+
ansible_id = str(org['resource__ansible_id'])
50+
required_data[org_content_type_model][org_id] = {'ansible_id': ansible_id, 'name': name}
51+
required_data[org_content_type_model][ansible_id] = required_data[org_content_type_model][org_id]
52+
claims['objects'][org_content_type_model] = []
53+
54+
# Populate the required_Data for teams
55+
team_content_type_model = DABContentType.objects.get_for_model(team_cls).model
56+
required_data[team_content_type_model] = {}
57+
for team in team_cls.objects.all().values('id', 'name', 'resource__ansible_id', 'organization__resource__ansible_id'):
58+
team_id = team['id']
59+
team_name = team['name']
60+
ansible_id = str(team['resource__ansible_id'])
61+
related_org_ansible_id = str(team['organization__resource__ansible_id'])
62+
required_data[team_content_type_model][team_id] = {'ansible_id': ansible_id, 'name': team_name, 'org': related_org_ansible_id}
63+
required_data[team_content_type_model][ansible_id] = required_data[team_content_type_model][team_id]
64+
claims['objects'][team_content_type_model] = []
65+
66+
# We will now scan Org and Team roles and get users memberships to them.
67+
user_object_roles = get_user_object_roles(user)
68+
for role_name, ansible_id, content_type_id in user_object_roles:
69+
# Get the model for this content_type
70+
content_model_type = cached_content_types[content_type_id]
71+
72+
# If the ansible_id is not in the cached_objects_index
73+
if ansible_id not in cached_objects_index[content_model_type]:
74+
# Cache the index the current len will be the next index when we append)
75+
cached_objects_index[content_model_type][ansible_id] = len(claims['objects'][content_model_type])
76+
# Add the object to the payloads objects
77+
claims['objects'][content_model_type].append(required_data[content_model_type][ansible_id])
78+
79+
# Get the index value we want from the cache
80+
object_index = cached_objects_index[content_model_type][ansible_id]
81+
82+
# If the role is not in the payload, insert it
83+
if role_name not in claims['object_roles']:
84+
claims['object_roles'][role_name] = {'content_type': content_model_type, 'objects': []}
85+
86+
# The object is the object cache
87+
claims['object_roles'][role_name]['objects'].append(object_index)
88+
89+
# Now we are going to trim up any team references to organizations with the index instead of the ansible ID
90+
# i.e. we currently have entries like: payload['objects']['team'][0]['org'] = <ansible_id>
91+
# and we are going to convert that to: payload['objects']['team'][0]['org'] = 0
92+
93+
for team in claims['objects'][team_content_type_model]:
94+
org_ansible_id = team['org']
95+
if org_ansible_id in cached_objects_index[org_content_type_model]:
96+
team['org'] = cached_objects_index[org_content_type_model][org_ansible_id]
97+
else:
98+
# The user is in a team related to an org but we didn't pull that org in yet
99+
# Cache the index of the org, which is the current len
100+
cached_objects_index[org_content_type_model][org_ansible_id] = len(claims['objects'][org_content_type_model])
101+
org_data = required_data[org_content_type_model][org_ansible_id]
102+
team['org'] = len(claims['objects'][org_content_type_model])
103+
claims['objects'][org_content_type_model].append(org_data)
104+
105+
# See if the user has any global roles
106+
for rd in RoleDefinition.objects.filter(content_type=None, user_assignments__user=user.pk, name__in=settings.ANSIBLE_BASE_JWT_MANAGED_ROLES):
107+
claims['global_roles'].append(rd.name)
108+
109+
return claims

test_app/tests/rbac/test_claims.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
import pytest
2+
from django.contrib.auth import get_user_model
3+
4+
from ansible_base.rbac import permission_registry
5+
from ansible_base.rbac.claims import get_user_claims
6+
from ansible_base.rbac.models import RoleDefinition
7+
from test_app.models import Inventory, Organization, Team
8+
9+
10+
@pytest.mark.django_db
11+
class TestUserClaims:
12+
def test_user_claims_comprehensive_permissions(self):
13+
"""Test get_user_claims with a wide array of permissions including org admin, team membership, and global auditor role"""
14+
# Create a test user
15+
User = get_user_model()
16+
user = User.objects.create(username='test_claims_user', email='[email protected]')
17+
18+
# Create test organization and team
19+
org = Organization.objects.create(name='Test Org')
20+
team = Team.objects.create(name='Test Team', organization=org)
21+
inventory = Inventory.objects.create(name='Test Inventory', organization=org)
22+
23+
# Get or create role definitions we need
24+
25+
# 1. Organization Admin role
26+
org_admin_rd = RoleDefinition.objects.create_from_permissions(
27+
permissions=['view_organization', 'change_organization', 'add_inventory', 'change_inventory', 'delete_inventory', 'view_inventory'],
28+
name='Organization Admin',
29+
content_type=permission_registry.content_type_model.objects.get_for_model(Organization),
30+
managed=True,
31+
)
32+
33+
# 2. Team Member role
34+
team_member_rd = RoleDefinition.objects.create_from_permissions(
35+
permissions=[permission_registry.team_permission, f'view_{permission_registry.team_model._meta.model_name}'],
36+
name='Team Member',
37+
content_type=permission_registry.content_type_model.objects.get_for_model(Team),
38+
managed=True,
39+
)
40+
41+
# 3. Platform Auditor role (global)
42+
platform_auditor_rd = RoleDefinition.objects.create_from_permissions(
43+
permissions=['view_organization', 'view_inventory', 'view_team'],
44+
name='Platform Auditor',
45+
content_type=None, # Global role
46+
managed=True,
47+
)
48+
49+
# 4. Direct inventory permission
50+
inventory_rd = RoleDefinition.objects.create_from_permissions(
51+
permissions=['change_inventory', 'view_inventory'],
52+
name='Inventory Editor',
53+
content_type=permission_registry.content_type_model.objects.get_for_model(Inventory),
54+
managed=True,
55+
)
56+
57+
# Assign permissions to user
58+
# Make user an admin of the organization
59+
org_admin_rd.give_permission(user, org)
60+
61+
# Make user a member of the team
62+
team_member_rd.give_permission(user, team)
63+
64+
# Give user direct inventory permissions
65+
inventory_rd.give_permission(user, inventory)
66+
67+
# Give user global Platform Auditor role
68+
platform_auditor_rd.give_global_permission(user)
69+
70+
# Call the function under test
71+
claims = get_user_claims(user)
72+
73+
# For now, assert that we get a dict structure
74+
# The user will fill in the expected structure later
75+
assert isinstance(claims, dict)
76+
77+
# Placeholder assertion - user will replace this with actual expected structure
78+
expected_claims = {}
79+
assert claims == expected_claims

0 commit comments

Comments
 (0)