diff --git a/ansible_base/authentication/migrations/0018_authenticatoruser_email.py b/ansible_base/authentication/migrations/0018_authenticatoruser_email.py new file mode 100644 index 000000000..ed1760012 --- /dev/null +++ b/ansible_base/authentication/migrations/0018_authenticatoruser_email.py @@ -0,0 +1,33 @@ +# Generated by Django 4.2.21 on 2025-07-09 12:02 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dab_authentication', '0017_alter_authenticator_slug'), + ] + + operations = [ + migrations.AddField( + model_name='authenticatoruser', + name='email', + field=models.EmailField(blank=True, default=None, help_text='The e-mail associated with this authenticator user.', max_length=254, null=True), + ), + migrations.AlterField( + model_name='authenticatormap', + name='organization', + field=models.CharField(blank=True, default=None, help_text='An organization name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax', max_length=512, null=True), + ), + migrations.AlterField( + model_name='authenticatormap', + name='role', + field=models.CharField(blank=True, default=None, help_text='The role this map will grant the authenticating user to the targeted object. Will expand {% for_attr_value(user_orgs) %} syntax', max_length=512, null=True), + ), + migrations.AlterField( + model_name='authenticatormap', + name='team', + field=models.CharField(blank=True, default=None, help_text='A team name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax.', max_length=512, null=True), + ), + ] diff --git a/ansible_base/authentication/models/authenticator_map.py b/ansible_base/authentication/models/authenticator_map.py index 59cc1d04a..e9c2f6da3 100644 --- a/ansible_base/authentication/models/authenticator_map.py +++ b/ansible_base/authentication/models/authenticator_map.py @@ -55,7 +55,7 @@ class Meta: null=True, default=None, blank=True, - help_text=_("The role this map will grant the authenticating user to the targeted object."), + help_text=_("The role this map will grant the authenticating user to the targeted object. Will expand {% for_attr_value(user_orgs) %} syntax"), ) team = models.CharField( @@ -63,14 +63,14 @@ class Meta: null=True, default=None, blank=True, - help_text=_('A team name this rule works on.'), + help_text=_('A team name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax.'), ) organization = models.CharField( max_length=512, null=True, default=None, blank=True, - help_text=(_('An organization name this rule works on.')), + help_text=(_('An organization name this rule works on. Will expand {% for_attr_value(user_orgs) %} syntax')), ) triggers = models.JSONField( null=False, diff --git a/ansible_base/authentication/models/authenticator_user.py b/ansible_base/authentication/models/authenticator_user.py index 45e657122..17d53319b 100644 --- a/ansible_base/authentication/models/authenticator_user.py +++ b/ansible_base/authentication/models/authenticator_user.py @@ -42,6 +42,7 @@ class AuthenticatorUser(AbstractUserSocialAuth, AbstractCommonModel): on_delete=models.CASCADE, help_text=_("The local DB user related to this authenticator user."), ) + email = models.EmailField(default=None, null=True, blank=True, help_text=_("The e-mail associated with this authenticator user.")) # TODO: set self.authenticated based on the provider that is passed to this method. # the provider should be the name of the Authenticator model instance claims = models.JSONField( diff --git a/ansible_base/authentication/serializers/authenticator_map.py b/ansible_base/authentication/serializers/authenticator_map.py index 05e2835fc..81964c4fe 100644 --- a/ansible_base/authentication/serializers/authenticator_map.py +++ b/ansible_base/authentication/serializers/authenticator_map.py @@ -6,6 +6,7 @@ from rest_framework.serializers import ValidationError from ansible_base.authentication.models import AuthenticatorMap +from ansible_base.authentication.utils.authenticator_map import _EXPANSION_FIELDS, check_expansion_syntax, has_expansion from ansible_base.authentication.utils.trigger_definition import TRIGGER_DEFINITION from ansible_base.lib.serializers.common import NamedCommonModelSerializer from ansible_base.lib.utils.auth import get_organization_model, get_team_model @@ -40,6 +41,12 @@ def validate(self, data) -> dict: if role: errors.update(self.validate_role_data(map_type, role, org, team)) + for field in _EXPANSION_FIELDS: + if error_message := check_expansion_syntax(data.get(field, None)): + # Its really not possible to have two errors on the same time. + # Other errors indicate that things are missing so they would never get into here + errors[field] = error_message + if errors: raise ValidationError(errors) return data @@ -58,6 +65,10 @@ def validate_role_data(self, map_type, role, org, team): from ansible_base.rbac.models import RoleDefinition + # If this role is dynamically expanded we can't check it now, only at run time. + if has_expansion(role): + return errors + try: rbac_role = RoleDefinition.objects.get(name=role) is_system_role = rbac_role.content_type is None @@ -66,9 +77,8 @@ def validate_role_data(self, map_type, role, org, team): if is_system_role and map_type == 'role': return errors - if is_system_role: - is_org_role, is_team_role = False, False - else: + is_org_role, is_team_role = False, False + if not is_system_role: model_class = rbac_role.content_type.model_class() is_org_role = issubclass(model_class, get_organization_model()) is_team_role = issubclass(model_class, get_team_model()) diff --git a/ansible_base/authentication/utils/authenticator_map.py b/ansible_base/authentication/utils/authenticator_map.py new file mode 100644 index 000000000..556f0fb72 --- /dev/null +++ b/ansible_base/authentication/utils/authenticator_map.py @@ -0,0 +1,29 @@ +import re +from typing import Any, Optional + +from django.utils.translation import gettext_lazy as _ + +_EXPANSION_FIELDS = ['organization', 'role', 'team'] + + +def has_expansion(value: Optional[str]) -> bool: + """ + Checks the given value to see if it has the expansion syntax + """ + if not value: + return False + if re.search(r'{%.*%}', value): + return True + else: + return False + + +def check_expansion_syntax(value: Optional[str]) -> Optional[Any]: + """ + Check a given field to see if it contains the proper syntax for {% for_attr_value(user_orgs) %} + + Raises a ValidationError if its incorrect + """ + + if has_expansion(value) and not re.search(r'{%\s*for_attr_value\(.+\)\s*%}', value): + return _("Expansion only supports the format {% for_attr_value(attribute) %}") diff --git a/test_app/tests/authentication/serializers/test_authenticator_map.py b/test_app/tests/authentication/serializers/test_authenticator_map.py index defdd9b56..56fd638b1 100644 --- a/test_app/tests/authentication/serializers/test_authenticator_map.py +++ b/test_app/tests/authentication/serializers/test_authenticator_map.py @@ -126,3 +126,40 @@ def test_validate_role_organization_role(self, serializer, org_member_rd): ) except ValidationError as e: pytest.fail(f"Validation should pass, but: {str(e)}") + + +@pytest.mark.django_db +class TestAuthenticatorMapEscapeSequence: + @pytest.fixture(autouse=True) + def init_serializer(self, serializer): + serializer.validate_trigger_data = MagicMock(return_value={}) + serializer._is_rbac_installed = MagicMock(return_value=True) + + @pytest.mark.parametrize( + "role,organization,team", + [ + (TEAM_MEMBER_ROLE_NAME, "asdf", "1234"), + ("Team {% for_attr_value(a) %}", "asdf", "1234"), + (TEAM_MEMBER_ROLE_NAME, "Organization {% for_attr_value(member_of) %}", "1234"), + (TEAM_MEMBER_ROLE_NAME, "asdf", "{% for_attr_value(member_of) %} Team"), + ], + ) + def test_validate_expansion_fields(self, serializer, member_rd, role, organization, team): + try: + serializer.validate(dict(name="authentication_map_4", map_type="role", role=role, organization=organization, team=team)) + except ValidationError as e: + pytest.fail(f"Validation should pass, but: {str(e)}") + + @pytest.mark.parametrize( + "role,organization,team,failure_type", + [ + ("Team {% ) %}", "asdf", "1234", "role"), + (TEAM_MEMBER_ROLE_NAME, "Organization {% for_attr_value() %}", "1234", "organization"), + (TEAM_MEMBER_ROLE_NAME, "asdf", "{% (member_of) %} Team", "team"), + ], + ) + def test_validate_expansion_fields_negative(self, serializer, member_rd, role, organization, team, failure_type): + with pytest.raises(ValidationError) as e: + serializer.validate(dict(name="authentication_map_4", map_type="role", role=role, organization=organization, team=team)) + assert failure_type in str(e.value) + assert 'Expansion only supports' in str(e.value) diff --git a/test_app/tests/authentication/utils/test_authenticator_map.py b/test_app/tests/authentication/utils/test_authenticator_map.py new file mode 100644 index 000000000..8222f5d24 --- /dev/null +++ b/test_app/tests/authentication/utils/test_authenticator_map.py @@ -0,0 +1,43 @@ +import pytest + +from ansible_base.authentication.utils.authenticator_map import check_expansion_syntax, has_expansion + +# check_role_type is tested only though the serializer + + +@pytest.mark.parametrize( + "value,expected_result", + [ + ("a", False), + ("{% junkj}", False), + ("{%%}", True), + ("Pre-string {%%}", True), + ("Pre-string {%%} Post-STring", True), + ("{%%} Post-String", True), + ], +) +def test_has_expansion(value, expected_result): + assert has_expansion(value) == expected_result + + +@pytest.mark.parametrize( + "value,should_work", + [ + ("a", True), + ("Pre-string {%%}", False), + ("Pre-string {% junk %} Post-STring", False), + ("{%%} Post-String", False), + ("Pre-string {% for_attr_value(testing) %}", True), + ("Pre-string {% for_attr_value(testing2) %} Post-STring", True), + ("{% for_attr_value(a) %} Post-String", True), + ("{% for_attr_value() %} Post-String", False), + ("{% for_attr_value(f) %}", True), + ("{% i for_attr_value(12) %}", False), + ], +) +def test_check_expansion_syntax(value, should_work): + response = check_expansion_syntax(value) + if should_work: + assert response is None + else: + assert response is not None