diff --git a/docs/en_US/oauth2.rst b/docs/en_US/oauth2.rst index 916a3a63c61..c42b5f1b9d9 100644 --- a/docs/en_US/oauth2.rst +++ b/docs/en_US/oauth2.rst @@ -1,15 +1,40 @@ .. _oauth2: -***************************************** -`Enabling OAUTH2 Authentication`:index: -***************************************** +******************************************************* +`Enabling OAUTH2 and OIDC Authentication`:index: +******************************************************* -To enable OAUTH2 authentication for pgAdmin, you must configure the OAUTH2 -settings in the *config_local.py* or *config_system.py* file (see the -:ref:`config.py ` documentation) on the system where pgAdmin is -installed in Server mode. You can copy these settings from *config.py* file -and modify the values for the following parameters: +To enable OAUTH2 or OpenID Connect (OIDC) authentication for pgAdmin, you must +configure the OAUTH2 settings in the *config_local.py* or *config_system.py* +file (see the :ref:`config.py ` documentation) on the system where +pgAdmin is installed in Server mode. You can copy these settings from *config.py* +file and modify the values for the following parameters. + +OAuth2 vs OpenID Connect (OIDC) +================================ + +pgAdmin supports both OAuth2 and OIDC authentication protocols: + +**OAuth2** is an authorization framework that allows third-party applications to +obtain limited access to user accounts. When using OAuth2, pgAdmin must explicitly +call the provider's userinfo endpoint to retrieve user profile information. + +**OpenID Connect (OIDC)** is an identity layer built on top of OAuth2 that provides +standardized user authentication and profile information. When using OIDC, user +identity information is included directly in the ID token, which is more efficient +and secure. + +.. note:: + When **OAUTH2_SERVER_METADATA_URL** is configured, pgAdmin treats the provider + as an OIDC provider and will: + + - Use ID token claims for user identity (sub, email, preferred_username) + - Skip the userinfo endpoint call when ID token contains sufficient information + - Validate the ID token automatically using the provider's public keys + + This is the **recommended approach** for modern identity providers like + Microsoft Entra ID (Azure AD), Google, Keycloak, Auth0, and Okta. .. _AzureAD: https://learn.microsoft.com/en-us/security/zero-trust/develop/configure-tokens-group-claims-app-roles @@ -32,20 +57,17 @@ and modify the values for the following parameters: "OAUTH2_CLIENT_SECRET", "Oauth2 Client Secret" "OAUTH2_TOKEN_URL", "Oauth2 Access Token endpoint" "OAUTH2_AUTHORIZATION_URL", "Endpoint for user authorization" - "OAUTH2_SERVER_METADATA_URL", "Server metadata url for your OAuth2 provider" + "OAUTH2_SERVER_METADATA_URL", "**OIDC Discovery URL** (recommended for OIDC providers). When set, pgAdmin will use OIDC flow with automatic ID token validation and user claims from the ID token. Example: *https://login.microsoftonline.com/{tenant}/v2.0/.well-known/openid-configuration*. When using this parameter, OAUTH2_TOKEN_URL and OAUTH2_AUTHORIZATION_URL are optional as they will be discovered automatically." "OAUTH2_API_BASE_URL", "Oauth2 base URL endpoint to make requests simple, ex: *https://api.github.com/*" - "OAUTH2_USERINFO_ENDPOINT", "User Endpoint, ex: *user* (for github, or *user/emails* if the user's email address is private) and *userinfo* (for google)," - "OAUTH2_SCOPE", "Oauth scope, ex: 'openid email profile'. Note that an 'email' claim is required in the resulting profile." + "OAUTH2_USERINFO_ENDPOINT", "User Endpoint, ex: *user* (for github, or *user/emails* if the user's email address is private) and *userinfo* (for google). **For OIDC providers**, this is optional if the ID token contains sufficient claims (email, preferred_username, or sub)." + "OAUTH2_SCOPE", "Oauth scope, ex: 'openid email profile'. **For OIDC providers**, include 'openid' scope to receive an ID token." "OAUTH2_ICON", "The Font-awesome icon to be placed on the oauth2 button, ex: fa-github" "OAUTH2_BUTTON_COLOR", "Oauth2 button color" - "OAUTH2_USERNAME_CLAIM", "The claim which is used for the username. If the value is empty - the email is used as username, but if a value is provided, the claim has to exist. Ex: *oid* (for AzureAD), *email* (for Github)" + "OAUTH2_USERNAME_CLAIM", "The claim which is used for the username. If the value is empty, **for OIDC providers** pgAdmin will use: 1) email, 2) preferred_username, or 3) sub (in that order). **For OAuth2 providers** without OIDC, email is required. Ex: *oid* (for AzureAD), *email* (for Github), *preferred_username* (for Keycloak)" "OAUTH2_AUTO_CREATE_USER", "Set the value to *True* if you want to automatically create a pgAdmin user corresponding to a successfully authenticated Oauth2 user. Please note that password is not stored in the pgAdmin database." - "OAUTH2_ADDITIONAL_CLAIMS", "If a dictionary is provided, pgAdmin will check for a matching key and value on the userinfo endpoint - and in the Id Token. In case there is no match with the provided config, the user will receive an authorization error. - Useful for checking AzureAD_ *wids* or *groups*, GitLab_ *owner*, *maintainer* and *reporter* claims." + "OAUTH2_ADDITIONAL_CLAIMS", "If a dictionary is provided, pgAdmin will check for a matching key and value on the **ID token first** (for OIDC providers), then fall back to the userinfo endpoint response. In case there is no match with the provided config, the user will receive an authorization error. Useful for checking AzureAD_ *wids* or *groups*, GitLab_ *owner*, *maintainer* and *reporter* claims." "OAUTH2_SSL_CERT_VERIFICATION", "Set this variable to False to disable SSL certificate verification for OAuth2 provider. This may need to set False, in case of self-signed certificates." "OAUTH2_CHALLENGE_METHOD", "Enable PKCE workflow. PKCE method name, only *S256* is supported" @@ -83,3 +105,107 @@ Ref: https://oauth.net/2/pkce To enable PKCE workflow, set the configuration parameters OAUTH2_CHALLENGE_METHOD to *S256* and OAUTH2_RESPONSE_TYPE to *code*. Both parameters are mandatory to enable PKCE workflow. + +OIDC Configuration Examples +============================ + +Using OIDC with Discovery Metadata (Recommended) +------------------------------------------------- + +When using OIDC providers, configure the **OAUTH2_SERVER_METADATA_URL** parameter +to enable automatic discovery and ID token validation: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + 'OAUTH2_NAME': 'my-oidc-provider', + 'OAUTH2_DISPLAY_NAME': 'My OIDC Provider', + 'OAUTH2_CLIENT_ID': 'your-client-id', + 'OAUTH2_CLIENT_SECRET': 'your-client-secret', + 'OAUTH2_SERVER_METADATA_URL': 'https://provider.example.com/.well-known/openid-configuration', + 'OAUTH2_SCOPE': 'openid email profile', + # OAUTH2_USERINFO_ENDPOINT is optional when using OIDC + # Token and authorization URLs are discovered automatically + }] + +With this configuration: + +- pgAdmin will use the OIDC discovery endpoint to automatically find token and authorization URLs +- User identity will be extracted from ID token claims (sub, email, preferred_username) +- The userinfo endpoint will only be called as a fallback if ID token lacks required claims +- ID token will be automatically validated using the provider's public keys + +Username Resolution for OIDC +----------------------------- + +When **OAUTH2_SERVER_METADATA_URL** is configured (OIDC mode), pgAdmin will +resolve the username in the following order: + +1. **OAUTH2_USERNAME_CLAIM** (if configured) - checks ID token first, then userinfo +2. **email** claim from ID token or userinfo endpoint +3. **preferred_username** claim from ID token (standard OIDC claim) +4. **sub** claim from ID token (always present in OIDC, used as last resort) + +Example with custom username claim: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + 'OAUTH2_USERNAME_CLAIM': 'preferred_username', + # pgAdmin will use 'preferred_username' from ID token for the username + }] + +Example without custom claim (uses automatic fallback): + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + # No OAUTH2_USERNAME_CLAIM specified + # pgAdmin will try: email -> preferred_username -> sub + }] + +Additional Claims Authorization with OIDC +------------------------------------------ + +When using **OAUTH2_ADDITIONAL_CLAIMS** with OIDC providers, pgAdmin will: + +1. Check the ID token claims first (more secure, no additional network call) +2. Fall back to userinfo endpoint response if needed + +Example: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + # ... other config ... + 'OAUTH2_ADDITIONAL_CLAIMS': { + 'groups': ['admin-group', 'pgadmin-users'], + 'roles': ['database-admin'] + }, + # pgAdmin will check these claims in ID token first, + # then userinfo endpoint if not found + }] + +Legacy OAuth2 Configuration (Without OIDC) +------------------------------------------- + +For providers that don't support OIDC discovery, configure all endpoints manually: + +.. code-block:: python + + OAUTH2_CONFIG = [{ + 'OAUTH2_NAME': 'github', + 'OAUTH2_DISPLAY_NAME': 'GitHub', + 'OAUTH2_CLIENT_ID': 'your-client-id', + 'OAUTH2_CLIENT_SECRET': 'your-client-secret', + 'OAUTH2_TOKEN_URL': 'https://github.com/login/oauth/access_token', + 'OAUTH2_AUTHORIZATION_URL': 'https://github.com/login/oauth/authorize', + 'OAUTH2_API_BASE_URL': 'https://api.github.com/', + 'OAUTH2_USERINFO_ENDPOINT': 'user', + 'OAUTH2_SCOPE': 'user:email', + # No OAUTH2_SERVER_METADATA_URL - pure OAuth2 mode + }] + +In this mode, user identity is retrieved only from the userinfo endpoint. diff --git a/web/pgadmin/authenticate/__init__.py b/web/pgadmin/authenticate/__init__.py index 84f5e9ccc21..7ed8b79029d 100644 --- a/web/pgadmin/authenticate/__init__.py +++ b/web/pgadmin/authenticate/__init__.py @@ -227,15 +227,27 @@ def as_dict(self): return res def update_auth_sources(self): - for auth_src in [KERBEROS, OAUTH2]: - if auth_src in self.auth_sources: - if 'internal_button' in request.form: + # Only mutate the ordered list of auth sources when a user explicitly + # selected an auth method on the login form. + # + # Without this guard, a plain internal login POST (email/password) can + # incorrectly drop INTERNAL/LDAP and try OAUTH2 first, which then fails + # because no oauth2 provider button was provided. + if request.method != 'POST': + return + + if 'internal_button' in request.form: + for auth_src in [KERBEROS, OAUTH2]: + if auth_src in self.auth_sources: self.auth_sources.remove(auth_src) - else: - if INTERNAL in self.auth_sources: - self.auth_sources.remove(INTERNAL) - if LDAP in self.auth_sources: - self.auth_sources.remove(LDAP) + return + + if 'oauth2_button' in request.form: + if INTERNAL in self.auth_sources: + self.auth_sources.remove(INTERNAL) + if LDAP in self.auth_sources: + self.auth_sources.remove(LDAP) + return def set_current_source(self, source): self.current_source = source diff --git a/web/pgadmin/authenticate/oauth2.py b/web/pgadmin/authenticate/oauth2.py index 076e4589ecf..828314cca03 100644 --- a/web/pgadmin/authenticate/oauth2.py +++ b/web/pgadmin/authenticate/oauth2.py @@ -12,7 +12,7 @@ import config from authlib.integrations.flask_client import OAuth -from flask import current_app, url_for, session, request,\ +from flask import current_app, url_for, session, request, \ redirect, Flask, flash from flask_babel import gettext from flask_security import login_user, current_user @@ -104,6 +104,10 @@ class OAuth2Authentication(BaseAuthentication): email_keys = ['mail', 'email'] def __init__(self): + # Selected provider name (set during authenticate()). + # Initializing avoids AttributeError in edge cases/tests. + self.oauth2_current_client = None + for oauth2_config in config.OAUTH2_CONFIG: OAuth2Authentication.oauth2_config[ @@ -135,9 +139,9 @@ def __init__(self): name=oauth2_config['OAUTH2_NAME'], client_id=oauth2_config['OAUTH2_CLIENT_ID'], client_secret=oauth2_config['OAUTH2_CLIENT_SECRET'], - access_token_url=oauth2_config['OAUTH2_TOKEN_URL'], - authorize_url=oauth2_config['OAUTH2_AUTHORIZATION_URL'], - api_base_url=oauth2_config['OAUTH2_API_BASE_URL'], + access_token_url=oauth2_config.get('OAUTH2_TOKEN_URL'), + authorize_url=oauth2_config.get('OAUTH2_AUTHORIZATION_URL'), + api_base_url=oauth2_config.get('OAUTH2_API_BASE_URL'), client_kwargs=client_kwargs, server_metadata_url=oauth2_config.get( 'OAUTH2_SERVER_METADATA_URL', None) @@ -147,11 +151,53 @@ def get_source_name(self): return OAUTH2 def get_friendly_name(self): - return self.oauth2_config[self.oauth2_current_client]['OAUTH2_NAME'] + provider = self.oauth2_config.get(self.oauth2_current_client) + if not provider: + return OAUTH2 + return provider.get('OAUTH2_NAME', OAUTH2) def validate(self, form): return True, None + def _is_oidc_provider(self): + """ + Determine if the current provider is configured as an OIDC provider. + Returns True if OAUTH2_SERVER_METADATA_URL is defined. + """ + provider = self.oauth2_config.get(self.oauth2_current_client) + if not provider: + return False + return 'OAUTH2_SERVER_METADATA_URL' in provider and \ + provider['OAUTH2_SERVER_METADATA_URL'] is not None + + def _get_id_token_claims(self): + """ + Extract and return ID token claims for OIDC providers. + + In pgAdmin's Authlib integration, the token response returned by + authorize_access_token() may include a decoded claims dict under the + 'userinfo' key (e.g. populated from the ID token). + + If those claims are not present, this returns an empty dict and the + caller should fall back to the configured userinfo endpoint. + + Returns: + dict: ID token claims, or empty dict if not available or + parsing fails + """ + if not self._is_oidc_provider(): + return {} + + token = session.get('oauth2_token') + if not isinstance(token, dict): + return {} + + claims = token.get('userinfo') + if isinstance(claims, dict): + return claims + + return {} + def get_profile_dict(self, profile): """ Returns the dictionary from profile @@ -165,53 +211,156 @@ def get_profile_dict(self, profile): else: return {} + def _resolve_username(self, id_token_claims, profile_dict): + """ + Resolve username from available claims with OIDC-aware fallback. + + Resolution order: + 1. If OAUTH2_USERNAME_CLAIM is configured, use that claim from + ID token first, then userinfo profile + 2. For OIDC providers, check in order: email, preferred_username, sub + 3. For non-OIDC providers, use email only + + Args: + id_token_claims (dict): Claims from ID token + profile_dict (dict): Claims from userinfo endpoint + + Returns: + tuple: (username, email) or (None, None) if resolution fails + """ + provider = self.oauth2_config.get(self.oauth2_current_client, {}) + username_claim = provider.get('OAUTH2_USERNAME_CLAIM') + + # Extract email from profile (backward compatibility) + email_key = [value for value in self.email_keys + if value in profile_dict.keys()] + email = profile_dict[email_key[0]] if email_key else None + + # If specific username claim is configured, look for it + if username_claim: + # Check ID token claims first + if username_claim in id_token_claims: + username = id_token_claims[username_claim] + current_app.logger.debug( + f'Found username claim "{username_claim}" ' + 'in ID token') + return username, email + # Fall back to userinfo profile + elif username_claim in profile_dict: + username = profile_dict[username_claim] + current_app.logger.debug( + f'Found username claim "{username_claim}" ' + 'in profile') + return username, email + else: + current_app.logger.error( + f'Required username claim "{username_claim}" ' + f'not found in ID token or profile') + return None, email + + # For OIDC providers, use standard claim hierarchy + if self._is_oidc_provider(): + # Priority 1: email (from ID token or profile) + if 'email' in id_token_claims: + username = id_token_claims['email'] + # Use as email if not found elsewhere + email = email or username + current_app.logger.debug( + 'Using email from ID token as username') + return username, email + elif email: + current_app.logger.debug( + 'Using email from profile as username') + return email, email + + # Priority 2: preferred_username + if 'preferred_username' in id_token_claims: + username = id_token_claims['preferred_username'] + current_app.logger.debug( + 'Using preferred_username from ID token') + return username, email + + # Priority 3: sub (always present in OIDC) + if 'sub' in id_token_claims: + username = id_token_claims['sub'] + current_app.logger.debug( + 'Using sub from ID token as last resort') + return username, email + + # Should not reach here for valid OIDC provider + current_app.logger.warning( + 'OIDC provider but no standard claims found in ID token') + + # For non-OIDC OAuth2 providers, email is required + if email: + current_app.logger.debug( + 'Using email as username for OAuth2 provider') + return email, email + + return None, None + def login(self, form): + if not self.oauth2_current_client: + error_msg = 'No OAuth2 provider available.' + current_app.logger.error(error_msg) + return False, gettext(error_msg) + profile = self.get_user_profile() profile_dict = self.get_profile_dict(profile) - current_app.logger.debug(f"profile: {profile}") - current_app.logger.debug(f"profile_dict: {profile_dict}") + profile_dict_keys = [] + if isinstance(profile_dict, dict): + profile_dict_keys = sorted(profile_dict.keys()) + current_app.logger.debug( + f'profile_dict keys: {profile_dict_keys}' + if profile_dict_keys else 'profile_dict empty' + ) + + # Get ID token claims for OIDC providers + id_token_claims = self._get_id_token_claims() + id_token_claims_keys = [] + if isinstance(id_token_claims, dict): + id_token_claims_keys = sorted(id_token_claims.keys()) + current_app.logger.debug( + f'id_token_claims keys: {id_token_claims_keys}' + if id_token_claims_keys else 'id_token_claims empty' + ) + + # For OIDC providers, we must have either ID token claims or profile + if ( + self._is_oidc_provider() and + not id_token_claims and + not profile_dict + ): + error_msg = "No profile data found from OIDC provider." + current_app.logger.error(error_msg) + return False, gettext(error_msg) - if not profile_dict: + # For non-OIDC providers, profile is required + if not self._is_oidc_provider() and not profile_dict: error_msg = "No profile data found." - current_app.logger.exception(error_msg) + current_app.logger.error(error_msg) return False, gettext(error_msg) - email_key = [ - value for value in self.email_keys - if value in profile_dict.keys() - ] - email = profile_dict[email_key[0]] if (len(email_key) > 0) else None + # Resolve username using OIDC-aware logic + username, email = self._resolve_username(id_token_claims, profile_dict) - username = email - username_claim = None - if 'OAUTH2_USERNAME_CLAIM' in self.oauth2_config[ - self.oauth2_current_client]: - username_claim = self.oauth2_config[ - self.oauth2_current_client - ]['OAUTH2_USERNAME_CLAIM'] - if username_claim is not None: - id_token = session['oauth2_token'].get('userinfo', {}) - if username_claim in profile: - username = profile[username_claim] - current_app.logger.debug('Found username claim in profile') - elif username_claim in id_token: - username = id_token[username_claim] - current_app.logger.debug('Found username claim in id_token') + if not username: + if self._is_oidc_provider(): + error_msg = ( + 'Could not extract username from OIDC claims. ' + 'Please ensure your OIDC provider returns standard ' + 'claims (email, preferred_username, or sub).' + ) else: - error_msg = "The claim '%s' is required to login into " \ - "pgAdmin. Please update your OAuth2 profile." % ( - username_claim) - current_app.logger.exception(error_msg) - return False, gettext(error_msg) - else: - if not email or email == '': - error_msg = "An email id or OAUTH2_USERNAME_CLAIM is" \ - " required to login into pgAdmin. Please update your" \ - " OAuth2 profile for email id or set" \ - " OAUTH2_USERNAME_CLAIM config parameter." - current_app.logger.exception(error_msg) - return False, gettext(error_msg) + error_msg = ( + 'An email id or OAUTH2_USERNAME_CLAIM is required to ' + 'login into pgAdmin. Please update your OAuth2 profile ' + 'for email id or set OAUTH2_USERNAME_CLAIM config ' + 'parameter.' + ) + current_app.logger.error(error_msg) + return False, gettext(error_msg) additional_claims = None if 'OAUTH2_ADDITIONAL_CLAIMS' in self.oauth2_config[ @@ -221,27 +370,63 @@ def login(self, form): self.oauth2_current_client ]['OAUTH2_ADDITIONAL_CLAIMS'] - # checking oauth provider userinfo response - valid_profile, reason = self.__is_any_claim_valid(profile, - additional_claims) - current_app.logger.debug(f"profile claims: {profile}") - current_app.logger.debug(f"reason: {reason}") - - # checking oauth provider idtoken claims - id_token_claims = session.get('oauth2_token', {}).get('userinfo',{}) - valid_idtoken, reason = self.__is_any_claim_valid(id_token_claims, - additional_claims) - current_app.logger.debug(f"idtoken claims: {id_token_claims}") - current_app.logger.debug(f"reason: {reason}") - - if not valid_profile and not valid_idtoken: - return_msg = "The user is not authorized to login" \ - " based on your identity profile." \ - " Please contact your administrator." - audit_msg = f"The authenticated user {username} is not" \ - " authorized to access pgAdmin based on OAUTH2 config. " \ - f"Reason: additional claim required {additional_claims}, " \ - f"profile claims {profile}, idtoken cliams {id_token_claims}." + # For OIDC providers, check ID token claims first, then userinfo + # For non-OIDC providers, check userinfo only + if self._is_oidc_provider(): + valid_idtoken, reason = self.__is_any_claim_valid( + id_token_claims, additional_claims) + current_app.logger.debug( + f'ID token claim keys: {id_token_claims_keys}' + ) + current_app.logger.debug( + f'ID token validation reason: {reason}' + ) + + # If ID token validation succeeds, we're done + if valid_idtoken: + valid_combined = True + else: + # Fall back to userinfo profile + valid_profile, reason = self.__is_any_claim_valid( + profile_dict, additional_claims) + current_app.logger.debug( + f'Profile claim keys: {profile_dict_keys}' + ) + current_app.logger.debug( + f'Profile validation reason: {reason}' + ) + valid_combined = valid_profile + else: + # Non-OIDC: only check userinfo profile + valid_combined, reason = self.__is_any_claim_valid( + profile_dict, additional_claims) + current_app.logger.debug( + f'Profile claim keys: {profile_dict_keys}' + ) + current_app.logger.debug( + f'Validation reason: {reason}' + ) + + if not valid_combined: + return_msg = ( + 'The user is not authorized to login based on your identity ' + 'profile. Please contact your administrator.' + ) + + additional_claim_names = [] + if isinstance(additional_claims, dict): + additional_claim_names = sorted(additional_claims.keys()) + + audit_msg = ( + f'The authenticated user {username} is not authorized to ' + 'access pgAdmin based on OAUTH2 config. ' + 'Reason: additional claims required. ' + f'additional_claim_names={additional_claim_names}, ' + f'profile_len={len(profile_dict)}, ' + f'profile_keys={profile_dict_keys}, ' + f'id_token_len={len(id_token_claims)}, ' + f'id_token_keys={id_token_claims_keys}.' + ) current_app.logger.warning(audit_msg) return False, return_msg @@ -267,6 +452,69 @@ def get_user_profile(self): session['oauth2_logout_url'] = self.oauth2_config[ self.oauth2_current_client]['OAUTH2_LOGOUT_URL'] + # For OIDC providers, parse the ID token JWT to extract claims. + # We can skip the userinfo endpoint call if the ID token has + # sufficient claims for authentication and authorization. + if self._is_oidc_provider(): + id_token_claims = self._get_id_token_claims() + # Check if we have basic required claims in ID token + has_sufficient_claims = any([ + 'email' in id_token_claims, + 'preferred_username' in id_token_claims, + 'sub' in id_token_claims + ]) + + # Default to requiring the userinfo endpoint unless we can prove + # the ID token claims are sufficient for our configured needs. + needs_userinfo = True + + if has_sufficient_claims: + provider = self.oauth2_config.get( + self.oauth2_current_client, {} + ) + username_claim = provider.get('OAUTH2_USERNAME_CLAIM') + additional_claims = provider.get('OAUTH2_ADDITIONAL_CLAIMS') + + # If custom username claim or additional authorization + # claims are configured, they may exist only in userinfo; + # don't skip userinfo unless ID token has them. + needs_userinfo = False + if username_claim and username_claim not in id_token_claims: + needs_userinfo = True + if isinstance(additional_claims, dict) and additional_claims: + missing_authz_keys = [ + k for k in additional_claims.keys() + if k not in id_token_claims + ] + if missing_authz_keys: + needs_userinfo = True + + if has_sufficient_claims and not needs_userinfo: + current_app.logger.debug( + 'OIDC provider: using parsed ID token JWT claims, ' + 'skipping userinfo endpoint') + # Return ID token claims as profile + return id_token_claims + else: + current_app.logger.debug( + 'OIDC provider: ID token JWT lacks standard claims, ' + 'falling back to userinfo endpoint') + + # For non-OIDC providers or when ID token is insufficient, + # call the userinfo endpoint + if 'OAUTH2_USERINFO_ENDPOINT' not in self.oauth2_config[ + self.oauth2_current_client]: + if self._is_oidc_provider(): + # OIDC provider should have provided claims in ID token + current_app.logger.warning( + 'OIDC provider has no userinfo endpoint configured ' + 'and ID token lacks standard claims') + else: + current_app.logger.error( + 'OAUTH2_USERINFO_ENDPOINT not configured for ' + 'non-OIDC provider') + return {} + resp = self.oauth2_clients[self.oauth2_current_client].get( self.oauth2_config[ self.oauth2_current_client]['OAUTH2_USERINFO_ENDPOINT'], @@ -276,7 +524,11 @@ def get_user_profile(self): return resp.json() def authenticate(self, form): - self.oauth2_current_client = request.form['oauth2_button'] + # Prefer the explicit oauth2 button value. + # Avoid raising BadRequestKeyError when oauth2 isn't selected. + self.oauth2_current_client = request.form.get('oauth2_button') + if not self.oauth2_current_client: + return False, gettext('No OAuth2 provider selected.') redirect_url = url_for(OAUTH2_AUTHORIZE, _external=True) if self.oauth2_current_client not in self.oauth2_clients: diff --git a/web/pgadmin/browser/tests/__init__.py b/web/pgadmin/browser/tests/__init__.py index f33b2742b78..00e23a7b16c 100644 --- a/web/pgadmin/browser/tests/__init__.py +++ b/web/pgadmin/browser/tests/__init__.py @@ -11,5 +11,14 @@ class BrowserGenerateTestCase(BaseTestGenerator): + # This is a smoke/placeholder test to ensure the browser tests package is + # discovered by the regression runner. It should not require a live server + # connection. + def setUp(self): + return + + def tearDown(self): + return + def runTest(self): return diff --git a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py index 26337486318..0f157ab770b 100644 --- a/web/pgadmin/browser/tests/test_oauth2_with_mocking.py +++ b/web/pgadmin/browser/tests/test_oauth2_with_mocking.py @@ -12,8 +12,8 @@ from regression.python_test_utils import test_utils as utils from pgadmin.authenticate.registry import AuthSourceRegistry from unittest.mock import patch, MagicMock -from pgadmin.authenticate import AuthSourceManager from pgadmin.utils.constants import OAUTH2, INTERNAL +from flask import current_app, redirect class Oauth2LoginMockTestCase(BaseTestGenerator): @@ -24,38 +24,83 @@ class Oauth2LoginMockTestCase(BaseTestGenerator): scenarios = [ ('Oauth2 External Authentication', dict( - auth_source=['oauth2'], oauth2_provider='github', - flag=1 + kind='external_redirect', + profile={}, + id_token_claims=None, )), ('Oauth2 Authentication', dict( - auth_source=['oauth2'], oauth2_provider='github', - flag=2 + kind='login_success', + profile={'email': 'oauth2@gmail.com'}, + id_token_claims=None, )), ('Oauth2 Additional Claims Authentication', dict( - auth_source=['oauth2'], oauth2_provider='auth-with-additional-claim-check', - flag=3 + kind='login_success', + profile={'email': 'oauth2@gmail.com', 'wids': ['789']}, + id_token_claims=None, )), ('Oauth2 PKCE Support', dict( - auth_source=['oauth2'], oauth2_provider='keycloak-pkce', - flag=4 + kind='pkce', + profile={}, + id_token_claims=None, + )), + ('OIDC Uses ID Token Claims', dict( + oauth2_provider='oidc-basic', + kind='login_success', + profile={}, + id_token_claims={'email': 'oidc@example.com', 'sub': 'abc'}, + )), + ('OIDC Falls Back To Profile Email', dict( + oauth2_provider='oidc-basic', + kind='login_success', + profile={'email': 'fallback@example.com'}, + id_token_claims={'sub': 'abc'}, + )), + ('OIDC Username Claim Precedence', dict( + oauth2_provider='oidc-username-claim', + kind='login_success', + profile={'email': 'email@example.com'}, + id_token_claims={'preferred_username': 'preferred-user'}, + )), + ('OIDC Additional Claims Via ID Token', dict( + oauth2_provider='oidc-additional-claims', + kind='login_success', + profile={'email': 'claims@example.com'}, + id_token_claims={'groups': ['group-a']}, + )), + ('OIDC Additional Claims Rejected', dict( + oauth2_provider='oidc-additional-claims', + kind='login_failure', + profile={'email': 'claims@example.com'}, + id_token_claims={'groups': ['group-b']}, + )), + ('OIDC get_user_profile Skips Userinfo', dict( + oauth2_provider='oidc-basic', + kind='oidc_get_user_profile_skip', + profile={}, + id_token_claims=None, + )), + ('OIDC get_user_profile Calls Userinfo', dict( + oauth2_provider='oidc-basic', + kind='oidc_get_user_profile_call', + profile={}, + id_token_claims=None, )), ] @classmethod def setUpClass(cls): - """ - We need to logout the test client as we are testing - OAuth2 login scenarios. - """ + """Logout the test client as we are testing OAuth2 login scenarios.""" cls.tester.logout() def setUp(self): - app_config.AUTHENTICATION_SOURCES = self.auth_source + app_config.AUTHENTICATION_SOURCES = [OAUTH2] self.app.PGADMIN_EXTERNAL_AUTH_SOURCE = OAUTH2 + # Ensure OAuth2 users can be created during tests. + app_config.OAUTH2_AUTO_CREATE_USER = True app_config.OAUTH2_CONFIG = [ { 'OAUTH2_NAME': 'github', @@ -111,6 +156,49 @@ def setUp(self): 'OAUTH2_BUTTON_COLOR': '#3253a8', 'OAUTH2_CHALLENGE_METHOD': 'S256', 'OAUTH2_RESPONSE_TYPE': 'code', + }, + { + 'OAUTH2_NAME': 'oidc-basic', + 'OAUTH2_DISPLAY_NAME': 'OIDC Basic', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + }, + { + 'OAUTH2_NAME': 'oidc-username-claim', + 'OAUTH2_DISPLAY_NAME': 'OIDC Username Claim', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + 'OAUTH2_USERNAME_CLAIM': 'preferred_username', + }, + { + 'OAUTH2_NAME': 'oidc-additional-claims', + 'OAUTH2_DISPLAY_NAME': 'OIDC Additional Claims', + 'OAUTH2_CLIENT_ID': 'testclientid', + 'OAUTH2_CLIENT_SECRET': 'testclientsec', + 'OAUTH2_TOKEN_URL': 'https://oidc.example/token', + 'OAUTH2_AUTHORIZATION_URL': 'https://oidc.example/auth', + 'OAUTH2_API_BASE_URL': 'https://oidc.example/', + 'OAUTH2_USERINFO_ENDPOINT': 'userinfo', + 'OAUTH2_SCOPE': 'openid email profile', + 'OAUTH2_SERVER_METADATA_URL': + 'https://oidc.example/.well-known/openid-configuration', + 'OAUTH2_ADDITIONAL_CLAIMS': { + 'groups': ['group-a'] + } } ] @@ -121,96 +209,155 @@ def runTest(self): "Can not run Oauth2 Authentication in the Desktop mode." ) - if self.flag == 1: - self.test_external_authentication() - elif self.flag == 2: - self.test_oauth2_authentication() - elif self.flag == 3: - self.test_oauth2_authentication_with_additional_claims_success() - elif self.flag == 4: - self.test_oauth2_authentication_with_pkce() - - def test_external_authentication(self): - """ - Ensure that the user should be redirected - to the external url for the authentication. - """ - - AuthSourceManager.update_auth_sources = MagicMock() + self._reset_oauth2_state() - try: - self.tester.login( + if self.kind == 'external_redirect': + self._test_external_authentication(self.oauth2_provider) + elif self.kind == 'pkce': + self.test_oauth2_authentication_with_pkce() + elif self.kind == 'login_success': + self._test_oauth2_login_success( + self.oauth2_provider, self.profile, self.id_token_claims + ) + elif self.kind == 'login_failure': + self._test_oauth2_login_failure( + self.oauth2_provider, self.profile, self.id_token_claims + ) + elif self.kind == 'oidc_get_user_profile_skip': + self._test_oidc_get_user_profile_skip_userinfo( + self.oauth2_provider + ) + elif self.kind == 'oidc_get_user_profile_call': + self._test_oidc_get_user_profile_calls_userinfo( + self.oauth2_provider + ) + else: + self.fail(f'Unknown test kind: {self.kind}') + + def _reset_oauth2_state(self): + """Reset singleton caches so each subTest gets a clean OAuth2 state.""" + # Clear AuthSourceRegistry singleton instances. + AuthSourceRegistry._objects = dict() + + # Clear per-app cache of instantiated auth sources. + with self.app.app_context(): + cached = getattr(current_app, '_pgadmin_auth_sources', None) + if isinstance(cached, dict): + cached.clear() + else: + setattr(current_app, '_pgadmin_auth_sources', {}) + + # Clear OAuth2Authentication class-level caches. + from pgadmin.authenticate.oauth2 import OAuth2Authentication + OAuth2Authentication.oauth2_clients = {} + OAuth2Authentication.oauth2_config = {} + + def _assert_oauth2_session_logged_in(self): + with self.tester.session_transaction() as sess: + asm = sess.get('auth_source_manager') + self.assertIsNotNone(asm) + self.assertEqual(asm.get('current_source'), OAUTH2) + + def _assert_oauth2_session_not_logged_in(self): + with self.tester.session_transaction() as sess: + asm = sess.get('auth_source_manager') + self.assertTrue(asm is None or asm == {}) + + def _test_external_authentication(self, provider): + """Ensure the user is redirected to an external URL.""" + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + return False, redirect('https://example.com/') + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ): + try: + self.tester.login( + email=None, password=None, + _follow_redirects=True, + headers=None, + extra_form_data=dict(oauth2_button=provider) + ) + except Exception as e: + self.assertEqual( + 'Following external redirects is not supported.', + str(e) + ) + + def _test_oauth2_login_success( + self, provider, profile, id_token_claims=None + ): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + # Important: AuthSourceManager may be constructed with a dict + # form for oauth2_button flows, so avoid returning a username. + return True, None + + def _fake_get_user_profile(self): + if id_token_claims is not None: + from flask import session + session['oauth2_token'] = { + 'access_token': 'test-access-token', + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer', + 'userinfo': id_token_claims + } + return profile + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ), patch.object( + OAuth2Authentication, 'get_user_profile', + new=_fake_get_user_profile + ): + res = self.tester.login( email=None, password=None, _follow_redirects=True, headers=None, - extra_form_data=dict(oauth2_button=self.oauth2_provider) + extra_form_data=dict(oauth2_button=provider) ) - except Exception as e: - self.assertEqual('Following external' - ' redirects is not supported.', str(e)) - - def test_oauth2_authentication(self): - """ - Ensure that when the client sends an correct authorization token, - they receive a 200 OK response and the user principal is extracted and - passed on to the routed method. - """ - - profile = self.mock_user_profile() - - # Mock Oauth2 Authenticate - AuthSourceRegistry._registry[OAUTH2].authenticate = MagicMock( - return_value=[True, '']) - - AuthSourceManager.update_auth_sources = MagicMock() - - # Create AuthSourceManager object - auth_obj = AuthSourceManager({}, [OAUTH2]) - auth_source = AuthSourceRegistry.get(OAUTH2) - auth_obj.set_source(auth_source) - auth_obj.set_current_source(auth_source.get_source_name()) - - # Check the login with Oauth2 - res = self.tester.login(email=None, password=None, - _follow_redirects=True, - headers=None, - extra_form_data=dict( - oauth2_button=self.oauth2_provider) - ) - - respdata = 'Gravatar image for %s' % profile['email'] - self.assertTrue(respdata in res.data.decode('utf8')) - - def test_oauth2_authentication_with_additional_claims_success(self): - """ - Ensure that when an oauth2 config has a dict OAUTH2_ADDITIONAL_CLAIMS, - any match of the OAUTH2_ADDITIONAL_CLAIMS dict will allow user login. - """ - - profile = self.mock_user_profile_with_additional_claims() - - # Mock Oauth2 Authenticate - AuthSourceRegistry._registry[OAUTH2].authenticate = MagicMock( - return_value=[True, '']) - - AuthSourceManager.update_auth_sources = MagicMock() - - # Create AuthSourceManager object - auth_obj = AuthSourceManager({}, [OAUTH2]) - auth_source = AuthSourceRegistry.get(OAUTH2) - auth_obj.set_source(auth_source) - auth_obj.set_current_source(auth_source.get_source_name()) - - # Check the login with Oauth2 - res = self.tester.login(email=None, password=None, - _follow_redirects=True, - headers=None, - extra_form_data=dict( - oauth2_button=self.oauth2_provider) - ) - - respdata = 'Gravatar image for %s' % profile['email'] - self.assertTrue(respdata in res.data.decode('utf8')) + self.assertEqual(res.status_code, 200) + self._assert_oauth2_session_logged_in() + + def _test_oauth2_login_failure( + self, provider, profile, id_token_claims=None + ): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + def _fake_authenticate(self, _form): + self.oauth2_current_client = provider + return True, None + + def _fake_get_user_profile(self): + if id_token_claims is not None: + from flask import session + session['oauth2_token'] = { + 'access_token': 'test-access-token', + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer', + 'userinfo': id_token_claims + } + return profile + + with patch.object( + OAuth2Authentication, 'authenticate', new=_fake_authenticate + ), patch.object( + OAuth2Authentication, 'get_user_profile', + new=_fake_get_user_profile + ): + res = self.tester.login( + email=None, password=None, + _follow_redirects=True, + headers=None, + extra_form_data=dict(oauth2_button=provider) + ) + self.assertEqual(res.status_code, 200) + self._assert_oauth2_session_not_logged_in() def test_oauth2_authentication_with_pkce(self): """ @@ -219,13 +366,21 @@ def test_oauth2_authentication_with_pkce(self): the default client_kwargs is correctly included. """ - with patch('pgadmin.authenticate.oauth2.OAuth.register') as \ - mock_register: + with patch( + 'pgadmin.authenticate.oauth2.OAuth.register' + ) as mock_register: from pgadmin.authenticate.oauth2 import OAuth2Authentication OAuth2Authentication() - args, kwargs = mock_register.call_args + pkce_call = None + for _args, _kwargs in mock_register.call_args_list: + if _kwargs.get('name') == 'keycloak-pkce': + pkce_call = (_args, _kwargs) + break + + self.assertIsNotNone(pkce_call) + _, kwargs = pkce_call client_kwargs = kwargs.get('client_kwargs', {}) # Check that PKCE and default client_kwargs are included @@ -236,21 +391,57 @@ def test_oauth2_authentication_with_pkce(self): self.assertEqual( client_kwargs.get('scope'), 'openid email profile') self.assertEqual( - client_kwargs.get('verify'), 'true') - - def mock_user_profile_with_additional_claims(self): - profile = {'email': 'oauth2@gmail.com', 'wids': ['789']} - - AuthSourceRegistry._registry[OAUTH2].get_user_profile = MagicMock( - return_value=profile) - return profile - - def mock_user_profile(self): - profile = {'email': 'oauth2@gmail.com'} + client_kwargs.get('verify'), True) + + def _test_oidc_get_user_profile_skip_userinfo(self, provider): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + with self.app.test_request_context('/'): + oauth = OAuth2Authentication() + oauth.oauth2_current_client = provider + + claims = {'email': 'oidc-skip@example.com', 'sub': 'abc'} + + client = MagicMock() + client.authorize_access_token = MagicMock(return_value={ + 'access_token': 't', + 'id_token': 'mock.jwt.token', + 'token_type': 'Bearer', + 'userinfo': claims + }) + client.get = MagicMock(side_effect=AssertionError( + 'userinfo endpoint should not be called')) + + OAuth2Authentication.oauth2_clients[provider] = client + profile = oauth.get_user_profile() + self.assertEqual(profile.get('email'), 'oidc-skip@example.com') + client.get.assert_not_called() + + def _test_oidc_get_user_profile_calls_userinfo(self, provider): + from pgadmin.authenticate.oauth2 import OAuth2Authentication + + with self.app.test_request_context('/'): + oauth = OAuth2Authentication() + oauth.oauth2_current_client = provider + + client = MagicMock() + client.authorize_access_token = MagicMock(return_value={ + 'access_token': 't', + 'token_type': 'Bearer', + 'userinfo': {} + }) + + resp = MagicMock() + resp.raise_for_status = MagicMock() + resp.json = MagicMock( + return_value={'email': 'userinfo@example.com'} + ) + client.get = MagicMock(return_value=resp) - AuthSourceRegistry._registry[OAUTH2].get_user_profile = MagicMock( - return_value=profile) - return profile + OAuth2Authentication.oauth2_clients[provider] = client + profile = oauth.get_user_profile() + self.assertEqual(profile.get('email'), 'userinfo@example.com') + client.get.assert_called_once() def tearDown(self): self.tester.logout()