Skip to content

Commit 81fe6f2

Browse files
AAP-38581 Allow ldap union in user search of ldap authenticator (#706)
## Description <!-- Mandatory: Provide a clear, concise description of the changes and their purpose --> - What is being changed? The LDAPSearch object now support a single LDAPSearch or a union of LDAPSearches. This brings the LDAP authenticator inline with the awx LDAP adapter. - Why is this change needed? This was missing functionality after migrating authenticators from AWX to DAB Authentication. - How does this change address the issue? The code will now look at the input from USER_SEARCH and GROUP_SEARCH and validate them differently. Before it would look for an array of 3 strings (and perform validation on the strings). It will now look for either this or for an array of these things. We extended the LDAPSearch object because the UI already knows how to deal with that kind of field. NOTE: UnionSearch is only available for the USER_SEARCH field. A error message will be raised if a user attempts to use it in the GROUP_SEARCH field. This is consistent with AWX. ## Type of Change <!-- Mandatory: Check one or more boxes that apply --> - [X] Bug fix (non-breaking change which fixes an issue) - [X] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Documentation update - [ ] Test update - [ ] Refactoring (no functional changes) - [ ] Development environment change - [ ] Configuration change ## Self-Review Checklist <!-- These items help ensure quality - they complement our automated CI checks --> - [X] I have performed a self-review of my code - [X] I have added relevant comments to complex code sections - [X] I have updated documentation where needed - [X] I have considered the security impact of these changes - [X] I have considered performance implications - [X] I have thought about error handling and edge cases - [X] I have tested the changes in my local environment ## Testing Instructions <!-- Optional for test-only changes. Mandatory for all other changes --> <!-- Must be detailed enough for reviewers to reproduce --> ### Prerequisites <!-- List any specific setup required --> ### Steps to Test 1. Create an LDAP authenticator and set the USER_SEARCH field to an array of searches. 2. Log in a user from either branch of the search. ### Expected Results <!-- Describe what should happen after following the steps --> ## Additional Context <!-- Optional but helpful information --> ### Required Actions <!-- Check if changes require work in other areas --> <!-- Remove section if no external actions needed --> - [X] Requires documentation updates <!-- API docs, feature docs, deployment guides --> - [ ] Requires downstream repository changes <!-- Specify repos: django-ansible-base, eda-server, etc. --> - [ ] Requires infrastructure/deployment changes <!-- CI/CD, installer updates, new services --> - [ ] Requires coordination with other teams <!-- UI team, platform services, infrastructure --> - [ ] Blocked by PR/MR: #XXX <!-- Reference blocking PRs/MRs with brief context --> ### Screenshots/Logs <!-- Add if relevant to demonstrate the changes -->
1 parent 50deeda commit 81fe6f2

File tree

2 files changed

+217
-46
lines changed
  • ansible_base/authentication/authenticator_plugins
  • test_app/tests/authentication/authenticator_plugins

2 files changed

+217
-46
lines changed

ansible_base/authentication/authenticator_plugins/ldap.py

Lines changed: 103 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from django_auth_ldap import config
1010
from django_auth_ldap.backend import LDAPBackend
1111
from django_auth_ldap.backend import LDAPSettings as BaseLDAPSettings
12-
from django_auth_ldap.config import LDAPGroupType
12+
from django_auth_ldap.config import LDAPGroupType, LDAPSearchUnion
1313
from rest_framework.serializers import ValidationError
1414

1515
from ansible_base.authentication.authenticator_plugins.base import AbstractAuthenticatorPlugin, Authenticator, BaseAuthenticatorConfiguration
@@ -20,6 +20,9 @@
2020

2121
logger = logging.getLogger('ansible_base.authentication.authenticator_plugins.ldap')
2222

23+
_MUST_BE_AN_ARRAY_MESSAGE = 'Must be an array of 3 items: search DN, search scope and a filter'
24+
_MUST_BE_AN_ARRAY_MESSAGE_TRANSLATED = _(_MUST_BE_AN_ARRAY_MESSAGE)
25+
2326

2427
user_search_string = '%(user)s'
2528

@@ -69,8 +72,71 @@ def validator(value):
6972

7073

7174
class LDAPSearchField(ListField):
75+
@staticmethod
76+
def is_single_search(value):
77+
try:
78+
# Note: this method was chosen as it was how AWX determined a single vs multi-field
79+
return len(value) == 3 and type(value[0]) is str
80+
except TypeError:
81+
return False
82+
83+
def validate_single_search(self, search_term):
84+
errors = {}
85+
86+
# If this thing doesn't look like we expect, return an error
87+
if not LDAPSearchField.is_single_search(search_term):
88+
return _MUST_BE_AN_ARRAY_MESSAGE_TRANSLATED
89+
90+
# Validate the first item is an ldap DN
91+
try:
92+
validate_ldap_dn(search_term[0], with_user=False, required=True)
93+
except ValidationError as e:
94+
errors[0] = e.args[0]
95+
96+
# Validate the second item is a search term
97+
if type(search_term[1]) is not str or not search_term[1].startswith('SCOPE_') or not getattr(ldap, search_term[1], None):
98+
errors[1] = _('Must be a string representing an LDAP scope object')
99+
100+
# Validate the third item is a filter
101+
try:
102+
validate_ldap_filter(search_term[2], with_user=self.search_must_have_user)
103+
except ValidationError as e:
104+
errors[2] = e.args[0]
105+
106+
# If we got errors return now because the next part will fail for sure
107+
if errors:
108+
return errors
109+
110+
# We made it all the way here, make sure we can instantiate an LDAPSearch object
111+
try:
112+
# Search fields should be LDAPSearch objects, so we need to convert them from [] to these objects
113+
config.LDAPSearch(search_term[0], getattr(ldap, search_term[1]), search_term[2])
114+
except Exception as e:
115+
errors = _('Failed to instantiate LDAPSearch object: %(e)s') % {"e": e}
116+
117+
return errors
118+
119+
def check_search_input(self, allow_union, value):
120+
#
121+
# Check if we have a single search instance and massage the value if needed
122+
#
123+
124+
# If we got 3 items and all 3 items are not lists than we have what looks like a single search
125+
single_search = LDAPSearchField.is_single_search(value)
126+
127+
# If we are not a union enabled field and got a union, raise
128+
if not self.allow_union and not single_search:
129+
raise ValidationError(_MUST_BE_AN_ARRAY_MESSAGE_TRANSLATED)
130+
131+
if single_search:
132+
# If we got a single search wrap it in array for common processing
133+
value = [value]
134+
135+
return single_search, value
136+
72137
def __init__(self, **kwargs):
73138
self.search_must_have_user = kwargs.pop('search_must_have_user', False)
139+
self.allow_union = kwargs.pop('allow_union', False)
74140
super().__init__(**kwargs)
75141

76142
def validator(value):
@@ -79,31 +145,27 @@ def validator(value):
79145

80146
errors = {}
81147

82-
if len(value) != 3:
83-
raise ValidationError(_('Must be an array of 3 items: search DN, search scope and a filter'))
148+
single_search, value = self.check_search_input(self.allow_union, value)
84149

85-
try:
86-
validate_ldap_dn(value[0], with_user=False, required=True)
87-
except ValidationError as e:
88-
errors[0] = e.args[0]
150+
# Loop over each of the items in the "array" and check it
151+
for index in range(len(value)):
152+
# Check to see if this ite has any errors in it
153+
local_errors = self.validate_single_search(value[index])
89154

90-
if type(value[1]) is not str or not value[1].startswith('SCOPE_') or not getattr(ldap, value[1], None):
91-
errors[1] = _('Must be a string representing an LDAP scope object')
155+
# If not, continue
156+
if local_errors == {}:
157+
continue
92158

93-
try:
94-
validate_ldap_filter(value[2], with_user=self.search_must_have_user)
95-
except ValidationError as e:
96-
errors[2] = e.args[0]
159+
errors[index] = local_errors
97160

161+
# If we had any errors in any of the validation, raise them up.
98162
if errors:
99-
raise ValidationError(errors)
100-
101-
# We made it all the way here, make sure we can instantiate an LDAPSearch object
102-
try:
103-
# Search fields should be LDAPSearch objects, so we need to convert them from [] to these objects
104-
config.LDAPSearch(value[0], getattr(ldap, value[1]), value[2])
105-
except Exception as e:
106-
raise ValidationError(_('Failed to instantiate LDAPSearch object: %(e)s') % {"e": e})
163+
if single_search:
164+
# If we are a single search we are just going to raise the element 0 items for clarity
165+
raise ValidationError(errors[0])
166+
else:
167+
# If we are a multi search return all errors
168+
raise ValidationError(errors)
107169

108170
self.validators.append(validator)
109171

@@ -214,6 +276,7 @@ class LDAPConfiguration(BaseAuthenticatorConfiguration):
214276
allow_null=True,
215277
required=False,
216278
search_must_have_user=False,
279+
allow_union=False,
217280
ui_field_label=_('LDAP Group Search'),
218281
)
219282
START_TLS = BooleanField(
@@ -259,6 +322,7 @@ class LDAPConfiguration(BaseAuthenticatorConfiguration):
259322
allow_null=True,
260323
required=False,
261324
search_must_have_user=True,
325+
allow_union=True,
262326
ui_field_label=_('LDAP User Search'),
263327
)
264328

