Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 4.2.21 on 2025-07-09 12:02

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dab_authentication', '0017_alter_authenticator_slug'),
]

operations = [
migrations.AddField(
model_name='authenticatoruser',
name='email',
field=models.EmailField(blank=True, default=None, help_text='The e-mail associated with this authenticator user.', max_length=254, null=True),
),
migrations.AlterField(
model_name='authenticatormap',
name='organization',
field=models.CharField(blank=True, default=None, help_text='An organization name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax', max_length=512, null=True),
),
migrations.AlterField(
model_name='authenticatormap',
name='role',
field=models.CharField(blank=True, default=None, help_text='The role this map will grant the authenticating user to the targeted object. Will expand {% for_attr_value(user_orgs) %} syntax', max_length=512, null=True),
),
migrations.AlterField(
model_name='authenticatormap',
name='team',
field=models.CharField(blank=True, default=None, help_text='A team name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax.', max_length=512, null=True),
),
]
6 changes: 3 additions & 3 deletions ansible_base/authentication/models/authenticator_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,22 @@ class Meta:
null=True,
default=None,
blank=True,
help_text=_("The role this map will grant the authenticating user to the targeted object."),
help_text=_("The role this map will grant the authenticating user to the targeted object. Will expand {% for_attr_value(user_orgs) %} syntax"),
)

team = models.CharField(
max_length=512,
null=True,
default=None,
blank=True,
help_text=_('A team name this rule works on.'),
help_text=_('A team name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax.'),
)
organization = models.CharField(
max_length=512,
null=True,
default=None,
blank=True,
help_text=(_('An organization name this rule works on.')),
help_text=(_('An organization name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax')),
)
triggers = models.JSONField(
null=False,
Expand Down
1 change: 1 addition & 0 deletions ansible_base/authentication/models/authenticator_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AuthenticatorUser(AbstractUserSocialAuth, AbstractCommonModel):
on_delete=models.CASCADE,
help_text=_("The local DB user related to this authenticator user."),
)
email = models.EmailField(default=None, null=True, blank=True, help_text=_("The e-mail associated with this authenticator user."))
# TODO: set self.authenticated based on the provider that is passed to this method.
# the provider should be the name of the Authenticator model instance
claims = models.JSONField(
Expand Down
16 changes: 13 additions & 3 deletions ansible_base/authentication/serializers/authenticator_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from rest_framework.serializers import ValidationError

from ansible_base.authentication.models import AuthenticatorMap
from ansible_base.authentication.utils.authenticator_map import _EXPANSION_FIELDS, check_expansion_syntax, has_expansion
from ansible_base.authentication.utils.trigger_definition import TRIGGER_DEFINITION
from ansible_base.lib.serializers.common import NamedCommonModelSerializer
from ansible_base.lib.utils.auth import get_organization_model, get_team_model
Expand Down Expand Up @@ -40,6 +41,12 @@ def validate(self, data) -> dict:
if role:
errors.update(self.validate_role_data(map_type, role, org, team))

for field in _EXPANSION_FIELDS:
if error_message := check_expansion_syntax(data.get(field, None)):
# Its really not possible to have two errors on the same time.
# Other errors indicate that things are missing so they would never get into here
errors[field] = error_message

if errors:
raise ValidationError(errors)
return data
Expand All @@ -58,6 +65,10 @@ def validate_role_data(self, map_type, role, org, team):

from ansible_base.rbac.models import RoleDefinition

# If this role is dynamically expanded we can't check it now, only at run time.
if has_expansion(role):
return errors

try:
rbac_role = RoleDefinition.objects.get(name=role)
is_system_role = rbac_role.content_type is None
Expand All @@ -66,9 +77,8 @@ def validate_role_data(self, map_type, role, org, team):
if is_system_role and map_type == 'role':
return errors

if is_system_role:
is_org_role, is_team_role = False, False
else:
is_org_role, is_team_role = False, False
if not is_system_role:
model_class = rbac_role.content_type.model_class()
is_org_role = issubclass(model_class, get_organization_model())
is_team_role = issubclass(model_class, get_team_model())
Expand Down
29 changes: 29 additions & 0 deletions ansible_base/authentication/utils/authenticator_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import re
from typing import Any, Optional

from django.utils.translation import gettext_lazy as _

_EXPANSION_FIELDS = ['organization', 'role', 'team']


def has_expansion(value: Optional[str]) -> bool:
"""
Checks the given value to see if it has the expansion syntax
"""
if not value:
return False
if re.search(r'{%.*%}', value):
return True
else:
return False


def check_expansion_syntax(value: Optional[str]) -> Optional[Any]:
"""
Check a given field to see if it contains the proper syntax for {% for_attr_value(user_orgs) %}

Raises a ValidationError if its incorrect
"""

if has_expansion(value) and not re.search(r'{%\s*for_attr_value\(.+\)\s*%}', value):
return _("Expansion only supports the format {% for_attr_value(attribute) %}")
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,40 @@ def test_validate_role_organization_role(self, serializer, org_member_rd):
)
except ValidationError as e:
pytest.fail(f"Validation should pass, but: {str(e)}")


