diff --git a/changes/api/941.changed.md b/changes/api/941.changed.md new file mode 100644 index 000000000..887a7499a --- /dev/null +++ b/changes/api/941.changed.md @@ -0,0 +1 @@ +Reintroduced armasec domain config diff --git a/jobbergate-api/jobbergate_api/config.py b/jobbergate-api/jobbergate_api/config.py index 8aca0c1a8..bfbcc4894 100644 --- a/jobbergate-api/jobbergate_api/config.py +++ b/jobbergate-api/jobbergate_api/config.py @@ -80,6 +80,12 @@ class Settings(BaseSettings): ARMASEC_DOMAIN: str ARMASEC_USE_HTTPS: bool = Field(True) ARMASEC_DEBUG: bool = Field(False) + ARMASEC_ADMIN_DOMAIN: Optional[str] = None + ARMASEC_ADMIN_MATCH_KEY: Optional[str] = None + ARMASEC_ADMIN_MATCH_VALUE: Optional[str] = None + + # Key to custom claims packaged with Auth0 tokens + IDENTITY_CLAIMS_KEY: str = "https://omnivector.solutions" # Sentry configuration SENTRY_DSN: Optional[HttpUrl] = None diff --git a/jobbergate-api/jobbergate_api/security.py b/jobbergate-api/jobbergate_api/security.py index e8d2fc2c2..d242a4db7 100644 --- a/jobbergate-api/jobbergate_api/security.py +++ b/jobbergate-api/jobbergate_api/security.py @@ -7,6 +7,7 @@ from typing import Annotated from armasec import Armasec, TokenPayload +from armasec.schemas import DomainConfig from armasec.token_security import PermissionMode from buzz import check_expressions from fastapi import Depends, HTTPException, status @@ -16,11 +17,46 @@ from jobbergate_api.config import settings +def get_domain_configs() -> list[DomainConfig]: + """ + Return a list of DomainConfig objects based on the input variables for the Settings class. + """ + # make type checkers happy + assert settings.ARMASEC_DOMAIN is not None + + domain_configs = [ + DomainConfig( + domain=settings.ARMASEC_DOMAIN, + use_https=settings.ARMASEC_USE_HTTPS, + ignore_audience=True, + ) + ] + if all( + [ + settings.ARMASEC_ADMIN_DOMAIN, + settings.ARMASEC_ADMIN_MATCH_KEY, + settings.ARMASEC_ADMIN_MATCH_VALUE, + ] + ): + # make type checkers happy + assert settings.ARMASEC_ADMIN_DOMAIN is not None + assert settings.ARMASEC_ADMIN_MATCH_KEY is not None + assert settings.ARMASEC_ADMIN_MATCH_VALUE is not None + + domain_configs.append( + DomainConfig( + domain=settings.ARMASEC_ADMIN_DOMAIN, + use_https=settings.ARMASEC_USE_HTTPS, + match_keys={settings.ARMASEC_ADMIN_MATCH_KEY: settings.ARMASEC_ADMIN_MATCH_VALUE}, + ignore_audience=True, + ) + ) + return domain_configs + + guard = Armasec( - domain=settings.ARMASEC_DOMAIN, + domain_configs=get_domain_configs(), debug_logger=logger.debug if settings.ARMASEC_DEBUG else None, - use_https=settings.ARMASEC_USE_HTTPS, - ignore_audience=True, ) @@ -95,7 +131,9 @@ def lockdown_with_identity( """ def dependency( - token_payload: Annotated[TokenPayload, Depends(guard.lockdown(*scopes, permission_mode=permission_mode))], + token_payload: Annotated[ + TokenPayload, Depends(guard.lockdown(*scopes, permission_mode=permission_mode)) + ], ) -> IdentityPayload: """ Provide an injectable function to lockdown a route and extract the identity payload. diff --git a/jobbergate-api/tests/test_security.py b/jobbergate-api/tests/test_security.py index 3c1d8f469..4a9bbe41d 100644 --- a/jobbergate-api/tests/test_security.py +++ b/jobbergate-api/tests/test_security.py @@ -2,16 +2,59 @@ Test the security module. """ +from unittest.mock import patch + import pytest from jobbergate_api.security import ( HTTPException, IdentityPayload, TokenPayload, + get_domain_configs, lockdown_with_identity, + settings, ) +def test_get_domain_configs__loads_only_base_settings(): + """Check if the correct domain configuration is loaded when only one domain is provided.""" + with ( + patch.object(settings, "ARMASEC_DOMAIN", new="foo.io"), + ): + domain_configs = get_domain_configs() + + assert len(domain_configs) == 1 + first_config = domain_configs.pop() + assert first_config.domain == "foo.io" + + +def test_get_domain_configs__loads_admin_settings_if_all_are_present(): + """Check if the correct domain configuration is loaded when two domains are provided.""" + with ( + patch.object(settings, "ARMASEC_DOMAIN", new="foo.io"), + patch.object(settings, "ARMASEC_ADMIN_DOMAIN", new="admin.io"), + ): + domain_configs = get_domain_configs() + + assert len(domain_configs) == 1 + first_config = domain_configs.pop() + assert first_config.domain == "foo.io" + + with ( + patch.object(settings, "ARMASEC_DOMAIN", new="foo.io"), + patch.object(settings, "ARMASEC_ADMIN_DOMAIN", new="admin.io"), + patch.object(settings, "ARMASEC_ADMIN_MATCH_KEY", new="foo"), + patch.object(settings, "ARMASEC_ADMIN_MATCH_VALUE", new="bar"), + ): + domain_configs = get_domain_configs() + + assert len(domain_configs) == 2 + (first_config, second_config) = domain_configs + assert first_config.domain == "foo.io" + assert second_config.domain == "admin.io" + assert second_config.match_keys == {"foo": "bar"} + + def test_lockdown_with_identity__success(): """Check if the lockdown_with_identity decorator returns the correct identity.""" token_raw_data = {"sub": "dummy-sub"}