@@ -372,19 +436,28 @@ def authenticate(self, request, username=None, password=None, **kwargs) -> (obje
372436
self.settings.CONNECTION_OPTIONS[ldap.OPT_X_TLS_NEWCTX] = 0
373437

374438
# Ensure USER_SEARCH and GROUP_SEARCH are converted into a search object
375-
for field, search_must_have_user in [('GROUP_SEARCH', False), ('USER_SEARCH', True)]:
439+
for field in ['GROUP_SEARCH', 'USER_SEARCH']:
376440
data = getattr(self.settings, field, None)
377441
# Ignore None or empty (e.g., [])
378442
if not data:
379443
setattr(self.settings, field, None)
380-
elif not isinstance(data, config.LDAPSearch):
381-
try:
382-
# Search fields should be LDAPSearch objects, so we need to convert them from [] to these objects
383-
search_object = config.LDAPSearch(data[0], getattr(ldap, data[1]), data[2])
384-
setattr(self.settings, field, search_object)
385-
except Exception as e:
386-
logger.error(f'Failed to instantiate {field} LDAPSearch object: {e}')
387-
return None
444+
elif not isinstance(data, config.LDAPSearch) and not isinstance(data, config.LDAPSearchUnion):
445+
# If we only got 3 strings we will be just a single search, stuff it into an array of a single search
446+
if len(data) == 3 and type(data[0]) is str and type(data[1]) is str and type(data[2]) is str:
447+
data = [data]
448+
searches = []
449+
for index in range(len(data)):
450+
try:
451+
# Search fields should be LDAPSearch objects, so we need to convert them from [] to these objects
452+
search_object = config.LDAPSearch(data[index][0], getattr(ldap, data[index][1]), data[index][2])
453+
searches.append(search_object)
454+
except Exception as e:
455+
logger.error(f'Failed to instantiate {field}[{index}] as LDAPSearch object: {e}')
456+
return None
457+
if len(searches) == 1:
458+
setattr(self.settings, field, searches[0])
459+
else:
460+
setattr(self.settings, field, LDAPSearchUnion(*searches))
388461

389462
try:
390463
user_from_ldap = super().authenticate(request, username, password)

0 commit comments

Comments
 (0)