@pytest.mark.django_db
class TestAuthenticatorMapEscapeSequence:
@pytest.fixture(autouse=True)
def init_serializer(self, serializer):
serializer.validate_trigger_data = MagicMock(return_value={})
serializer._is_rbac_installed = MagicMock(return_value=True)

@pytest.mark.parametrize(
"role,organization,team",
[
(TEAM_MEMBER_ROLE_NAME, "asdf", "1234"),
("Team {% for_attr_value(a) %}", "asdf", "1234"),
(TEAM_MEMBER_ROLE_NAME, "Organization {% for_attr_value(member_of) %}", "1234"),
(TEAM_MEMBER_ROLE_NAME, "asdf", "{% for_attr_value(member_of) %} Team"),
],
)
def test_validate_expansion_fields(self, serializer, member_rd, role, organization, team):
try:
serializer.validate(dict(name="authentication_map_4", map_type="role", role=role, organization=organization, team=team))
except ValidationError as e:
pytest.fail(f"Validation should pass, but: {str(e)}")

@pytest.mark.parametrize(
"role,organization,team,failure_type",
[
("Team {% ) %}", "asdf", "1234", "role"),
(TEAM_MEMBER_ROLE_NAME, "Organization {% for_attr_value() %}", "1234", "organization"),
(TEAM_MEMBER_ROLE_NAME, "asdf", "{% (member_of) %} Team", "team"),
],
)
def test_validate_expansion_fields_negative(self, serializer, member_rd, role, organization, team, failure_type):
with pytest.raises(ValidationError) as e:
serializer.validate(dict(name="authentication_map_4", map_type="role", role=role, organization=organization, team=team))
assert failure_type in str(e.value)
assert 'Expansion only supports' in str(e.value)
43 changes: 43 additions & 0 deletions test_app/tests/authentication/utils/test_authenticator_map.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import pytest

from ansible_base.authentication.utils.authenticator_map import check_expansion_syntax, has_expansion

# check_role_type is tested only though the serializer


@pytest.mark.parametrize(
"value,expected_result",
[
("a", False),
("{% junkj}", False),
("{%%}", True),
("Pre-string {%%}", True),
("Pre-string {%%} Post-STring", True),
("{%%} Post-String", True),
],
)
def test_has_expansion(value, expected_result):
assert has_expansion(value) == expected_result


@pytest.mark.parametrize(
"value,should_work",
[
("a", True),
("Pre-string {%%}", False),
("Pre-string {% junk %} Post-STring", False),
("{%%} Post-String", False),
("Pre-string {% for_attr_value(testing) %}", True),
("Pre-string {% for_attr_value(testing2) %} Post-STring", True),
("{% for_attr_value(a) %} Post-String", True),
("{% for_attr_value() %} Post-String", False),
("{% for_attr_value(f) %}", True),
("{% i for_attr_value(12) %}", False),
],
)
def test_check_expansion_syntax(value, should_work):
response = check_expansion_syntax(value)
if should_work:
assert response is None
else:
assert response is not None