diff --git a/docs/security.rst b/docs/security.rst index 5c4a40e63..32b597caa 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -16,6 +16,7 @@ Supported Authentication Types It's the web server responsibility to authenticate the user, useful for intranet sites, when the server (Apache, Nginx) is configured to use kerberos, no need for the user to login with username and password on F.A.B. :OAUTH: Authentication using OAUTH (v1 or v2). You need to install authlib. +:SAML: Authentication using SAML 2.0 (e.g., Microsoft Entra ID, Okta, OneLogin). You need to install python3-saml. .. note:: **Deprecated Authentication Types (Removed in Flask-AppBuilder 5.0+)** @@ -31,7 +32,7 @@ The session is preserved and encrypted using Flask-Login. Authentication Methods ---------------------- -You can choose one from 4 authentication methods. Configure the method to be used +You can choose one from 5 authentication methods. Configure the method to be used on the **config.py** (when using the create-app, or following the proposed app structure). First the configuration imports the constants for the authentication methods:: @@ -39,7 +40,8 @@ configuration imports the constants for the authentication methods:: AUTH_DB, AUTH_LDAP, AUTH_OAUTH, - AUTH_REMOTE_USER + AUTH_REMOTE_USER, + AUTH_SAML, ) Next you will use the **AUTH_TYPE** key to choose the type:: @@ -437,6 +439,138 @@ Therefore, you can send tweets, post on the users Facebook, retrieve the user's Take a look at the `example `_ to get an idea of a simple use for this. +Authentication: SAML +-------------------- + +This method will authenticate users via SAML 2.0 identity providers such as +Microsoft Entra ID (formerly Azure AD), Okta, OneLogin, etc. + +.. note:: To use SAML you need to install `python3-saml `_: + ``pip install flask-appbuilder[saml]`` + +Configure your SAML providers and SP settings in **config.py**:: + + AUTH_TYPE = AUTH_SAML + + # registration configs + AUTH_USER_REGISTRATION = True + AUTH_USER_REGISTRATION_ROLE = "Public" + + # Sync roles at login from SAML assertion + AUTH_ROLES_SYNC_AT_LOGIN = True + + # Map SAML group names to FAB roles + AUTH_ROLES_MAPPING = { + "admins": ["Admin"], + "users": ["Public"], + } + + # SAML Identity Providers + SAML_PROVIDERS = [ + { + "name": "entra_id", + "icon": "fa-microsoft", + "idp": { + "entityId": "https://sts.windows.net//", + "singleSignOnService": { + "url": "https://login.microsoftonline.com//saml2", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "singleLogoutService": { + "url": "https://login.microsoftonline.com//saml2", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "x509cert": "", + }, + "attribute_mapping": { + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress": "email", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname": "first_name", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname": "last_name", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": "username", + "http://schemas.microsoft.com/ws/2008/06/identity/claims/groups": "role_keys", + }, + }, + ] + + # Global SAML Service Provider configuration + SAML_CONFIG = { + "strict": True, + "debug": False, + "sp": { + "entityId": "https://myapp.example.com/saml/metadata/", + "assertionConsumerService": { + "url": "https://myapp.example.com/saml/acs/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", + }, + "singleLogoutService": { + "url": "https://myapp.example.com/saml/slo/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", + "x509cert": "", + # "privateKey": "", + }, + "security": { + "nameIdEncrypted": False, + "authnRequestsSigned": False, + "logoutRequestSigned": False, + "logoutResponseSigned": False, + "signMetadata": False, + "wantMessagesSigned": False, + "wantAssertionsSigned": True, + "wantAssertionsEncrypted": False, + "wantNameId": True, + "wantNameIdEncrypted": False, + "wantAttributeStatement": True, + }, + } + +Each SAML provider entry has the following keys: + +:name: A unique name for the identity provider. +:icon: A Font Awesome icon class for the login button. +:idp: The IdP SAML metadata (entityId, SSO/SLO URLs, and signing certificate). +:attribute_mapping: Maps SAML assertion attribute names (left) to FAB user fields (right). + Supported FAB fields: ``username``, ``email``, ``first_name``, ``last_name``, ``role_keys``. + +The ``SAML_CONFIG`` dict holds the global Service Provider settings. The ``sp`` section defines +your application's SAML endpoints. These URLs must match what you configure on the IdP side. + +SAML Endpoints +~~~~~~~~~~~~~~ + +The following endpoints are automatically registered: + +- ``/login/`` — Login page with IdP selection (or auto-redirect for single IdP) +- ``/login/`` — Initiate SSO with a specific IdP +- ``/saml/acs/`` — Assertion Consumer Service (receives SAML responses) +- ``/saml/slo/`` — Single Logout endpoint +- ``/saml/metadata/`` — SP metadata XML (configure this URL on your IdP) + +SAML Role Mapping +~~~~~~~~~~~~~~~~~ + +You can map SAML group claims to FAB roles, just like with OAuth and LDAP:: + + AUTH_ROLES_MAPPING = { + "admins": ["Admin"], + "users": ["User"], + } + + AUTH_ROLES_SYNC_AT_LOGIN = True + + PERMANENT_SESSION_LIFETIME = 1800 + +The ``role_keys`` field in ``attribute_mapping`` defines which SAML attribute contains the +user's group memberships. + +You can also use JMESPath expressions for dynamic role assignment:: + + AUTH_USER_REGISTRATION_ROLE_JMESPATH = "role_keys[0]" + +Take a look at the `SAML example `_ + + Authentication: Rate limiting ----------------------------- @@ -848,6 +982,9 @@ F.A.B. uses a different user view for each authentication method :UserDBModelView: For database auth method :UserLDAPModelView: For LDAP auth method +:UserOAuthModelView: For OAuth auth method +:UserRemoteUserModelView: For Remote User auth method +:UserSAMLModelView: For SAML auth method You can extend or create from scratch your own, and then tell F.A.B. to use them instead, by overriding their correspondent lower case properties on **SecurityManager** (just like on the given example). @@ -881,6 +1018,7 @@ If you're using: :AUTH_LDAP: Extend UserLDAPModelView :AUTH_REMOTE_USER: Extend UserRemoteUserModelView :AUTH_OAUTH: Extend UserOAuthModelView +:AUTH_SAML: Extend UserSAMLModelView So using AUTH_DB:: @@ -955,6 +1093,8 @@ Note that this is for AUTH_DB, so if you're using: :AUTH_DB: Override userdbmodelview :AUTH_LDAP: Override userldapmodelview :AUTH_REMOTE_USER: Override userremoteusermodelview +:AUTH_OAUTH: Override useroauthmodelview +:AUTH_SAML: Override usersamlmodelview Finally (as shown on the previous example) tell F.A.B. to use your SecurityManager class, so when initializing **AppBuilder** (on __init__.py):: diff --git a/examples/saml/app.py b/examples/saml/app.py new file mode 100644 index 000000000..dc63636a3 --- /dev/null +++ b/examples/saml/app.py @@ -0,0 +1,8 @@ +"""Example Flask-AppBuilder application with SAML / Entra ID authentication.""" + +from app import create_app + +app = create_app() + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/examples/saml/app/__init__.py b/examples/saml/app/__init__.py new file mode 100644 index 000000000..6e4151f12 --- /dev/null +++ b/examples/saml/app/__init__.py @@ -0,0 +1,19 @@ +import logging + +from flask import Flask + +from .extensions import appbuilder, db + + +logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s") +logging.getLogger().setLevel(logging.INFO) + + +def create_app() -> Flask: + app = Flask(__name__) + app.config.from_object("config") + with app.app_context(): + db.init_app(app) + appbuilder.init_app(app, db.session) + db.create_all() + return app diff --git a/examples/saml/app/extensions.py b/examples/saml/app/extensions.py new file mode 100644 index 000000000..419cabef3 --- /dev/null +++ b/examples/saml/app/extensions.py @@ -0,0 +1,5 @@ +from flask_appbuilder import AppBuilder +from flask_appbuilder.utils.legacy import get_sqla_class + +db = get_sqla_class()() +appbuilder = AppBuilder() diff --git a/examples/saml/config.py b/examples/saml/config.py new file mode 100644 index 000000000..6118449b7 --- /dev/null +++ b/examples/saml/config.py @@ -0,0 +1,116 @@ +"""Example configuration for SAML authentication with Flask-AppBuilder. + +Demonstrates configuring FAB with SAML / Microsoft Entra ID authentication. +Adjust the IdP settings to match your identity provider. + +Required environment variables: + SAML_TENANT_ID - Microsoft Entra ID tenant ID + SAML_IDP_CERT - IdP signing certificate (base64 content, no PEM headers) +""" + +import os + +from flask_appbuilder.const import AUTH_SAML + +basedir = os.path.abspath(os.path.dirname(__file__)) + +# Entra ID tenant ID and IdP certificate from environment +SAML_TENANT_ID = os.environ.get("SAML_TENANT_ID", "") +SAML_IDP_CERT = os.environ.get("SAML_IDP_CERT", "") + +# Flask secret key +SECRET_KEY = "\2\1thisismyscretkey\1\2\e\y\y\h" + +# Database connection +SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(basedir, "app.db") + +# Flask-AppBuilder Security +AUTH_TYPE = AUTH_SAML + +# Set to True to allow user self registration via SAML +AUTH_USER_REGISTRATION = True + +# Default role for self-registered users +AUTH_USER_REGISTRATION_ROLE = "Admin" + +# Sync roles at login (maps SAML groups/roles to FAB roles) +AUTH_ROLES_SYNC_AT_LOGIN = True + +# Role mapping from SAML group names to FAB role names +AUTH_ROLES_MAPPING = { + "admins": ["Admin"], + "users": ["Public"], +} + +# ------------------------------------------------------- +# SAML Configuration +# ------------------------------------------------------- + +# List of SAML Identity Providers +# Replace with your Microsoft Entra ID (formerly Azure AD) tenant ID. +# You can find these values in the Azure Portal under: +# Entra ID > Enterprise Applications > Your App > Single sign-on +SAML_PROVIDERS = [ + { + "name": "entra_id", + "icon": "fa-microsoft", + # IdP configuration from Entra ID SAML metadata: + # https://login.microsoftonline.com//federationmetadata/2007-06/federationmetadata.xml + "idp": { + "entityId": f"https://sts.windows.net/{SAML_TENANT_ID}/", + "singleSignOnService": { + "url": f"https://login.microsoftonline.com/{SAML_TENANT_ID}/saml2", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "singleLogoutService": { + "url": f"https://login.microsoftonline.com/{SAML_TENANT_ID}/saml2", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "x509cert": SAML_IDP_CERT, + }, + # Map SAML assertion attributes to FAB user fields. + # Left side: SAML attribute name (Entra ID claim URIs). + # Right side: FAB user field name. + "attribute_mapping": { + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress": "email", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname": "first_name", # noqa: E501 + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname": "last_name", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": "username", + "http://schemas.microsoft.com/ws/2008/06/identity/claims/groups": "role_keys", + }, + }, +] + +# Global SAML Service Provider configuration +SAML_CONFIG = { + "strict": False, + "debug": True, + "sp": { + "entityId": "http://localhost:9000/saml/metadata/", + "assertionConsumerService": { + "url": "http://localhost:9000/saml/acs/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", + }, + "singleLogoutService": { + "url": "http://localhost:9000/saml/slo/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", + # SP certificate (optional, needed for signed requests) + "x509cert": "", + # "privateKey": "", + }, + "security": { + "nameIdEncrypted": False, + "authnRequestsSigned": False, + "logoutRequestSigned": False, + "logoutResponseSigned": False, + "signMetadata": False, + "wantMessagesSigned": False, + "wantAssertionsSigned": True, + "wantAssertionsEncrypted": False, + "wantNameId": True, + "wantNameIdEncrypted": False, + "wantAttributeStatement": True, + }, +} diff --git a/flask_appbuilder/const.py b/flask_appbuilder/const.py index 825272f2d..14034fcae 100644 --- a/flask_appbuilder/const.py +++ b/flask_appbuilder/const.py @@ -130,6 +130,7 @@ AUTH_LDAP = 2 AUTH_REMOTE_USER = 3 AUTH_OAUTH = 4 +AUTH_SAML = 5 """ Constants for supported authentication types """ # ----------------------------------- diff --git a/flask_appbuilder/security/manager.py b/flask_appbuilder/security/manager.py index 80fc5a587..9d88e605a 100644 --- a/flask_appbuilder/security/manager.py +++ b/flask_appbuilder/security/manager.py @@ -4,9 +4,9 @@ import importlib import logging import re -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TYPE_CHECKING, Union -from flask import current_app, Flask, g, session, url_for +from flask import current_app, Flask, g, request, session, url_for from flask_appbuilder.exceptions import InvalidLoginAttempt, OAuthProviderUnknown from flask_babel import lazy_gettext as _ from flask_jwt_extended import current_user as current_user_jwt @@ -28,6 +28,7 @@ AuthLDAPView, AuthOAuthView, AuthRemoteUserView, + AuthSAMLView, PermissionModelView, PermissionViewModelView, RegisterUserModelView, @@ -40,6 +41,7 @@ UserLDAPModelView, UserOAuthModelView, UserRemoteUserModelView, + UserSAMLModelView, UserStatsChartView, ViewMenuModelView, ) @@ -49,6 +51,7 @@ AUTH_LDAP, AUTH_OAUTH, AUTH_REMOTE_USER, + AUTH_SAML, LOGMSG_ERR_SEC_ADD_REGISTER_USER, LOGMSG_ERR_SEC_AUTH_LDAP, LOGMSG_ERR_SEC_AUTH_LDAP_TLS, @@ -59,6 +62,9 @@ PERMISSION_PREFIX, ) +if TYPE_CHECKING: + from flask_appbuilder.security.saml.types import SAMLConfig, SAMLProvider + log = logging.getLogger(__name__) @@ -186,6 +192,11 @@ class BaseSecurityManager(AbstractSecurityManager): """ Override if you want your own Authentication OAuth view """ authremoteuserview = AuthRemoteUserView """ Override if you want your own Authentication REMOTE_USER view """ + authsamlview = AuthSAMLView + """ Override if you want your own Authentication SAML view """ + + usersamlmodelview = UserSAMLModelView + """ Override if you want your own user SAML view """ registeruserdbview = RegisterUserDBView """ Override if you want your own register user db view """ @@ -519,6 +530,41 @@ def auth_ldap_tls_keyfile(self): def oauth_providers(self): return current_app.config["OAUTH_PROVIDERS"] + @property + def saml_providers(self) -> List["SAMLProvider"]: + return current_app.config.get("SAML_PROVIDERS", []) + + @property + def saml_config(self) -> "SAMLConfig": + return current_app.config.get("SAML_CONFIG", {}) + + def get_saml_provider(self, name: str) -> Optional["SAMLProvider"]: + """Return a specific SAML provider by name.""" + for provider in self.saml_providers: + if provider["name"] == name: + return provider + return None + + def get_saml_settings(self, provider_name: str) -> "SAMLConfig": + """Build the python3-saml settings dict for a given provider. + + Merges the global SAML_CONFIG with the provider-specific IdP config. + """ + import copy + + provider = self.get_saml_provider(provider_name) + if not provider: + raise ValueError( + f"SAML provider '{provider_name}' not found in configuration" + ) + + base_config = copy.deepcopy(self.saml_config) + + if "idp" in provider: + base_config["idp"] = provider["idp"] + + return base_config + @property def is_auth_limited(self) -> bool: return current_app.config["AUTH_RATE_LIMITED"] @@ -807,6 +853,9 @@ def register_views(self): elif self.auth_type == AUTH_REMOTE_USER: self.user_view = self.userremoteusermodelview self.auth_view = self.authremoteuserview() + elif self.auth_type == AUTH_SAML: + self.user_view = self.usersamlmodelview + self.auth_view = self.authsamlview() self.appbuilder.add_view_no_menu(self.auth_view) # this needs to be done after the view is added, otherwise the blueprint @@ -1448,6 +1497,238 @@ def auth_user_oauth(self, userinfo): else: return None + @staticmethod + def _prepare_saml_request() -> Dict[str, Any]: + """Prepare Flask request data in the format expected by python3-saml.""" + return { + "https": "on" if request.scheme == "https" else "off", + "http_host": request.host, + "server_port": request.environ.get("SERVER_PORT", "443"), + "script_name": request.path, + "get_data": request.args.copy(), + "post_data": request.form.copy(), + "query_string": request.query_string.decode("utf-8"), + } + + def _get_saml_auth(self, idp: str): + """Create a OneLogin_Saml2_Auth instance for the given IdP.""" + from onelogin.saml2.auth import OneLogin_Saml2_Auth + + return OneLogin_Saml2_Auth( + self._prepare_saml_request(), self.get_saml_settings(idp) + ) + + def get_saml_login_redirect_url(self, idp: str) -> str: + """Create a SAML authentication request and return the redirect URL. + + :param idp: The SAML identity provider name. + :returns: The IdP redirect URL for SSO. + """ + return self._get_saml_auth(idp).login() + + def get_saml_userinfo(self, idp: str) -> Optional[Dict[str, Any]]: + """Process a SAML ACS response and return mapped user info. + + :param idp: The SAML identity provider name. + :returns: A dict with mapped user info, session_index, and name_id, + or None if authentication failed. + """ + from flask_appbuilder.security.saml.utils import map_saml_attributes + + auth = self._get_saml_auth(idp) + auth.process_response() + errors = auth.get_errors() + + if errors: + error_reason = auth.get_last_error_reason() or "" + # Check for issuer mismatch (multi-tab / wrong IdP scenario) + if "issuer" in error_reason.lower(): + log.error( + "SAML Issuer mismatch for IdP '%s'. This may happen if you " + "initiated login with a different IdP in another tab. " + "Error: %s", + idp, + error_reason, + ) + else: + log.error( + "SAML ACS errors for IdP '%s': %s (reason: %s)", + idp, + errors, + error_reason, + ) + return None + + if not auth.is_authenticated(): + return None + + saml_attributes = auth.get_attributes() + name_id = auth.get_nameid() + log.debug( + "SAML attributes from IdP '%s': %s, NameID: %s", + idp, + saml_attributes, + name_id, + ) + + provider = self.get_saml_provider(idp) + attribute_mapping = provider.get("attribute_mapping", {}) + userinfo = map_saml_attributes(saml_attributes, attribute_mapping, name_id) + + # Include session data needed for SLO + userinfo["saml_name_id"] = name_id + userinfo["saml_session_index"] = auth.get_session_index() + + log.debug("Mapped SAML userinfo: %s", userinfo) + return userinfo + + def get_saml_logout_redirect_url( + self, + idp: str, + name_id: Optional[str] = None, + session_index: Optional[str] = None, + ) -> Tuple[Optional[str], bool]: + """Process SAML SLO or initiate a logout request. + + Handles three cases: + - Incoming SLO request from IdP (SAMLRequest) + - SLO response from IdP (SAMLResponse) + - SP-initiated logout (returns redirect URL to IdP) + + :param idp: The SAML identity provider name. + :param name_id: The SAML NameID for the session. + :param session_index: The SAML session index. + :returns: Tuple of (redirect URL or None, should_logout flag). + The caller should call logout_user() if should_logout is True. + """ + auth = self._get_saml_auth(idp) + should_logout = False + + def mark_logout(): + nonlocal should_logout + should_logout = True + + # Incoming SLO request from IdP + if "SAMLRequest" in request.form or "SAMLRequest" in request.args: + url = auth.process_slo(delete_session_cb=mark_logout) + return url, should_logout + + # SLO response from IdP + if "SAMLResponse" in request.form or "SAMLResponse" in request.args: + auth.process_slo(delete_session_cb=mark_logout) + return None, should_logout + + # SP-initiated logout + return auth.logout(name_id=name_id, session_index=session_index), True + + def _saml_calculate_user_roles(self, userinfo) -> List[str]: + user_role_objects = set() + + # apply AUTH_ROLES_MAPPING + if len(self.auth_roles_mapping) > 0: + user_role_keys = userinfo.get("role_keys", []) + user_role_objects.update(self.get_roles_from_keys(user_role_keys)) + + # apply AUTH_USER_REGISTRATION_ROLE + if self.auth_user_registration: + registration_role_name = self.auth_user_registration_role + + if self.auth_user_registration_role_jmespath: + import jmespath + + registration_role_name = jmespath.search( + self.auth_user_registration_role_jmespath, userinfo + ) + + fab_role = self.find_role(registration_role_name) + if fab_role: + user_role_objects.add(fab_role) + else: + log.warning( + "Can't find AUTH_USER_REGISTRATION role: %s", registration_role_name + ) + + return list(user_role_objects) + + def auth_user_saml(self, userinfo): + """ + Method for authenticating user with SAML. + + :param userinfo: dict with user information extracted from SAML assertion + (keys are the same as User model columns) + """ + # extract the username from userinfo + if "username" in userinfo: + username = userinfo["username"] + elif "email" in userinfo: + username = userinfo["email"] + else: + log.error("SAML userinfo does not have username or email %s", userinfo) + return None + + if (username is None) or username == "": + return None + + # Search the DB for this user by username or email + user = self.find_user(username=username) + if not user and userinfo.get("email"): + user = self.find_user(email=userinfo["email"]) + + # If user is not active, go away + if user and (not user.is_active): + return None + + # If user is not registered, and not self-registration, go away + if (not user) and (not self.auth_user_registration): + return None + + # Sync the user's roles and info + if user: + updated = False + + if self.auth_roles_sync_at_login: + new_roles = self._saml_calculate_user_roles(userinfo) + if new_roles: + user.roles = new_roles + updated = True + log.debug( + "Calculated new roles for user='%s' as: %s", + username, + user.roles, + ) + + # Update user info from SAML assertion + for field in ("first_name", "last_name", "email"): + new_val = userinfo.get(field) + if new_val and getattr(user, field) != new_val: + setattr(user, field, new_val) + updated = True + + if updated: + self.update_user(user) + + # If the user is new, register them + if (not user) and self.auth_user_registration: + user = self.add_user( + username=username, + first_name=userinfo.get("first_name", ""), + last_name=userinfo.get("last_name", ""), + email=userinfo.get("email", "") or f"{username}@email.notfound", + role=self._saml_calculate_user_roles(userinfo), + ) + log.debug("New SAML user registered: %s", user) + + if not user: + log.error("Error creating a new SAML user %s", username) + return None + + # LOGIN SUCCESS + if user: + self.update_user_auth_stat(user) + return user + else: + return None + """ ---------------------------------------- PERMISSION ACCESS CHECK diff --git a/flask_appbuilder/security/saml/__init__.py b/flask_appbuilder/security/saml/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flask_appbuilder/security/saml/metadata.py b/flask_appbuilder/security/saml/metadata.py new file mode 100644 index 000000000..427d87067 --- /dev/null +++ b/flask_appbuilder/security/saml/metadata.py @@ -0,0 +1,26 @@ +"""SP metadata generation for SAML authentication.""" + +from __future__ import annotations + +import logging +from typing import Any, Dict + +log = logging.getLogger(__name__) + + +def get_sp_metadata(saml_settings: Dict[str, Any]) -> str: + """ + Generate SP metadata XML from SAML settings. + + :param saml_settings: The python3-saml settings dict + :return: SP metadata XML string + """ + from onelogin.saml2.settings import OneLogin_Saml2_Settings + + settings = OneLogin_Saml2_Settings(saml_settings, sp_validation_only=True) + metadata = settings.get_sp_metadata() + errors = settings.validate_metadata(metadata) + if errors: + log.error("SP Metadata validation errors: %s", errors) + raise ValueError(f"SP Metadata validation errors: {errors}") + return metadata diff --git a/flask_appbuilder/security/saml/types.py b/flask_appbuilder/security/saml/types.py new file mode 100644 index 000000000..b0d94f71b --- /dev/null +++ b/flask_appbuilder/security/saml/types.py @@ -0,0 +1,73 @@ +"""SAML TypedDict definitions for Flask-AppBuilder.""" + +from __future__ import annotations + +from typing import Any, Dict, List, TypedDict + + +class SAMLServiceBinding(TypedDict): + url: str + binding: str + + +class SAMLIdPConfig(TypedDict, total=False): + entityId: str + singleSignOnService: SAMLServiceBinding + singleLogoutService: SAMLServiceBinding + x509cert: str + + +class SAMLProvider(TypedDict, total=False): + name: str + icon: str + idp: SAMLIdPConfig + attribute_mapping: Dict[str, str] + + +class SAMLSPConfig(TypedDict, total=False): + entityId: str + assertionConsumerService: SAMLServiceBinding + singleLogoutService: SAMLServiceBinding + NameIDFormat: str + x509cert: str + privateKey: str + + +class SAMLSecurityConfig(TypedDict, total=False): + nameIdEncrypted: bool + authnRequestsSigned: bool + logoutRequestSigned: bool + logoutResponseSigned: bool + signMetadata: bool + wantMessagesSigned: bool + wantAssertionsSigned: bool + wantAssertionsEncrypted: bool + wantNameId: bool + wantNameIdEncrypted: bool + wantAttributeStatement: bool + + +class SAMLConfig(TypedDict, total=False): + strict: bool + debug: bool + sp: SAMLSPConfig + idp: SAMLIdPConfig + security: SAMLSecurityConfig + + +class SAMLFlaskRequest(TypedDict): + https: str + http_host: str + server_port: str + script_name: str + get_data: Dict[str, Any] + post_data: Dict[str, Any] + query_string: str + + +class SAMLUserInfo(TypedDict, total=False): + username: str + email: str + first_name: str + last_name: str + role_keys: List[str] diff --git a/flask_appbuilder/security/saml/utils.py b/flask_appbuilder/security/saml/utils.py new file mode 100644 index 000000000..7331669e9 --- /dev/null +++ b/flask_appbuilder/security/saml/utils.py @@ -0,0 +1,54 @@ +"""SAML utility helpers for Flask-AppBuilder.""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, List, Optional + +from .types import SAMLUserInfo + +log = logging.getLogger(__name__) + + +def map_saml_attributes( + saml_attributes: Dict[str, List[str]], + attribute_mapping: Dict[str, str], + name_id: Optional[str] = None, +) -> SAMLUserInfo: + """Map SAML assertion attributes to FAB user fields. + + Args: + saml_attributes: Raw attributes from the SAML assertion. + attribute_mapping: Mapping of SAML attribute names to FAB field names. + name_id: The SAML NameID value (used as fallback for username/email). + + Returns: + Dictionary with FAB user field names as keys. + """ + userinfo: Dict[str, Any] = {} + + for saml_attr, fab_field in attribute_mapping.items(): + value = saml_attributes.get(saml_attr) + if value is None: + continue + if fab_field == "role_keys": + userinfo[fab_field] = list(value) + else: + userinfo[fab_field] = value[0] if value else "" + + # Fallback to NameID if no username/email mapped + if name_id and "username" not in userinfo: + userinfo["username"] = name_id + if "@" in name_id and "email" not in userinfo: + userinfo["email"] = name_id + + return userinfo + + +def fetch_idp_metadata(url: str) -> str: + """Fetch IdP metadata XML from a remote URL.""" + import requests + + resp = requests.get(url, timeout=10) + resp.raise_for_status() + return resp.text diff --git a/flask_appbuilder/security/sqla/manager.py b/flask_appbuilder/security/sqla/manager.py index f36701222..bcf8afa61 100755 --- a/flask_appbuilder/security/sqla/manager.py +++ b/flask_appbuilder/security/sqla/manager.py @@ -82,6 +82,8 @@ def __init__(self, appbuilder): self.useroauthmodelview.datamodel = user_datamodel elif self.auth_type == c.AUTH_REMOTE_USER: self.userremoteusermodelview.datamodel = user_datamodel + elif self.auth_type == c.AUTH_SAML: + self.usersamlmodelview.datamodel = user_datamodel if self.userstatschartview: self.userstatschartview.datamodel = user_datamodel diff --git a/flask_appbuilder/security/views.py b/flask_appbuilder/security/views.py index 9634f6627..6683812fa 100644 --- a/flask_appbuilder/security/views.py +++ b/flask_appbuilder/security/views.py @@ -3,7 +3,17 @@ import re from typing import Any, Optional -from flask import abort, current_app, flash, g, redirect, request, session, url_for +from flask import ( + abort, + current_app, + flash, + g, + make_response, + redirect, + request, + session, + url_for, +) from flask_appbuilder._compat import as_unicode from flask_appbuilder.actions import action from flask_appbuilder.baseviews import BaseView @@ -21,6 +31,7 @@ roles_or_groups_required, UserInfoEdit, ) +from flask_appbuilder.security.saml.metadata import get_sp_metadata from flask_appbuilder.security.utils import generate_random_string from flask_appbuilder.utils.base import get_safe_redirect, lazy_formatter_gettext from flask_appbuilder.validators import PasswordComplexityValidator @@ -732,6 +743,199 @@ def oauth_authorized(self, provider: str) -> WerkzeugResponse: return redirect(next_url) +class UserSAMLModelView(UserModelView): + """ + View that adds SAML specifics to User view. + Override to implement your own custom view. + Then override usersamlmodelview property on SecurityManager. + """ + + pass + + +class AuthSAMLView(AuthView): + """SAML 2.0 Authentication View.""" + + login_template = "appbuilder/general/security/login_saml.html" + + @expose("/login/") + @expose("/login/") + @no_cache + def login(self, idp: Optional[str] = None) -> WerkzeugResponse: + if g.user is not None and g.user.is_authenticated: + return redirect(self.appbuilder.get_url_for_index) + + sm = self.appbuilder.sm + providers = sm.saml_providers + + if idp is None: + if len(providers) == 1: + return redirect(url_for(".login", idp=providers[0]["name"])) + return self.render_template( + self.login_template, + providers=providers, + title=self.title, + appbuilder=self.appbuilder, + ) + + from onelogin.saml2.errors import ( + OneLogin_Saml2_Error, + OneLogin_Saml2_ValidationError, + ) + + try: + session["saml_idp"] = idp + next_url = get_safe_redirect(request.args.get("next", "")) + if next_url and next_url != self.appbuilder.get_url_for_index: + session["saml_next"] = next_url + return redirect(sm.get_saml_login_redirect_url(idp)) + except (OneLogin_Saml2_Error, OneLogin_Saml2_ValidationError) as e: + log.error("SAML error initiating login for IdP '%s': %s", idp, e) + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_index) + except ValueError as e: + log.error("SAML configuration error for IdP '%s': %s", idp, e) + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_index) + + @expose("/saml/acs/", methods=["POST"]) + @no_cache + def acs(self) -> WerkzeugResponse: + """Assertion Consumer Service - receives SAML responses from IdP.""" + from onelogin.saml2.errors import ( + OneLogin_Saml2_Error, + OneLogin_Saml2_ValidationError, + ) + + sm = self.appbuilder.sm + try: + idp = session.get("saml_idp") + if not idp: + providers = sm.saml_providers + if len(providers) == 1: + idp = providers[0]["name"] + else: + flash("Could not determine identity provider.", "warning") + return redirect(self.appbuilder.get_url_for_login) + + userinfo = sm.get_saml_userinfo(idp) + if userinfo is None: + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_login) + + user = sm.auth_user_saml(userinfo) + if user is None: + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_login) + + # Calculate redirect URL before login (so failures don't leave partial state) + next_url = session.pop("saml_next", "") or "" + if next_url: + next_url = get_safe_redirect(next_url) + else: + next_url = self.appbuilder.get_url_for_index + + # Store SAML session info for SLO + session["saml_name_id"] = userinfo.get("saml_name_id") + session["saml_session_index"] = userinfo.get("saml_session_index") + session["saml_idp"] = idp + + # login_user is the last operation before redirect + login_user(user, remember=False) + return redirect(next_url) + + except (OneLogin_Saml2_Error, OneLogin_Saml2_ValidationError) as e: + log.error("SAML validation error in ACS: %s", e) + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_login) + except ValueError as e: + # Provider not found or config error + log.error("SAML configuration error in ACS: %s", e) + flash(as_unicode(self.invalid_login_message), "warning") + return redirect(self.appbuilder.get_url_for_login) + + @expose("/saml/slo/", methods=["GET", "POST"]) + def slo(self) -> WerkzeugResponse: + """Single Logout endpoint.""" + from onelogin.saml2.errors import ( + OneLogin_Saml2_Error, + OneLogin_Saml2_ValidationError, + ) + + try: + idp = session.get("saml_idp") + if not idp: + logout_user() + return redirect(self.appbuilder.get_url_for_index) + + url, should_logout = self.appbuilder.sm.get_saml_logout_redirect_url( + idp, + name_id=session.get("saml_name_id"), + session_index=session.get("saml_session_index"), + ) + if should_logout: + logout_user() + return redirect(url or self.appbuilder.get_url_for_index) + + except (OneLogin_Saml2_Error, OneLogin_Saml2_ValidationError) as e: + log.error("SAML SLO validation error: %s", e) + logout_user() + return redirect(self.appbuilder.get_url_for_index) + except ValueError as e: + log.error("SAML SLO configuration error: %s", e) + logout_user() + return redirect(self.appbuilder.get_url_for_index) + + @expose("/logout/") + def logout(self) -> WerkzeugResponse: + """Override logout to support SAML SLO.""" + idp = session.get("saml_idp") + if idp: + try: + return redirect(url_for(".slo")) + except Exception: + pass + logout_user() + return redirect( + current_app.config.get( + "LOGOUT_REDIRECT_URL", self.appbuilder.get_url_for_index + ) + ) + + @expose("/saml/metadata/") + @expose("/saml/metadata/") + def metadata(self, idp: Optional[str] = None) -> WerkzeugResponse: + """SP Metadata endpoint. + + Without idp parameter, returns metadata for the first configured provider. + With idp parameter, returns metadata for that specific provider. + """ + try: + sm = self.appbuilder.sm + providers = sm.saml_providers + if not providers: + abort(404) + + # Use specified IdP or default to first provider + provider_name = idp if idp else providers[0]["name"] + if idp and not sm.get_saml_provider(idp): + log.warning("Metadata requested for unknown IdP: %s", idp) + abort(404) + + saml_settings = sm.get_saml_settings(provider_name) + metadata_xml = get_sp_metadata(saml_settings) + + resp = make_response(metadata_xml, 200) + resp.headers["Content-Type"] = "text/xml" + return resp + except ValueError as e: + log.error("SAML metadata configuration error: %s", e) + abort(404) + except Exception as e: + log.error("Error generating SP metadata: %s", e) + abort(500) + + class AuthRemoteUserView(AuthView): login_template = "" diff --git a/flask_appbuilder/templates/appbuilder/general/security/login_saml.html b/flask_appbuilder/templates/appbuilder/general/security/login_saml.html new file mode 100644 index 000000000..ea6911a97 --- /dev/null +++ b/flask_appbuilder/templates/appbuilder/general/security/login_saml.html @@ -0,0 +1,45 @@ + +{% extends "appbuilder/base.html" %} + +{% block content %} + +
+
+
+
+
{{ title }}
+
+
+
+ {% for provider in providers %} + + {{_('Sign In with ')}}{{ provider.name }} + + + {% endfor %} +
+
+
+
+
+ + +{% endblock %} \ No newline at end of file diff --git a/requirements/extra.txt b/requirements/extra.txt index 2c878e897..2c2dd79cb 100644 --- a/requirements/extra.txt +++ b/requirements/extra.txt @@ -42,3 +42,5 @@ requests==2.26.0 # via -r requirements/extra.in urllib3==1.26.18 # via requests +python3-saml>=1.15.0 + # via -r requirements/extra.in diff --git a/setup.py b/setup.py index 4e09e2734..eede8f315 100644 --- a/setup.py +++ b/setup.py @@ -69,6 +69,7 @@ def desc(): extras_require={ "jmespath": ["jmespath>=0.9.5"], "oauth": ["Authlib>=0.14, <2.0.0"], + "saml": ["python3-saml>=1.15.0"], "talisman": ["flask-talisman>=1.0.0, <2.0"], }, tests_require=["nose2==0.14.0", "mockldap>=0.3.0", "hiro>=0.5.1"], diff --git a/tests/security/test_auth_saml.py b/tests/security/test_auth_saml.py new file mode 100644 index 000000000..13370263d --- /dev/null +++ b/tests/security/test_auth_saml.py @@ -0,0 +1,751 @@ +"""Tests for SAML authentication flow.""" + +import os +import unittest + +from flask import Flask +from flask_appbuilder import AppBuilder +from flask_appbuilder.const import AUTH_SAML +from flask_appbuilder.security.saml.utils import map_saml_attributes +from flask_appbuilder.utils.legacy import get_sqla_class +import jinja2 +from tests.const import USERNAME_ADMIN, USERNAME_READONLY +from tests.fixtures.users import create_default_users + + +class SAMLUtilsTestCase(unittest.TestCase): + """Test SAML utility functions (no app context needed).""" + + def test_map_saml_attributes_basic(self): + """SAML: test attribute mapping - basic fields""" + saml_attrs = { + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress": [ + "user@example.com" + ], + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname": ["John"], + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname": ["Doe"], + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": ["johndoe"], + } + mapping = { + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress": "email", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/givenname": "first_name", # noqa: E501 + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/surname": "last_name", + "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name": "username", + } + result = map_saml_attributes(saml_attrs, mapping) + self.assertEqual(result["email"], "user@example.com") + self.assertEqual(result["first_name"], "John") + self.assertEqual(result["last_name"], "Doe") + self.assertEqual(result["username"], "johndoe") + + def test_map_saml_attributes_role_keys(self): + """SAML: test attribute mapping - role_keys as list""" + saml_attrs = {"groups": ["Admin", "Users"]} + mapping = {"groups": "role_keys"} + result = map_saml_attributes(saml_attrs, mapping) + self.assertEqual(result["role_keys"], ["Admin", "Users"]) + + def test_map_saml_attributes_fallback_to_nameid(self): + """SAML: test attribute mapping - fallback to NameID""" + result = map_saml_attributes({}, {}, name_id="user@example.com") + self.assertEqual(result["username"], "user@example.com") + self.assertEqual(result["email"], "user@example.com") + + def test_map_saml_attributes_nameid_no_email(self): + """SAML: test attribute mapping - NameID is not an email""" + result = map_saml_attributes({}, {}, name_id="johndoe") + self.assertEqual(result["username"], "johndoe") + self.assertNotIn("email", result) + + def test_map_saml_attributes_empty(self): + """SAML: test attribute mapping - empty attributes""" + result = map_saml_attributes({}, {}) + self.assertEqual(result, {}) + + def test_map_saml_attributes_missing_attr(self): + """SAML: test attribute mapping - missing SAML attribute skipped""" + saml_attrs = {"email_claim": ["user@example.com"]} + mapping = {"email_claim": "email", "name_claim": "username"} + result = map_saml_attributes(saml_attrs, mapping) + self.assertEqual(result["email"], "user@example.com") + self.assertNotIn("username", result) + + +class SAMLRegistrationRoleTestCase(unittest.TestCase): + def setUp(self): + self.app = Flask(__name__) + self.app.jinja_env.undefined = jinja2.StrictUndefined + self.app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( + "SQLALCHEMY_DATABASE_URI", "sqlite:///" + ) + self.app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False + self.app.config["AUTH_TYPE"] = AUTH_SAML + self.app.config["SECRET_KEY"] = "test-secret-key" + self.app.config["WTF_CSRF_ENABLED"] = False + self.app.config["SAML_PROVIDERS"] = [ + { + "name": "test_idp", + "icon": "fa-key", + "idp": { + "entityId": "https://idp.example.com/", + "singleSignOnService": { + "url": "https://idp.example.com/sso", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "singleLogoutService": { + "url": "https://idp.example.com/slo", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "x509cert": "", + }, + "attribute_mapping": { + "email": "email", + "givenname": "first_name", + "surname": "last_name", + "name": "username", + "groups": "role_keys", + }, + } + ] + self.app.config["SAML_CONFIG"] = { + "strict": False, + "debug": True, + "sp": { + "entityId": "http://localhost:5000/saml/metadata/", + "assertionConsumerService": { + "url": "http://localhost:5000/saml/acs/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST", + }, + "singleLogoutService": { + "url": "http://localhost:5000/saml/slo/", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "NameIDFormat": "urn:oasis:names:tc:SAML:1.1:nameid-format:emailAddress", + }, + } + + def tearDown(self): + with self.app.app_context(): + for username in ("alice", "alice_old", "emailuser@example.com"): + user = self.appbuilder.sm.find_user(username) + if user: + self.appbuilder.session.delete(user) + self.appbuilder.session.commit() + self.app = None + self.appbuilder = None + + def assertOnlyDefaultUsers(self): + users = self.appbuilder.sm.get_all_users() + user_names = sorted([user.username for user in users]) + self.assertEqual(user_names, [USERNAME_READONLY, USERNAME_ADMIN]) + + # ---------------- + # Userinfo Objects + # ---------------- + userinfo_alice = { + "username": "alice", + "first_name": "Alice", + "last_name": "Doe", + "email": "alice@example.com", + "role_keys": ["GROUP_1", "GROUP_2"], + } + + # ---------------- + # Unit Tests + # ---------------- + def test__inactive_user(self): + """ + SAML: test login flow for - inactive user + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # register a user + new_user = sm.add_user( + username="alice", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=[], + ) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # set user inactive + new_user.active = False + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was not allowed to log in + self.assertIsNone(user) + + def test__missing_username(self): + """ + SAML: test login flow for - missing credentials + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # create userinfo with missing info + userinfo_missing = self.userinfo_alice.copy() + userinfo_missing["username"] = "" + userinfo_missing.pop("email", None) + + # attempt login + user = sm.auth_user_saml(userinfo_missing) + + # validate - login failure (missing username) + self.assertIsNone(user) + + # validate - no users were created + self.assertOnlyDefaultUsers() + + def test__unregistered(self): + """ + SAML: test login flow for - unregistered user + """ + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # validate - user was given the AUTH_USER_REGISTRATION_ROLE role + self.assertEqual(user.roles, [sm.find_role("Public")]) + + # validate - user was given the correct attributes + self.assertEqual(user.first_name, "Alice") + self.assertEqual(user.last_name, "Doe") + self.assertEqual(user.email, "alice@example.com") + + def test__unregistered__no_self_register(self): + """ + SAML: test login flow for - unregistered user - no self-registration + """ + self.app.config["AUTH_USER_REGISTRATION"] = False + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was not allowed to log in + self.assertIsNone(user) + + # validate - no users were registered + self.assertOnlyDefaultUsers() + + def test__unregistered__single_role(self): + """ + SAML: test login flow for - unregistered user - single role mapping + """ + self.app.config["AUTH_ROLES_MAPPING"] = { + "GROUP_1": ["Admin"], + "GROUP_2": ["User"], + } + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # validate - user was given the correct roles + self.assertIn(sm.find_role("Admin"), user.roles) + self.assertIn(sm.find_role("User"), user.roles) + self.assertIn(sm.find_role("Public"), user.roles) + + # validate - user was given the correct attributes + self.assertEqual(user.first_name, "Alice") + self.assertEqual(user.last_name, "Doe") + self.assertEqual(user.email, "alice@example.com") + + def test__unregistered__multi_role(self): + """ + SAML: test login flow for - unregistered user - multi role mapping + """ + self.app.config["AUTH_ROLES_MAPPING"] = {"GROUP_1": ["Admin", "User"]} + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # validate - user was given the correct roles + self.assertIn(sm.find_role("Admin"), user.roles) + self.assertIn(sm.find_role("Public"), user.roles) + self.assertIn(sm.find_role("User"), user.roles) + + # validate - user was given the correct attributes + self.assertEqual(user.first_name, "Alice") + self.assertEqual(user.last_name, "Doe") + self.assertEqual(user.email, "alice@example.com") + + def test__unregistered__jmespath_role(self): + """ + SAML: test login flow for - unregistered user - jmespath registration role + """ + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config[ + "AUTH_USER_REGISTRATION_ROLE_JMESPATH" + ] = "contains(['alice'], username) && 'User' || 'Public'" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # validate - user was given the correct roles + self.assertListEqual(user.roles, [sm.find_role("User")]) + + # validate - user was given the correct attributes + self.assertEqual(user.first_name, "Alice") + self.assertEqual(user.last_name, "Doe") + self.assertEqual(user.email, "alice@example.com") + + def test__registered__multi_role__no_role_sync(self): + """ + SAML: test login flow for - registered user - multi role mapping - no login role-sync + """ # noqa + self.app.config["AUTH_ROLES_MAPPING"] = {"GROUP_1": ["Admin", "User"]} + self.app.config["AUTH_ROLES_SYNC_AT_LOGIN"] = False + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # register a user + new_user = sm.add_user( # noqa + username="alice", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=[], + ) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was given no roles (sync is off) + self.assertListEqual(user.roles, []) + + def test__registered__multi_role__with_role_sync(self): + """ + SAML: test login flow for - registered user - multi role mapping - with login role-sync + """ # noqa + self.app.config["AUTH_ROLES_MAPPING"] = {"GROUP_1": ["Admin", "User"]} + self.app.config["AUTH_ROLES_SYNC_AT_LOGIN"] = True + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # register a user + new_user = sm.add_user( # noqa + username="alice", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=[], + ) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was given the correct roles + self.assertSetEqual( + set(user.roles), {sm.find_role("Admin"), sm.find_role("User")} + ) + + def test__registered__jmespath_role__no_role_sync(self): + """ + SAML: test login flow for - registered user - jmespath role - no login role-sync + """ # noqa + self.app.config["AUTH_ROLES_SYNC_AT_LOGIN"] = False + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config[ + "AUTH_USER_REGISTRATION_ROLE_JMESPATH" + ] = "contains(['alice'], username) && 'User' || 'Public'" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # register a user + new_user = sm.add_user( # noqa + username="alice", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=[], + ) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was given no roles (sync is off) + self.assertListEqual(user.roles, []) + + def test__registered__jmespath_role__with_role_sync(self): + """ + SAML: test login flow for - registered user - jmespath role - with login role-sync + """ # noqa + self.app.config["AUTH_ROLES_SYNC_AT_LOGIN"] = True + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config[ + "AUTH_USER_REGISTRATION_ROLE_JMESPATH" + ] = "contains(['alice'], username) && 'User' || 'Public'" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # add User role + sm.add_role("User") + + # validate - only default users exist + self.assertOnlyDefaultUsers() + + # register a user + new_user = sm.add_user( # noqa + username="alice", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=[], + ) + + # validate - user was registered + self.assertEqual(len(sm.get_all_users()), 3) + + # attempt login + user = sm.auth_user_saml(self.userinfo_alice) + + # validate - user was allowed to log in + self.assertIsInstance(user, sm.user_model) + + # validate - user was given the correct roles + self.assertListEqual(user.roles, [sm.find_role("User")]) + + def test__find_user_by_email(self): + """ + SAML: test login flow for - existing user found by email when username differs + """ + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # register a user with a different username + sm.add_user( + username="alice_old", + first_name="Alice", + last_name="Doe", + email="alice@example.com", + role=sm.find_role("Public"), + ) + + # attempt login with different username but same email + userinfo = { + "username": "alice_new", + "first_name": "Alice", + "last_name": "Doe", + "email": "alice@example.com", + } + user = sm.auth_user_saml(userinfo) + + # validate - found existing user by email (not created a new one) + self.assertIsNotNone(user) + self.assertEqual(user.username, "alice_old") + + # validate - no extra user was created + self.assertEqual(len(sm.get_all_users()), 3) + + def test__user_info_updated_on_login(self): + """ + SAML: test login flow for - user info updated from SAML assertion + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # register a user with old info + sm.add_user( + username="alice", + first_name="OldFirst", + last_name="OldLast", + email="old@example.com", + role=sm.find_role("Public"), + ) + + # attempt login with updated info + userinfo = { + "username": "alice", + "first_name": "Alice", + "last_name": "NewLast", + "email": "alice@example.com", + } + user = sm.auth_user_saml(userinfo) + + # validate - user info was updated + self.assertIsNotNone(user) + self.assertEqual(user.first_name, "Alice") + self.assertEqual(user.last_name, "NewLast") + self.assertEqual(user.email, "alice@example.com") + + def test__email_as_username(self): + """ + SAML: test login flow for - email used as username when username missing + """ + self.app.config["AUTH_USER_REGISTRATION"] = True + self.app.config["AUTH_USER_REGISTRATION_ROLE"] = "Public" + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + create_default_users(self.appbuilder.session) + + # attempt login with only email (no username) + userinfo = { + "email": "emailuser@example.com", + "first_name": "Email", + "last_name": "User", + } + user = sm.auth_user_saml(userinfo) + + # validate - user was created with email as username + self.assertIsNotNone(user) + self.assertEqual(user.username, "emailuser@example.com") + + def test__no_username_no_email(self): + """ + SAML: test login flow for - no username and no email + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + + # attempt login with empty userinfo + user = sm.auth_user_saml({}) + + # validate - login failure + self.assertIsNone(user) + + def test__saml_config_properties(self): + """ + SAML: test saml_providers and saml_config properties + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + sm = self.appbuilder.sm + + self.assertEqual(len(sm.saml_providers), 1) + self.assertEqual(sm.saml_providers[0]["name"], "test_idp") + self.assertIn("sp", sm.saml_config) + + def test__login_page_single_provider_redirects(self): + """ + SAML: test login page - single provider redirects + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + + client = self.app.test_client() + response = client.get("/login/") + # With single provider, should redirect to /login/test_idp + self.assertIn(response.status_code, [301, 302]) + + def test__login_page_multiple_providers(self): + """ + SAML: test login page - multiple providers shows selection + """ + self.app.config["SAML_PROVIDERS"].append( + { + "name": "second_idp", + "icon": "fa-key", + "idp": { + "entityId": "https://idp2.example.com/", + "singleSignOnService": { + "url": "https://idp2.example.com/sso", + "binding": "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect", + }, + "x509cert": "", + }, + "attribute_mapping": {}, + } + ) + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + + client = self.app.test_client() + response = client.get("/login/") + self.assertEqual(response.status_code, 200) + self.assertIn(b"test_idp", response.data) + self.assertIn(b"second_idp", response.data) + + def test__acs_without_saml_response(self): + """ + SAML: test ACS endpoint - no SAML response redirects to login + """ + with self.app.app_context(): + SQLA = get_sqla_class() + db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, db.session) + + client = self.app.test_client() + response = client.post("/saml/acs/") + # Should redirect to login with error (no valid SAML response) + self.assertIn(response.status_code, [301, 302]) + + +if __name__ == "__main__": + unittest.main()