diff --git a/docs/configuration.mdx b/docs/configuration.mdx index b3eacf702..b756aa0be 100644 --- a/docs/configuration.mdx +++ b/docs/configuration.mdx @@ -136,9 +136,8 @@ authorization: issuer_url: https://auth.example.com resource_server_url: https://agent.example.com/mcp required_scopes: ["mcp.read", "mcp.write"] - introspection_endpoint: https://auth.example.com/oauth/introspect - introspection_client_id: ${INTROSPECTION_CLIENT_ID} - introspection_client_secret: ${INTROSPECTION_CLIENT_SECRET} + client_id: ${INTROSPECTION_CLIENT_ID} + client_secret: ${INTROSPECTION_CLIENT_SECRET} oauth: callback_base_url: https://agent.example.com diff --git a/examples/oauth/protected_by_oauth/README.md b/examples/oauth/protected_by_oauth/README.md new file mode 100644 index 000000000..a301814da --- /dev/null +++ b/examples/oauth/protected_by_oauth/README.md @@ -0,0 +1,79 @@ +# OAuth protected resource example + +This example shows how to integrate OAuth2 authentication to protect your MCP. + +## 1. App set up + +First, clone the repo and navigate to the functions example: + +```bash +git clone https://github.com/lastmile-ai/mcp-agent.git +cd mcp-agent/examples/oauth/protected_by_oauth +``` + +Install `uv` (if you don’t have it): + +```bash +pip install uv +``` + +Sync `mcp-agent` project dependencies: + +```bash +uv sync +``` + +## 2. Client registration +To protect your MCP with OAuth2, you first need to register your application with an OAuth2 provider, as MCP follows the Dynamic Client Registration Protocol. +If you do not have a client registered already, you can use the `registration.py` script provided with this example. +At the top of the file, +1. update the URL for your authentication server, +2. set the redirect URIs to point to your MCP endpoint (e.g. `https://your-mcp-endpoint.com/callback`), and +3. set the name for your client. + +Run the script to register your client: +```bash +uv run registration.py +``` + +You should see something like + +``` +Client registered successfully! +{ + # detailed json response +} + +=== Save these credentials === +Client ID: abc-123 +Client Secret: xyz-987 +``` + +Take a note of the client id and client secret printed at the end, as you will need them in the next step. + +## 3. Configure your MCP +Next, you need to configure your MCP to use the OAuth2 credentials you just created. +In `main.py`, update these settings: + +```python +auth_server = "" +resource_server = "http://localhost:8000" # This server's URL + +client_id = "" +client_secret = "" +``` + +## 4. Run the example + +With these in place, you can run the server using + +```python +uv run main.py +``` + +This will start an MCP server protected by OAuth2. +You can test it using an MCP client that supports OAuth2 authentication, such as [MCP Inspector](https://modelcontextprotocol.io/docs/tools/inspector). + + +## Further reading +More details on oauth authorization and the MCP protocal can be found at [https://modelcontextprotocol.io/specification/draft/basic/authorization](https://modelcontextprotocol.io/specification/draft/basic/authorization). diff --git a/examples/oauth/protected_by_oauth/main.py b/examples/oauth/protected_by_oauth/main.py new file mode 100644 index 000000000..d6db57796 --- /dev/null +++ b/examples/oauth/protected_by_oauth/main.py @@ -0,0 +1,88 @@ +""" +Demonstration of an MCP agent server configured with OAuth. +""" + +import asyncio +from typing import Optional +from pydantic import AnyHttpUrl + +from mcp_agent.core.context import Context as AppContext + +from mcp_agent.app import MCPApp +from mcp_agent.server.app_server import create_mcp_server_for_app +from mcp_agent.config import Settings, LoggerSettings, \ + OAuthTokenStoreSettings, OAuthSettings, MCPAuthorizationServerSettings + + +auth_server = "" +resource_server = "http://localhost:8000" # This server's URL + +client_id = "" +client_secret = "" + +settings = Settings( + execution_engine="asyncio", + logger=LoggerSettings(level="info"), + authorization=MCPAuthorizationServerSettings( + enabled=True, + issuer_url=AnyHttpUrl(auth_server), + resource_server_url=AnyHttpUrl(resource_server), + client_id=client_id, + client_secret=client_secret, + required_scopes=["mcp"], + expected_audiences=[client_id], + ), + oauth=OAuthSettings( + callback_base_url=AnyHttpUrl(resource_server), + flow_timeout_seconds=300, + token_store=OAuthTokenStoreSettings(refresh_leeway_seconds=60), + ) + ) + + +# Define the MCPApp instance. The server created for this app will advertise the +# MCP logging capability and forward structured logs upstream to connected clients. +app = MCPApp( + name="oauth_demo", + description="Basic agent server example", + settings=settings, +) + + +@app.tool(name="hello_world") +async def hello(app_ctx: Optional[AppContext] = None) -> str: + # Use the context's app if available for proper logging with upstream_session + _app = app_ctx.app if app_ctx else app + # Ensure the app's logger is bound to the current context with upstream_session + if _app._logger and hasattr(_app._logger, "_bound_context"): + _app._logger._bound_context = app_ctx + + if app_ctx.current_user: + user = app_ctx.current_user + if user.claims and "username" in user.claims: + return f"Hello, {user.claims['username']}!" + else: + return f"Hello, user with ID {user.subject}!" + else: + return "Hello, anonymous user!" + +async def main(): + async with app.run() as agent_app: + # Log registered workflows and agent configurations + agent_app.logger.info(f"Creating MCP server for {agent_app.name}") + + agent_app.logger.info("Registered workflows:") + for workflow_id in agent_app.workflows: + agent_app.logger.info(f" - {workflow_id}") + + # Create the MCP server that exposes both workflows and agent configurations, + # optionally using custom FastMCP settings + mcp_server = create_mcp_server_for_app(agent_app) + agent_app.logger.info(f"MCP Server settings: {mcp_server.settings}") + + # Run the server + await mcp_server.run_sse_async() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/oauth/protected_by_oauth/registration.py b/examples/oauth/protected_by_oauth/registration.py new file mode 100644 index 000000000..6b9304476 --- /dev/null +++ b/examples/oauth/protected_by_oauth/registration.py @@ -0,0 +1,59 @@ +import requests +import json + +# Authorization server URL. +auth_server_url = "" +redirect_uris = [ + # These are the redirect URIs for MCP Inspector. Replace with your app's URIs. + "http://localhost:6274/oauth/callback", + "http://localhost:6274/oauth/callback/debug" +] +client_name = "My Python Application" + +# Fetch the registration endpoint dynamically from the .well-known/oauth-authorization-server details +well_known_url = f"{auth_server_url}/.well-known/oauth-authorization-server" +response = requests.get(well_known_url) + +if response.status_code == 200: + well_known_details = response.json() + registration_endpoint = well_known_details.get("registration_endpoint") + if not registration_endpoint: + raise ValueError("Registration endpoint not found in .well-known details") +else: + raise ValueError(f"Failed to fetch .well-known details: {response.status_code}") + + +# Client registration request +registration_request = { + "client_name": client_name, + "redirect_uris": redirect_uris, + "grant_types": [ + "authorization_code", + "refresh_token" + ], + "scope": "mcp", + # use client_secret_basic when testing with MCP Inspector + "token_endpoint_auth_method": "client_secret_basic" +} + +print(f"Registering client at: {registration_endpoint}") + +# Register the client +response = requests.post( + registration_endpoint, + json=registration_request, + headers={"Content-Type": "application/json"} +) + +if response.status_code in [200, 201]: + client_info = response.json() + print("Client registered successfully!") + print(json.dumps(client_info, indent=2)) + + # Save credentials for later use + print("\n=== Save these credentials ===") + print(f"Client ID: {client_info['client_id']}") + print(f"Client Secret: {client_info['client_secret']}") +else: + print(f"Registration failed with status {response.status_code}") + print(response.text) \ No newline at end of file diff --git a/src/mcp_agent/config.py b/src/mcp_agent/config.py index 72ef61118..4e608e518 100644 --- a/src/mcp_agent/config.py +++ b/src/mcp_agent/config.py @@ -44,9 +44,8 @@ class MCPAuthorizationServerSettings(BaseModel): service_documentation_url: AnyHttpUrl | None = None required_scopes: List[str] = Field(default_factory=list) jwks_uri: AnyHttpUrl | None = None - introspection_endpoint: AnyHttpUrl | None = None - introspection_client_id: str | None = None - introspection_client_secret: str | None = None + client_id: str | None = None + client_secret: str | None = None token_cache_ttl_seconds: int = Field(300, ge=0) # RFC 9068 audience validation settings diff --git a/src/mcp_agent/oauth/metadata.py b/src/mcp_agent/oauth/metadata.py index c4986b011..87148586d 100644 --- a/src/mcp_agent/oauth/metadata.py +++ b/src/mcp_agent/oauth/metadata.py @@ -31,6 +31,33 @@ async def fetch_authorization_server_metadata( return OAuthMetadata.model_validate(response.json()) +async def fetch_authorization_server_metadata_from_issuer( + client: httpx.AsyncClient, + issuer_url: str, +) -> OAuthMetadata: + """Fetch OAuth authorization server metadata from the well-known endpoint. + + Given an issuer URL, constructs the well-known OAuth authorization server + metadata URL and fetches the metadata. + + Args: + client: HTTP client to use for the request + issuer_url: The issuer URL (e.g., "https://auth.example.com") + + Returns: + OAuthMetadata containing authorization server metadata including introspection_endpoint + """ + from httpx import URL + + parsed_url = URL(issuer_url) + metadata_url = str( + parsed_url.copy_with( + path="/.well-known/oauth-authorization-server" + parsed_url.path + ) + ) + return await fetch_authorization_server_metadata(client, metadata_url) + + def select_authorization_server( metadata: ProtectedResourceMetadata, preferred: str | None = None, diff --git a/src/mcp_agent/server/app_server.py b/src/mcp_agent/server/app_server.py index 3bbf6e59a..20abf5c66 100644 --- a/src/mcp_agent/server/app_server.py +++ b/src/mcp_agent/server/app_server.py @@ -1198,6 +1198,7 @@ async def _internal_human_prompts(request: Request): if "token_verifier" not in kwargs and token_verifier is not None: kwargs["token_verifier"] = token_verifier owns_token_verifier = True + mcp = FastMCP( name=app.name or "mcp_agent_server", # TODO: saqadri (MAC) - create a much more detailed description diff --git a/src/mcp_agent/server/token_verifier.py b/src/mcp_agent/server/token_verifier.py index 799341585..006899019 100644 --- a/src/mcp_agent/server/token_verifier.py +++ b/src/mcp_agent/server/token_verifier.py @@ -7,6 +7,7 @@ from typing import Any, Dict, List import httpx +from httpx import URL from mcp.server.auth.provider import AccessToken from mcp.server.auth.provider import TokenVerifier @@ -22,15 +23,69 @@ class MCPAgentTokenVerifier(TokenVerifier): """Verify bearer tokens issued by the MCP Agent Cloud authorization server.""" def __init__(self, settings: MCPAuthorizationServerSettings): - if not settings.introspection_endpoint: - raise ValueError( - "introspection_endpoint must be configured to verify tokens" - ) self._settings = settings timeout = httpx.Timeout(10.0) self._client = httpx.AsyncClient(timeout=timeout) self._cache: Dict[str, MCPAccessToken] = {} self._lock = asyncio.Lock() + self._introspection_endpoint: str | None = None + self._metadata_fetch_lock = asyncio.Lock() + + async def _ensure_introspection_endpoint(self) -> str: + """Ensure introspection endpoint is available, fetching from well-known if needed.""" + # Check if already fetched + if self._introspection_endpoint: + return self._introspection_endpoint + + # Fetch from well-known endpoint + async with self._metadata_fetch_lock: + # Double-check after acquiring lock + if self._introspection_endpoint: + return self._introspection_endpoint + + if not self._settings.issuer_url: + raise ValueError( + "issuer_url must be configured to fetch introspection endpoint" + ) + + try: + from mcp_agent.oauth.metadata import ( + fetch_authorization_server_metadata, + ) + + parsed_url = URL(str(self._settings.issuer_url)) + metadata_url = str(parsed_url.copy_with(path="/.well-known/oauth-authorization-server" \ + + parsed_url.path)) + + if metadata_url.endswith('/'): + metadata_url = metadata_url[:-1] + + metadata = await fetch_authorization_server_metadata( + self._client, str(metadata_url) + ) + + if not metadata.introspection_endpoint: + raise ValueError( + f"Authorization server at {self._settings.issuer_url} does not " + "advertise an introspection endpoint in its metadata" + ) + + self._introspection_endpoint = str(metadata.introspection_endpoint) + logger.info( + "Fetched introspection endpoint from authorization server metadata", + data={"introspection_endpoint": self._introspection_endpoint}, + ) + return self._introspection_endpoint + + except Exception as exc: + logger.error( + "Failed to fetch authorization server metadata", + data={"issuer_url": str(self._settings.issuer_url)}, + exc_info=True, + ) + raise ValueError( + f"Failed to fetch introspection endpoint from {self._settings.issuer_url}: {exc}" + ) from exc async def verify_token(self, token: str) -> AccessToken | None: # type: ignore[override] cached = self._cache.get(token) @@ -48,24 +103,34 @@ async def verify_token(self, token: str) -> AccessToken | None: # type: ignore[ self._cache[token] = verified else: self._cache.pop(token, None) + return verified async def _introspect(self, token: str) -> MCPAccessToken | None: + # Ensure we have the introspection endpoint + try: + introspection_endpoint = await self._ensure_introspection_endpoint() + except ValueError as exc: + logger.error(f"Cannot introspect token: {exc}") + return None + data = {"token": token} + auth = None if ( - self._settings.introspection_client_id - and self._settings.introspection_client_secret + self._settings.client_id + and self._settings.client_secret ): auth = httpx.BasicAuth( - self._settings.introspection_client_id, - self._settings.introspection_client_secret, + self._settings.client_id, + self._settings.client_secret, ) try: response = await self._client.post( - str(self._settings.introspection_endpoint), + introspection_endpoint, data=data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, auth=auth, ) except httpx.HTTPError as exc: diff --git a/tests/test_audience_validation.py b/tests/test_audience_validation.py index 392cf4e18..276a05631 100644 --- a/tests/test_audience_validation.py +++ b/tests/test_audience_validation.py @@ -15,7 +15,6 @@ async def test_audience_validation_success(): enabled=True, issuer_url="https://auth.example.com", resource_server_url="https://api.example.com", - introspection_endpoint="https://auth.example.com/introspect", expected_audiences=["https://api.example.com", "api.example.com"], ) @@ -39,7 +38,6 @@ async def test_audience_validation_failure(): enabled=True, issuer_url="https://auth.example.com", resource_server_url="https://api.example.com", - introspection_endpoint="https://auth.example.com/introspect", expected_audiences=["https://api.example.com"], ) @@ -62,7 +60,6 @@ async def test_resource_claim_audience_validation(): enabled=True, issuer_url="https://auth.example.com", resource_server_url="https://api.example.com", - introspection_endpoint="https://auth.example.com/introspect", expected_audiences=["https://api.example.com"], ) @@ -152,9 +149,8 @@ async def test_token_verifier_audience_validation_integration(): enabled=True, issuer_url="https://auth.example.com", resource_server_url="https://api.example.com", - introspection_endpoint="https://auth.example.com/introspect", - introspection_client_id="test-client", - introspection_client_secret="test-secret", + client_id="test-client", + client_secret="test-secret", expected_audiences=["https://api.example.com"], ) @@ -163,6 +159,17 @@ async def test_token_verifier_audience_validation_integration(): # Mock HTTP client mock_client = Mock(spec=httpx.AsyncClient) + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + # Mock successful response with valid audience valid_response = Mock() valid_response.status_code = 200 @@ -173,6 +180,8 @@ async def test_token_verifier_audience_validation_integration(): "exp": 1234567890, "iss": "https://auth.example.com/", } + + mock_client.get = AsyncMock(return_value=metadata_response) mock_client.post = AsyncMock(return_value=valid_response) verifier._client = mock_client @@ -231,9 +240,7 @@ async def test_partial_audience_match(): enabled=True, issuer_url="https://auth.example.com", resource_server_url="https://api.example.com", - introspection_endpoint="https://auth.example.com/introspect", expected_audiences=["https://api.example.com", "https://other-api.com"], - strict_audience_validation=True, ) # Token has one matching and one non-matching audience diff --git a/tests/test_token_verifier.py b/tests/test_token_verifier.py new file mode 100644 index 000000000..5a21e6700 --- /dev/null +++ b/tests/test_token_verifier.py @@ -0,0 +1,853 @@ +"""Comprehensive tests for token verification functionality.""" + +import asyncio +import time +import pytest +from unittest.mock import Mock, AsyncMock +import httpx +from mcp_agent.config import MCPAuthorizationServerSettings +from mcp_agent.server.token_verifier import MCPAgentTokenVerifier + + +@pytest.mark.asyncio +async def test_fetch_introspection_endpoint_from_well_known(): + """Test fetching introspection endpoint from .well-known metadata.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to return metadata + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/oauth2/introspect", + "response_types_supported": ["code"], + } + + verifier._client.get = AsyncMock(return_value=mock_response) + + endpoint = await verifier._ensure_introspection_endpoint() + + assert endpoint == "https://auth.example.com/oauth2/introspect" + assert verifier._introspection_endpoint == "https://auth.example.com/oauth2/introspect" + + # Verify it's cached - call again and it should return cached value + endpoint2 = await verifier._ensure_introspection_endpoint() + assert endpoint2 == endpoint + + # Verify only one HTTP call was made (cached on second call) + assert verifier._client.get.call_count == 1 + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_fetch_introspection_endpoint_with_path(): + """Test fetching introspection endpoint when issuer has a path component.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com/tenants/abc", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to return metadata + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "issuer": "https://auth.example.com/tenants/abc", + "authorization_endpoint": "https://auth.example.com/tenants/abc/authorize", + "token_endpoint": "https://auth.example.com/tenants/abc/token", + "introspection_endpoint": "https://auth.example.com/tenants/abc/introspect", + "response_types_supported": ["code"], + } + + verifier._client.get = AsyncMock(return_value=mock_response) + + endpoint = await verifier._ensure_introspection_endpoint() + + assert endpoint == "https://auth.example.com/tenants/abc/introspect" + + # Verify the well-known URL was constructed correctly + call_args = verifier._client.get.call_args[0] + assert "/.well-known/oauth-authorization-server/tenants/abc" in call_args[0] + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_missing_issuer_url(): + """Test that authorization requires issuer_url to be configured.""" + # When authorization is enabled, issuer_url is required by validation + # This test verifies that the config validation works correctly + with pytest.raises(ValueError, match="issuer_url.*must be set"): + MCPAuthorizationServerSettings( + enabled=True, + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + +@pytest.mark.asyncio +async def test_well_known_endpoint_missing_introspection(): + """Test error when well-known metadata doesn't include introspection_endpoint.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to return metadata without introspection_endpoint + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "response_types_supported": ["code"], + # Missing introspection_endpoint + } + + verifier._client.get = AsyncMock(return_value=mock_response) + + with pytest.raises(ValueError, match="does not advertise an introspection endpoint"): + await verifier._ensure_introspection_endpoint() + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_well_known_endpoint_http_error(): + """Test error handling when fetching well-known metadata fails.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to raise an error + verifier._client.get = AsyncMock(side_effect=httpx.HTTPError("Connection failed")) + + with pytest.raises(ValueError, match="Failed to fetch introspection endpoint"): + await verifier._ensure_introspection_endpoint() + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_well_known_endpoint_404_error(): + """Test error handling when well-known endpoint returns 404.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to raise 404 + verifier._client.get = AsyncMock( + side_effect=httpx.HTTPStatusError( + "Not Found", + request=Mock(), + response=Mock(status_code=404) + ) + ) + + with pytest.raises(ValueError, match="Failed to fetch introspection endpoint"): + await verifier._ensure_introspection_endpoint() + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_without_client_auth(): + """Test token introspection without client authentication.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock successful introspection response + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is not None + assert token.subject == "user123" + + # Verify no auth was used + call_kwargs = verifier._client.post.call_args[1] + assert call_kwargs.get("auth") is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_with_client_auth(): + """Test token introspection with client authentication.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + client_id="client123", + client_secret="secret456", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock successful introspection response + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is not None + + # Verify auth was used + call_kwargs = verifier._client.post.call_args[1] + auth = call_kwargs.get("auth") + assert auth is not None + assert isinstance(auth, httpx.BasicAuth) + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_http_error(): + """Test handling of HTTP errors during introspection.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(side_effect=httpx.HTTPError("Network error")) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_non_200_response(): + """Test handling of non-200 responses from introspection endpoint.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock 401 response + introspect_response = Mock() + introspect_response.status_code = 401 + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_invalid_json(): + """Test handling of invalid JSON response from introspection endpoint.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock response with invalid JSON + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.side_effect = ValueError("Invalid JSON") + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_inactive_token(): + """Test handling of inactive token.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock inactive token response + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": False, + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_issuer_mismatch(): + """Test handling of issuer mismatch.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock response with wrong issuer + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://malicious.example.com", # Wrong issuer + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_missing_required_scopes(): + """Test handling of missing required scopes.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + required_scopes=["read", "write"], + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock response with insufficient scopes + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "scope": "read", # Missing 'write' scope + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is None + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_introspect_with_ttl_limit(): + """Test token cache TTL limiting.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + token_cache_ttl_seconds=60, + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock response with long expiration + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, # Far in the future + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + assert token is not None + # The expires_at should be capped by TTL + max_expected_expiry = time.time() + 60 + 5 # TTL + small buffer + assert token.expires_at <= max_expected_expiry + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_verify_token_caching(): + """Test that verify_token properly caches tokens.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock successful introspection response + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + # First call should hit the introspection endpoint + token1 = await verifier.verify_token("test_token") + assert token1 is not None + assert verifier._client.post.call_count == 1 + + # Second call should use cache + token2 = await verifier.verify_token("test_token") + assert token2 is not None + assert token2 is token1 # Same object from cache + assert verifier._client.post.call_count == 1 # No additional call + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_verify_token_cache_removal_on_failure(): + """Test that failed verification removes token from cache.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + + # First call: valid token + introspect_response1 = Mock() + introspect_response1.status_code = 200 + introspect_response1.json.return_value = { + "active": True, + "aud": "https://api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://auth.example.com/", + } + + verifier._client.post = AsyncMock(return_value=introspect_response1) + + token1 = await verifier.verify_token("test_token") + assert token1 is not None + + # Second call: token becomes inactive + introspect_response2 = Mock() + introspect_response2.status_code = 200 + introspect_response2.json.return_value = { + "active": False, + } + + verifier._client.post = AsyncMock(return_value=introspect_response2) + + # Clear cache to force re-verification + verifier._cache.clear() + + token2 = await verifier.verify_token("test_token") + assert token2 is None + + # Verify token was removed from cache + assert "test_token" not in verifier._cache + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_context_manager(): + """Test using verifier as async context manager.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + async with MCPAgentTokenVerifier(settings) as verifier: + assert verifier is not None + assert verifier._client is not None + + +@pytest.mark.asyncio +async def test_concurrent_metadata_fetch(): + """Test that concurrent calls to fetch metadata only make one request.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock HTTP client to return metadata + call_count = 0 + + async def mock_get(*args, **kwargs): + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.01) # Simulate network delay + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/oauth2/introspect", + "response_types_supported": ["code"], + } + return mock_response + + verifier._client.get = mock_get + + # Make multiple concurrent calls + results = await asyncio.gather( + verifier._ensure_introspection_endpoint(), + verifier._ensure_introspection_endpoint(), + verifier._ensure_introspection_endpoint(), + ) + + # All should return the same endpoint + assert all(r == "https://auth.example.com/oauth2/introspect" for r in results) + + # But only one HTTP call should have been made (due to locking) + assert call_count == 1 + + await verifier.aclose() + + + + +@pytest.mark.asyncio +async def test_audience_extraction(): + """Test audience extraction from various token payloads.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api.example.com/"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Test string audience + audiences = verifier._extract_audiences({"aud": "https://api.example.com"}) + assert "https://api.example.com" in audiences + + # Test array audience + audiences = verifier._extract_audiences({"aud": ["https://api1.example.com", "https://api2.example.com"]}) + assert "https://api1.example.com" in audiences + assert "https://api2.example.com" in audiences + + # Test resource claim + audiences = verifier._extract_audiences({"resource": "https://api.example.com"}) + assert "https://api.example.com" in audiences + + # Test combined aud and resource + audiences = verifier._extract_audiences({ + "aud": "https://api1.example.com", + "resource": "https://api2.example.com" + }) + assert "https://api1.example.com" in audiences + assert "https://api2.example.com" in audiences + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_audience_validation(): + """Test audience validation logic.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://api.example.com", "https://api2.example.com"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Valid - exact match + assert verifier._validate_audiences(["https://api.example.com"]) is True + + # Valid - one of multiple + assert verifier._validate_audiences(["https://api2.example.com"]) is True + + # Valid - multiple with one match + assert verifier._validate_audiences(["https://api.example.com", "https://other.com"]) is True + + # Invalid - no match + assert verifier._validate_audiences(["https://malicious.example.com"]) is False + + # Invalid - empty + assert verifier._validate_audiences([]) is False + + await verifier.aclose() + + +@pytest.mark.asyncio +async def test_audience_validation_failure_through_introspect(): + """Test audience validation failure during token introspection.""" + settings = MCPAuthorizationServerSettings( + enabled=True, + issuer_url="https://auth.example.com", + resource_server_url="https://api.example.com", + expected_audiences=["https://expected-api.example.com"], + ) + + verifier = MCPAgentTokenVerifier(settings) + + # Mock well-known metadata + metadata_response = Mock() + metadata_response.status_code = 200 + metadata_response.json.return_value = { + "issuer": "https://auth.example.com", + "authorization_endpoint": "https://auth.example.com/authorize", + "token_endpoint": "https://auth.example.com/token", + "introspection_endpoint": "https://auth.example.com/introspect", + "response_types_supported": ["code"], + } + + # Mock introspection response with wrong audience + introspect_response = Mock() + introspect_response.status_code = 200 + introspect_response.json.return_value = { + "active": True, + "aud": "https://wrong-api.example.com", + "sub": "user123", + "exp": 9999999999, + "iss": "https://auth.example.com/", + } + + verifier._client.get = AsyncMock(return_value=metadata_response) + verifier._client.post = AsyncMock(return_value=introspect_response) + + token = await verifier._introspect("test_token") + + # Should return None due to audience mismatch + assert token is None + + await verifier.aclose()