Skip to content

Commit d27dae9

Browse files
committed
Test and reorg for global remote roles
1 parent e7df91c commit d27dae9

File tree

2 files changed

+59
-21
lines changed

2 files changed

+59
-21
lines changed

ansible_base/rbac/validators.py

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import inspect
21
import re
32
from collections import defaultdict
43
from typing import Optional, Type, Union
@@ -28,28 +27,17 @@ def prnt_codenames(codename_set: set[str]) -> str:
2827
return ', '.join(codename_set)
2928

3029

31-
def codenames_for_remote_cls(cls: Union[Type[RemoteObject], RemoteObject]) -> list[str]:
32-
"""For remote objects, we have to use the database to get its known permissions"""
33-
if inspect.isclass(cls):
34-
ct = cls.get_ct_from_type()
35-
else:
36-
ct = cls.content_type
30+
def codenames_for_content_type(ct: Model):
3731
return [permission.codename for permission in ct.dab_permissions.all()]
3832

3933

40-
def codenames_for_cls(cls: Union[Model, Type[Model], Type[RemoteObject], RemoteObject]) -> list[str]:
41-
"Helper method that gives the Django permission codenames for a given class"
42-
if (inspect.isclass(cls) and issubclass(cls, RemoteObject)) or isinstance(cls, RemoteObject):
43-
return codenames_for_remote_cls(cls)
44-
return [t[0] for t in cls._meta.permissions] + [f'{act}_{cls._meta.model_name}' for act in cls._meta.default_permissions]
45-
46-
4734
def permissions_allowed_for_system_role() -> dict[Type[Model], list[str]]:
4835
"Permission codenames useable in system-wide roles, which have content_type set to None"
4936
permissions_by_model = defaultdict(list)
50-
for cls in sorted(permission_registry.all_registered_models, key=lambda cls: cls._meta.model_name):
51-
is_team = bool(cls._meta.model_name == 'team')
52-
for codename in codenames_for_cls(cls):
37+
for ct in permission_registry.content_type_model.objects.all():
38+
is_team = bool(ct.model == 'team')
39+
cls = ct.model_class()
40+
for codename in codenames_for_content_type(ct):
5341
if is_team and (not codename.startswith('view')):
5442
continue # special exclusion of team object permissions from system-wide roles
5543
permissions_by_model[cls].append(codename)
@@ -169,9 +157,8 @@ def check_has_change_with_delete(codename_set: set[str], permissions_by_model: d
169157

170158

171159
def validate_permissions_for_model(permissions, content_type: Optional[Model], managed: bool = False) -> None:
172-
"""Validation for creating a RoleDefinition
160+
"""Validation for creating a RoleDefinition, called by serializer for public API
173161
174-
This is called by the RoleDefinitionSerializer so clients will get these errors.
175162
It is also called by manager helper methods like RoleDefinition.objects.create_from_permissions
176163
which is done as an aid to tests and other apps integrating this library.
177164
"""
@@ -221,7 +208,9 @@ def validate_codename_for_model(codename: str, model: Union[Model, Type[Model],
221208
assuming obj is an inventory.
222209
It also tries to protect the user by throwing an error if the permission does not work.
223210
"""
224-
valid_codenames = codenames_for_cls(model)
211+
# Calls to get_for_model should be efficient, if problem caller should call warm_cache
212+
model_ct = permission_registry.content_type_model.objects.get_for_model(model)
213+
valid_codenames = codenames_for_content_type(model_ct)
225214
if (not codename.startswith('add')) and codename in valid_codenames:
226215
return codename
227216
if re.match(r'^[a-z]+$', codename):
@@ -239,7 +228,8 @@ def validate_codename_for_model(codename: str, model: Union[Model, Type[Model],
239228
return name
240229

241230
for rel, child_cls in permission_registry.get_child_models(model):
242-
if name in codenames_for_cls(child_cls):
231+
child_ct = permission_registry.content_type_model.objects.get_for_model(model)
232+
if name in codenames_for_content_type(child_ct):
243233
return name
244234
raise RuntimeError(f'The permission {name} is not valid for model {model._meta.model_name}')
245235

@@ -300,3 +290,35 @@ def check_locally_managed(rd: Model) -> None:
300290
return
301291
if rd.name in settings.ANSIBLE_BASE_JWT_MANAGED_ROLES:
302292
raise ValidationError('Not managed locally, use the resource server instead')
293+
294+
295+
class LocalValidators:
296+
"""This keeps functioning validators that use model data from permission_registry as opposed to DB
297+
298+
These are only valid if you do not track permissions for remote models,
299+
but they can still be used in those cases.
300+
"""
301+
302+
@staticmethod
303+
def codenames_for_cls(cls: Union[Model, Type[Model]]) -> list[str]:
304+
"Helper method that gives the Django permission codenames for a given class"
305+
return [t[0] for t in cls._meta.permissions] + [f'{act}_{cls._meta.model_name}' for act in cls._meta.default_permissions]
306+
307+
@staticmethod
308+
def permissions_allowed_for_role(cls) -> dict[Type[Model], list[str]]:
309+
"Permission codenames valid for a RoleDefinition of given class, organized by permission class"
310+
if cls is None:
311+
return permissions_allowed_for_system_role()
312+
313+
if not permission_registry.is_registered(cls):
314+
raise ValidationError(f'Django-ansible-base RBAC does not track permissions for model {cls._meta.model_name}')
315+
316+
# Include direct model permissions (except for add permission)
317+
permissions_by_model = defaultdict(list)
318+
permissions_by_model[cls] = [codename for codename in LocalValidators.codenames_for_cls(cls) if not is_add_perm(codename)]
319+
320+
# Include model permissions for all child models, including the add permission
321+
for rel, child_cls in permission_registry.get_child_models(cls):
322+
permissions_by_model[child_cls] += LocalValidators.codenames_for_cls(child_cls)
323+
324+
return permissions_by_model

test_app/tests/rbac/remote/test_public_api_compat.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from ansible_base.lib.utils.response import get_relative_url
66
from ansible_base.rbac.remote import RemoteObject
77

8+
# Role Definitions
9+
810

911
@pytest.mark.django_db
1012
def test_role_definition_list_remote_and_local(admin_api_client, inv_rd, foo_rd):
@@ -32,6 +34,20 @@ def test_create_remote_role_definition_for_remote(admin_api_client, foo_type, fo
3234
assert response.data['permissions'] == ['foo.foo_foo']
3335

3436

37+
@pytest.mark.django_db
38+
def test_create_remote_role_definition_global(admin_api_client, foo_type, foo_permission):
39+
"Test creation of a system-wide role definition for a remote model"
40+
url = get_relative_url("roledefinition-list")
41+
data = dict(name='foo-foo-foo-global', description='bar', permissions=[foo_permission.api_slug], content_type=None)
42+
response = admin_api_client.post(url, data=data, format="json")
43+
assert response.status_code == 201, response.data
44+
assert response.data['name'] == 'foo-foo-foo-global'
45+
assert response.data['permissions'] == ['foo.foo_foo']
46+
47+
48+
# Role User Assignments
49+
50+
3551
@pytest.mark.django_db
3652
def test_user_role_assignment_remote_and_local(admin_api_client, rando, foo_type, foo_rd):
3753
"Test that after assigning permission to remote objects the assignment list works."

0 commit comments

Comments
 (0)