Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion ansible_base/authentication/authenticator_plugins/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,13 @@ def authenticate(self, request, username=None, password=None, **kwargs) -> (obje
# In unit testing there were cases where the function we are in was being called before get_or_build_user.
# Its unclear if that was just a byproduct of mocking or a real scenario.
# Since this call is idempotent we are just going to call it again to ensure the AuthenticatorUser is created for update_user_claims
get_or_create_authenticator_user(username.lower(), self.database_instance, user_details={}, extra_data=user_from_ldap.ldap_user.attrs.data)
get_or_create_authenticator_user(
username.lower(),
user_from_ldap.email,
self.database_instance,
user_details={},
extra_data=user_from_ldap.ldap_user.attrs.data
)
return update_user_claims(user_from_ldap, self.database_instance, users_groups)
except Exception:
logger.exception(f"Encountered an error authenticating to LDAP {self.database_instance.name}")
Expand Down Expand Up @@ -520,6 +526,7 @@ def get_or_build_user(self, username, ldap_user):
"""
user, _authenticator_user, created = get_or_create_authenticator_user(
username.lower(),
ldap_user.attrs.data.get(ldap_user.settings.USER_ATTR_MAP['email'], ""),
self.database_instance,
user_details={
"username": username,
Expand Down
3 changes: 2 additions & 1 deletion ansible_base/authentication/authenticator_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def authenticate(self, request, username=None, password=None, **kwargs):
return None

# Determine the user name for this authenticator, we have to call this so that we can "attach" to a pre-created user
new_username = determine_username_from_uid(username, self.database_instance)
new_username = determine_username_from_uid(username, "", self.database_instance)
# However we can't really accept a different username because we are the local authenticator imageine if:
# User "a" is from another authenticator and has an AuthenticatorUser
# User "a" tried to login from local authenticator
Expand All @@ -59,6 +59,7 @@ def authenticate(self, request, username=None, password=None, **kwargs):
if user:
get_or_create_authenticator_user(
username,
user.email,
authenticator=self.database_instance,
user_details={},
extra_data={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ def authenticate(self, request, username=None, password=None, **kwargs):

user, _authenticator_user, _is_created = get_or_create_authenticator_user(
username,
"",
authenticator=self.database_instance,
user_details={},
extra_data={"username": username},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def authenticate(self, request, username=None, password=None, **kwargs):
if reply.valid:
user, _authenticator_user, _created = get_or_create_authenticator_user(
username,
"",
self.database_instance,
user_details={},
extra_data={'username': username},
Expand Down
94 changes: 68 additions & 26 deletions ansible_base/authentication/utils/authentication.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Optional, Tuple
from typing import Optional, Tuple, Union

from django.conf import settings
from django.contrib.auth import get_user_model
Expand All @@ -16,6 +16,9 @@
logger = logging.getLogger('ansible_base.authentication.utils.authentication')


merge_strategy = "Email and Username fallback"


class FakeBackend:
def setting(self, *args, **kwargs):
return ["username", "email"]
Expand Down Expand Up @@ -125,18 +128,24 @@ def determine_username_from_uid_social(**kwargs) -> dict:
)

alt_uid = authenticator.get_alternative_uid(**kwargs)
email = kwargs.get('details', {}).get('email', None)

if migrated_username := migrate_from_existing_authenticator(
uid=kwargs.get("uid"), alt_uid=alt_uid, authenticator=authenticator.database_instance, preferred_username=selected_username
):
username = migrated_username
else:
username = determine_username_from_uid(selected_username, authenticator.database_instance)
username = determine_username_from_uid(selected_username, email, authenticator.database_instance)

return {"username": username}


def determine_username_from_uid(uid: str = None, authenticator: Authenticator = None, alt_uid: Optional[str] = None) -> str:
def determine_username_from_uid(
uid: str = None,
email: Union[str, list[str], None] = None,
authenticator: Authenticator = None,
alt_uid: Optional[str] = None
) -> str:
"""
Determine what the username for the User object will be from the given uid and authenticator
This will take uid like "bella" and search for an AuthenticatorUser and return:
Expand Down Expand Up @@ -164,23 +173,53 @@ def determine_username_from_uid(uid: str = None, authenticator: Authenticator =
return new_username

# We didn't get an exact match. If any other provider is using this uid our id will be uid<hash>
if AuthenticatorUser.objects.filter(Q(uid=uid) | Q(user__username=uid)).count() != 0:
# Some other provider is providing this username so we need to create our own username
new_username = get_local_username({'username': uid})
logger.info(
f'Authenticator {authenticator.name} wants to authenticate {uid} but that'
f' username is already in use by another authenticator,'
f' the user from this authenticator will be {new_username}'
)
return new_username

# We didn't have an exact match but no other provider is servicing this uid so lets return that for usage
logger.info(f"Authenticator {authenticator.name} is able to authenticate user {uid} as {uid}")
return uid
logger.debug(f"Authentication merge strategy is {merge_strategy}")
if merge_strategy == "":
if AuthenticatorUser.objects.filter(Q(uid=uid) | Q(user__username=uid)).count() != 0:
# Some other provider is providing this username so we need to create our own username
new_username = get_local_username({'username': uid})
logger.info(
f'Authenticator {authenticator.name} wants to authenticate {uid} but that'
f' username is already in use by another authenticator,'
f' the user from this authenticator will be {new_username}'
)
return new_username
else:
# We didn't have an exact match but no other provider is servicing this uid so lets return that for usage
logger.info(f"Authenticator {authenticator.name} is able to authenticate user {uid} as {uid}")
return uid
elif merge_strategy == "Email and Username fallback":
if email is None or email == "" or email == []:
existing_user_match = Authenticator.objects.get(user__username=uid)
else:
if type(email) is list:
email = email[0]
if type(email) is not str:
raise AuthException(f"The passed email was not a string, it was a {type(email)}")
existing_user_match = Authenticator.objects.get(user__email=email)

if existing_user_match.count() == 1:
existing_user = existing_user_match[0].user
new_username = existing_user.username
logger.info(f"Authenticator {authenticator.name} matched {uid}/{email} from provider {existing_user_match[0].provider.name}, combining users")
return new_username
elif existing_user_match.count() > 1:
raise AuthException(f"Found more than 1 user matching {uid}/{email}, unable to determine which user to merge with!")
else:
# Some other provider is providing this username so we need to create our own username
new_username = get_local_username({'username': uid})
logger.info(
f'Authenticator {authenticator.name} wants to authenticate {uid}/{email} but that'
f' username is already in use by another authenticator,'
f' the user from this authenticator will be {new_username}'
)
return new_username
else:
raise AuthException(f"Got an invalid merge_strategy {merge_strategy}")


def get_or_create_authenticator_user(
uid: str, authenticator: Authenticator, user_details: dict = dict, extra_data: dict = dict
uid: str, email: str, authenticator: Authenticator, user_details: dict = dict, extra_data: dict = dict
) -> Tuple[Optional[AbstractUser], Optional[AuthenticatorUser], Optional[bool]]:
"""
Create the user object in the database along with it's associated AuthenticatorUser class.
Expand All @@ -203,7 +242,7 @@ def get_or_create_authenticator_user(
if migrated_username := migrate_from_existing_authenticator(uid=uid, alt_uid=None, authenticator=authenticator, preferred_username=uid):
username = migrated_username
else:
username = determine_username_from_uid(uid, authenticator)
username = determine_username_from_uid(uid, email, authenticator)

created = None
try:
Expand All @@ -213,14 +252,17 @@ def get_or_create_authenticator_user(
auth_user.save()
created = False
except AuthenticatorUser.DoesNotExist:
# Ensure that this username is not already tied to another authenticator
auth_user = AuthenticatorUser.objects.filter(user__username=username).first()
if auth_user is not None:
logger.error(
f'Authenticator {authenticator.name} attempted to create an AuthenticatorUser for {username}'
f' but that id is already tied to authenticator {auth_user.provider.name}'
)
return None, None, None
if merge_strategy is None:
# Ensure that this username is not already tied to another authenticator
auth_user = AuthenticatorUser.objects.filter(user__username=username).first()
if auth_user is not None:
logger.error(
f'Authenticator {authenticator.name} attempted to create an AuthenticatorUser for {username}'
f' but that id is already tied to authenticator {auth_user.provider.name}'
)
return None, None, None
else:
pass

# ensure the authenticator isn't trying to pass along a cheeky is_superuser in user_details
details = {k: user_details.get(k, "") for k in ["first_name", "last_name", "email"]}
Expand Down
Loading