diff --git a/ansible_base/authentication/authenticator_plugins/oidc.py b/ansible_base/authentication/authenticator_plugins/oidc.py index 890720141..80a46bae5 100644 --- a/ansible_base/authentication/authenticator_plugins/oidc.py +++ b/ansible_base/authentication/authenticator_plugins/oidc.py @@ -1,6 +1,7 @@ import logging import jwt +from django.conf import settings from django.core.exceptions import ValidationError from django.core.validators import MinValueValidator from django.utils.translation import gettext_lazy as _ @@ -123,7 +124,11 @@ class OpenIdConnectConfiguration(BaseAuthenticatorConfiguration): ) JWT_ALGORITHMS = ListField( - help_text=_("The algorithm(s) for decoding JWT responses from the IDP."), + help_text=_( + "The algorithm(s) for decoding JWT responses from the IDP. " + "Leave blank to extract from the .well-known configuration (if that fails we will attempt the default algorithms). " + "Set to ['none'] to not use encrypted tokens (the provider must send unencrypted tokens for this to work)" + ), default=None, allow_null=True, validators=[JWTAlgorithmListFieldValidator()], @@ -263,6 +268,41 @@ def public_key(self): ) return None + def setting(self, name, default=None): + if name == "JWT_ALGORITHMS": + existing_setting = self.strategy.setting(name, backend=self) + return self._get_jwt_algorithms(existing_setting) + return self.strategy.setting(name, backend=self) + + def _get_jwt_algorithms(self, existing_setting) -> list[str]: + """ + Get the JWT algorithms to pass to the decode + """ + algorithms = [] + if existing_setting: + # If the admin specified the algorithms then use them + algorithms = existing_setting + else: + # Try to get the algorithms from the .well-known/openid-configuration + try: + logger.debug("Attempting to get the JWT algorithms from the .well-known/openid-configuration") + config = self.oidc_config() + idp_algorithms = config.get("id_token_encryption_alg_values_supported") + if idp_algorithms: + logger.debug(f"JWT algorithms supported by the IDP: {idp_algorithms}") + algorithms = idp_algorithms + else: + raise Exception("No algorithms found in OIDC config") + except Exception as e: + # In controller 2.4 we did not have the ability to set the JWT algorithms so we will use the default algorithms + # We can't load the setting here because it was already loaded in the strategy + lib_defaults = OpenIdConnectAuth.JWT_ALGORITHMS + logger.error(f"Unable to get JWT algorithms from the .well-known/openid-configuration, defaulting to {lib_defaults}: {e}") + algorithms = lib_defaults + # There is a properly on self that we can also set that is used in some places + self.JWT_ALGORITHMS = algorithms + return algorithms + def user_data(self, access_token, *args, **kwargs): """ This function overrides the one in social auth class OpenIdConnectAuth, since @@ -275,20 +315,38 @@ def user_data(self, access_token, *args, **kwargs): user_data = self.request(self.userinfo_url(), headers={"Authorization": f"Bearer {access_token}"}) if user_data.headers["Content-Type"] == "application/jwt": # If the content type is application/jwt than we can assume that the token is encrypted. Otherwise it should be application/json + pubkey = self.public_key() - if not pubkey: - logger.error(_("OIDC client sent encrypted user info response, but no public key found.")) + if not pubkey and self.JWT_ALGORITHMS != ['none']: + logger.warning("OIDC client sent encrypted user info response, but no public key found.") return None + + + + + # TODO: Remove this before merging!!! + # Adding a bunch of white space to also invoke the linter error here + options = {'verify_iat': False} + + + + + + + if self.setting('JWT_ALGORITHMS') == ['none']: + logger.info("JWT decryption algorithm is set to ['none'], will proceed but this is insecure") + options['verify_signature'] = False try: data = jwt.decode( access_token, key=pubkey, - algorithms=self.setting("JWT_ALGORITHMS"), + algorithms=self.setting('JWT_ALGORITHMS'), audience=self.setting("KEY"), + options=options, ) return data except PyJWTError as e: - logger.error(_(f"Unable to decode user info response JWT: {e}")) + logger.error(f"Unable to decode user info response JWT: {e}") return None return user_data.json() diff --git a/test_app/tests/authentication/authenticator_plugins/test_oidc.py b/test_app/tests/authentication/authenticator_plugins/test_oidc.py index dbea5c080..a50d9fb2d 100644 --- a/test_app/tests/authentication/authenticator_plugins/test_oidc.py +++ b/test_app/tests/authentication/authenticator_plugins/test_oidc.py @@ -1,3 +1,4 @@ +import contextlib import json from unittest import mock @@ -119,21 +120,101 @@ def test_oidc_endpoint_url_validation( assert response.json()['configuration']['OIDC_ENDPOINT'] == endpoint_url +@mock.patch("ansible_base.authentication.authenticator_plugins.oidc.get_setting") @mock.patch("social_core.backends.oauth.BaseOAuth2.extra_data") -def test_extra_data(mockedsuper): +def test_extra_data(mock_super, mock_get_setting): + """Test that extra_data sets permissions directly on social.extra_data and calls parent.""" + # Setup + mock_get_setting.return_value = "is_system_auditor" + mock_super.return_value = {"some_data": "value"} + + ap = AuthenticatorPlugin() + + class SocialUser: + def __init__(self): + self.extra_data = {} + + response = { + "is_superuser": True, + "is_system_auditor": False, + "Group": ["mygroup"], + "other_field": "ignored" + } + social = SocialUser() + + # Execute + result = ap.extra_data(None, None, response=response, social=social) + + # Verify parent method was called (with positional args) + mock_super.assert_called_once_with(None, None, response, social=social) + + # Verify permissions were set directly on social.extra_data + assert social.extra_data["is_superuser"] is True + assert social.extra_data["is_system_auditor"] is False + + # Verify other fields are not set on social.extra_data + assert "Group" not in social.extra_data + assert "other_field" not in social.extra_data + + # Verify result is what parent returned + assert result == {"some_data": "value"} + + +@mock.patch("ansible_base.authentication.authenticator_plugins.oidc.get_setting") +@mock.patch("social_core.backends.oauth.BaseOAuth2.extra_data") +def test_extra_data_missing_permissions(mock_super, mock_get_setting): + """Test extra_data when permission fields are missing from response.""" + # Setup + mock_get_setting.return_value = "is_system_auditor" + mock_super.return_value = {"some_data": "value"} + ap = AuthenticatorPlugin() class SocialUser: def __init__(self): self.extra_data = {} - rDict = {} - rDict["is_superuser"] = "True" - rDict["Group"] = ["mygroup"] + response = {"other_field": "value"} # No permission fields social = SocialUser() - ap.extra_data(None, None, response=rDict, social=social) - assert mockedsuper.called - assert "is_superuser" in social.extra_data + + # Execute + ap.extra_data(None, None, response=response, social=social) + + # Verify no permissions were set + assert "is_superuser" not in social.extra_data + assert "is_system_auditor" not in social.extra_data + + +@mock.patch("ansible_base.authentication.authenticator_plugins.oidc.get_setting") +@mock.patch("social_core.backends.oauth.BaseOAuth2.extra_data") +def test_extra_data_custom_auditor_flag(mock_super, mock_get_setting): + """Test extra_data with custom auditor flag from settings.""" + # Setup + custom_flag = "custom_auditor_field" + mock_get_setting.return_value = custom_flag + mock_super.return_value = {"some_data": "value"} + + ap = AuthenticatorPlugin() + + class SocialUser: + def __init__(self): + self.extra_data = {} + + response = { + "is_superuser": False, + custom_flag: True, + } + social = SocialUser() + + # Execute + ap.extra_data(None, None, response=response, social=social) + + # Verify get_setting was called + mock_get_setting.assert_called_once_with('ANSIBLE_BASE_SOCIAL_AUDITOR_FLAG') + + # Verify permissions were set with custom flag + assert social.extra_data["is_superuser"] is False + assert social.extra_data[custom_flag] is True @mock.patch("social_core.backends.base.BaseAuth.setting") @@ -165,11 +246,300 @@ def json(self): mr.encrypted(True) mockedrequest.return_value = mr mockeddecode.return_value = mr.json() - data = ap.user_data("token") - - mockeddecode.assert_called_once_with('token', key='-----BEGIN PUBLIC KEY-----\nVALUE\n-----END PUBLIC KEY-----', algorithms='VALUE', audience='VALUE') - assert "key" in data - - # Decode failure - mockeddecode.side_effect = PyJWTError() - assert ap.user_data("token") is None + # Mock _get_jwt_algorithms to return a list as expected + with mock.patch.object(ap, '_get_jwt_algorithms') as mock_get_algs: + mock_get_algs.return_value = ['VALUE'] + data = ap.user_data("token") + + mockeddecode.assert_called_once_with( + 'token', + key='-----BEGIN PUBLIC KEY-----\nVALUE\n-----END PUBLIC KEY-----', + algorithms=['VALUE'], + audience='VALUE', + options={} + ) + assert "key" in data + + # Decode failure + mockeddecode.side_effect = PyJWTError() + assert ap.user_data("token") is None + + +class TestGetJwtAlgorithms: + """Test the _get_jwt_algorithms method.""" + + @pytest.mark.parametrize( + "scenario,admin_setting,oidc_config_data,oidc_exception,django_setting,library_default," + "expected_result,expected_debug_calls,expected_error_call", + [ + # Admin setting takes priority + ( + "admin_setting_provided", + ["RS256", "HS256"], # admin_setting + None, # oidc_config_data (not used) + None, # oidc_exception (not used) + None, # django_setting (not used) + None, # library_default (not used) + ["RS256", "HS256"], # expected_result + [], # expected_debug_calls (no debug calls when admin setting exists) + None # expected_error_call + ), + # OIDC config success + ( + "oidc_config_success", + None, # admin_setting + {"id_token_encryption_alg_values_supported": ["RS256", "ES256"]}, # oidc_config_data + None, # oidc_exception + None, # django_setting (not used) + None, # library_default (not used) + ["RS256", "ES256"], # expected_result + [ + "Attempting to get the JWT algorithms from the .well-known/openid-configuration", + "JWT algorithms supported by the IDP: ['RS256', 'ES256']" + ], # expected_debug_calls + None # expected_error_call + ), + # OIDC config has no algorithms - falls back to Django settings + ( + "oidc_no_algorithms_django_fallback", + None, # admin_setting + {"id_token_encryption_alg_values_supported": None}, # oidc_config_data + None, # oidc_exception + ["HS256", "RS256"], # django_setting + None, # library_default (not used) + ["HS256", "RS256"], # expected_result + ["Attempting to get the JWT algorithms from the .well-known/openid-configuration"], # expected_debug_calls + "No algorithms found in OIDC config" # expected_error_call + ), + # OIDC config has no algorithms - falls back to library defaults + ( + "oidc_no_algorithms_library_fallback", + None, # admin_setting + {"id_token_encryption_alg_values_supported": None}, # oidc_config_data + None, # oidc_exception + None, # django_setting (not available) + ["ES256"], # library_default + ["ES256"], # expected_result + ["Attempting to get the JWT algorithms from the .well-known/openid-configuration"], # expected_debug_calls + "No algorithms found in OIDC config" # expected_error_call + ), + # OIDC config raises network exception + ( + "oidc_config_exception", + None, # admin_setting + None, # oidc_config_data (not used due to exception) + Exception("Network error"), # oidc_exception + ["RS256", "HS256"], # django_setting + None, # library_default (not used) + ["RS256", "HS256"], # expected_result + ["Attempting to get the JWT algorithms from the .well-known/openid-configuration"], # expected_debug_calls + "Network error" # expected_error_call + ), + ] + ) + @mock.patch("ansible_base.authentication.authenticator_plugins.oidc.logger") + @mock.patch("social_core.backends.base.BaseAuth.setting") + def test_get_jwt_algorithms_scenarios( + self, mock_setting, mock_logger, expected_log, scenario, admin_setting, oidc_config_data, + oidc_exception, django_setting, library_default, expected_result, + expected_debug_calls, expected_error_call + ): + """Test _get_jwt_algorithms method across different scenarios.""" + # Setup + mock_setting.return_value = admin_setting + ap = AuthenticatorPlugin() + + # Setup context managers for mocking + context_managers = [] + # Mock OIDC config if needed + if admin_setting is None: # Only mock OIDC config if no admin setting + oidc_config_mock = mock.patch.object(ap, 'oidc_config') + context_managers.append(oidc_config_mock) + # Mock Django settings if specified + if django_setting is not None: + django_settings_mock = mock.patch("django.conf.settings.JWT_ALGORITHMS", django_setting, create=True) + context_managers.append(django_settings_mock) + # Mock library defaults if specified + if library_default is not None: + library_mock = mock.patch("social_core.backends.open_id_connect.OpenIdConnectAuth.JWT_ALGORITHMS", library_default) + context_managers.append(library_mock) + # Execute with all context managers and expected logging + with contextlib.ExitStack() as stack: + mocks = [stack.enter_context(cm) for cm in context_managers] + + # Configure OIDC config mock if it exists + if admin_setting is None and len(mocks) > 0: + oidc_mock = mocks[0] + if oidc_exception: + oidc_mock.side_effect = oidc_exception + else: + oidc_mock.return_value = oidc_config_data + + # Execute with expected logging + if expected_error_call: + # Use expected_log for single error call + with expected_log( + 'ansible_base.authentication.authenticator_plugins.oidc.logger', + 'error', + expected_error_call + ): + result = ap._get_jwt_algorithms() + else: + # Execute without expected_log for error + result = ap._get_jwt_algorithms() + + # Verify result + assert result == expected_result + + # Verify admin setting calls + if admin_setting is not None: + # The method calls setting() twice - once for the if check, once for the return + assert mock_setting.call_count == 2 + mock_setting.assert_has_calls([ + mock.call("JWT_ALGORITHMS"), + mock.call("JWT_ALGORITHMS") + ]) + else: + mock_setting.assert_called_once_with("JWT_ALGORITHMS") + + # Verify debug calls using mock_logger (for multiple calls) + if expected_debug_calls: + for debug_call in expected_debug_calls: + mock_logger.debug.assert_any_call(debug_call) + + +class TestUserDataWithNoneAlgorithm: + """Test the user_data method with 'none' algorithm support.""" + + @pytest.mark.parametrize( + "scenario,algorithms,public_key,decode_result,decode_exception,expected_result," + "expected_jwt_options,expected_info_log,expected_error_log", + [ + # 'none' algorithm scenario + ( + "none_algorithm", + ['none'], # algorithms + "test_key", # public_key + {"sub": "user123", "email": "test@example.com"}, # decode_result + None, # decode_exception + {"sub": "user123", "email": "test@example.com"}, # expected_result + {'verify_signature': False}, # expected_jwt_options + "JWT decryption algorithm is set to ['none'], will proceed but this is insecure", # expected_info_log + None # expected_error_log + ), + # Regular algorithm scenario + ( + "regular_algorithm", + ['RS256'], # algorithms + "test_key", # public_key + {"sub": "user123", "email": "test@example.com"}, # decode_result + None, # decode_exception + {"sub": "user123", "email": "test@example.com"}, # expected_result + {}, # expected_jwt_options + None, # expected_info_log + None # expected_error_log + ), + # JWT decode error scenario + ( + "jwt_decode_error", + ['RS256'], # algorithms + "test_key", # public_key + None, # decode_result (not used due to exception) + PyJWTError("Invalid signature"), # decode_exception + None, # expected_result + {}, # expected_jwt_options + None, # expected_info_log + "Invalid signature" # expected_error_log + ), + ] + ) + @mock.patch("social_core.backends.base.BaseAuth.setting") + @mock.patch("jwt.decode") + @mock.patch("social_core.backends.base.BaseAuth.request") + def test_user_data_jwt_scenarios( + self, mock_request, mock_decode, mock_setting, expected_log, scenario, + algorithms, public_key, decode_result, decode_exception, expected_result, + expected_jwt_options, expected_info_log, expected_error_log + ): + """Test user_data method across different JWT scenarios.""" + # Setup + mock_setting.side_effect = lambda key, *args: { + "PUBLIC_KEY": public_key, + "KEY": "test_audience" + }.get(key, "default") + + ap = AuthenticatorPlugin() + + # Mock the response with JWT content type + class MockResponse: + headers = {"Content-Type": "application/jwt"} + + mock_request.return_value = MockResponse() + if decode_exception: + mock_decode.side_effect = decode_exception + else: + mock_decode.return_value = decode_result + + # Mock _get_jwt_algorithms + with mock.patch.object(ap, '_get_jwt_algorithms') as mock_get_algs: + mock_get_algs.return_value = algorithms + + # Execute with expected logging + if expected_info_log: + with expected_log( + 'ansible_base.authentication.authenticator_plugins.oidc.logger', + 'info', + expected_info_log + ): + result = ap.user_data("test_token") + elif expected_error_log: + with expected_log( + 'ansible_base.authentication.authenticator_plugins.oidc.logger', + 'error', + expected_error_log + ): + result = ap.user_data("test_token") + else: + result = ap.user_data("test_token") + + # Verify result + assert result == expected_result + + # Verify jwt.decode was called with correct parameters + if public_key: # Only check decode call if public key exists + mock_decode.assert_called_once_with( + 'test_token', + key='-----BEGIN PUBLIC KEY-----\ntest_key\n-----END PUBLIC KEY-----', + algorithms=algorithms, + audience='test_audience', + options=expected_jwt_options + ) + + @mock.patch("social_core.backends.base.BaseAuth.setting") + @mock.patch("social_core.backends.base.BaseAuth.request") + def test_user_data_no_public_key_with_logging(self, mock_request, mock_setting, expected_log): + """Test user_data when no public key is configured and verify error logging.""" + # Setup + mock_setting.side_effect = lambda key, *args: { + "PUBLIC_KEY": None, # No public key + "KEY": "test_audience" + }.get(key, "default") + + ap = AuthenticatorPlugin() + + # Mock the response with JWT content type + class MockResponse: + headers = {"Content-Type": "application/jwt"} + + mock_request.return_value = MockResponse() + + # Execute with expected logging + with expected_log( + 'ansible_base.authentication.authenticator_plugins.oidc.logger', + 'error', + "OIDC client sent encrypted user info response, but no public key found." + ): + result = ap.user_data("test_token") + + # Verify + assert result is None