-
Notifications
You must be signed in to change notification settings - Fork 61
Description
Support New MCP Authorization Spec (November 2025)
Summary
Implement support for the updated MCP authorization specification that eliminates Dynamic Client Registration (DCR) in favor of decentralized client identity and enterprise-controlled authorization flows.
Reference: MCP Authorization Spec Update - Aaron Parecki
Glossary of Security Terms
Before diving into the implementation, here are the key security terms used throughout this document:
| Term | Definition | Reference |
|---|---|---|
| DCR (Dynamic Client Registration) | An OAuth 2.0 extension that allows clients to register themselves with an authorization server programmatically, without manual configuration. The new spec removes this requirement. | RFC 7591 |
| CIMD (Client ID Metadata Document) | A JSON document hosted at a URL that describes an OAuth client's properties (name, redirect URIs, logo). Instead of pre-registering, clients provide a URL to this document. | IETF Draft |
| ID-JAG (Identity Assertion Authorization Grant) | A token exchange mechanism where a client presents an ID token from one Identity Provider to obtain an access token from another. Used for enterprise SSO scenarios. | IETF Draft |
| Token Exchange | The process of exchanging one security token for another. For example, exchanging a corporate SSO token for an MCP access token. | RFC 8693 |
| Protected Resource Metadata | A JSON document that tells OAuth clients which authorization servers protect a resource and what scopes are available. | RFC 9728 |
| Authorization Server Metadata | A JSON document describing an OAuth authorization server's capabilities (endpoints, supported grant types, etc.). | RFC 8414 |
| JWKS (JSON Web Key Set) | A JSON document containing public keys used to verify JWT signatures. Authorization servers publish JWKS so clients can verify tokens. | RFC 7517 |
| PKCE (Proof Key for Code Exchange) | A security extension for OAuth that prevents authorization code interception attacks. The client creates a random "code_verifier" and sends a hash ("code_challenge") during authorization. | RFC 7636 |
| IdP (Identity Provider) | A service that authenticates users and issues identity tokens (e.g., Keycloak, Azure AD, Okta, Google). | - |
| M2M (Machine-to-Machine) | Authentication between services/applications without user interaction, typically using client credentials grant. | - |
| 3LO (Three-Legged OAuth) | OAuth flow involving three parties: user, client, and authorization server. The user explicitly authorizes the client. | - |
| 2LO (Two-Legged OAuth) | OAuth flow with only client and authorization server (no user). Used for M2M authentication. | - |
Background: Why This Change?
The previous MCP authorization approach relied on Dynamic Client Registration (DCR). This created problems:
Problems with DCR for MCP Servers
- Unbounded database growth: Every new client that registers creates a database record. With thousands of MCP clients, this becomes unmanageable.
- Rate limiting complexity: Public DCR endpoints must be rate-limited to prevent abuse, but legitimate clients may hit these limits.
- No revocation mechanism: Once a client registers, there's no standard way to revoke or audit those registrations.
- Enterprise security concerns: IT departments cannot control which clients employees register.
Problems with DCR for MCP Clients
- Secret management burden: Each registration creates credentials that must be securely stored and rotated.
- Per-server registration: A client must register separately with every MCP server it wants to access.
- No validation mechanism: Servers cannot verify that a client is legitimate before registration.
New Authorization Mechanisms
The updated spec introduces two complementary mechanisms that solve these problems:
Mechanism 1: Client ID Metadata Documents (CIMD)
What it is: Instead of registering with each server, a client publishes a JSON document at a URL it controls. This URL becomes the client's identity.
How it works:
1. Client wants to authenticate with MCP Server
2. Client sends authorization request:
GET /authorize?client_id=https://myapp.com/oauth/client.json&redirect_uri=...
3. Authorization Server fetches https://myapp.com/oauth/client.json
4. Authorization Server validates:
- The document exists and is valid JSON
- redirect_uri matches one in the document
- (Optional) Client's public key for authentication
5. Authorization proceeds as normal
Example Client Metadata Document:
{
"client_id": "https://myapp.com/oauth/client.json",
"client_name": "My MCP Client Application",
"client_uri": "https://myapp.com",
"logo_uri": "https://myapp.com/logo.png",
"redirect_uris": [
"https://myapp.com/callback",
"http://localhost:8080/callback"
],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
"jwks_uri": "https://myapp.com/.well-known/jwks.json"
}Security properties:
- Trust is based on DNS ownership (if you control the domain, you control the client identity)
- Clients can update their metadata without coordinating with servers
- Public keys can be published for strong client authentication
- Already implemented in VS Code
Mechanism 2: Cross App Access (XAA) / Enterprise SSO
What it is: A flow where enterprise Identity Providers (like Azure AD, Okta) control which MCP clients can access which MCP servers, based on corporate policy.
Why it matters: Without this, employees can connect any MCP client to any MCP server without IT oversight ("shadow IT"). With XAA, the enterprise IdP acts as a gatekeeper.
How it works (ID-JAG flow):
1. User logs into MCP Client using corporate SSO (Azure AD, Okta, etc.)
→ Client receives an ID token from corporate IdP
2. Client wants to access MCP Server
→ Instead of starting a new OAuth flow with MCP Server's auth server...
3. Client sends token exchange request to MCP Auth Server:
POST /token
grant_type=urn:ietf:params:oauth:grant-type:token-exchange
subject_token=<corporate_id_token>
subject_token_type=urn:ietf:params:oauth:token-type:id_token
4. MCP Auth Server:
a. Validates the corporate ID token (checks signature against corporate IdP's JWKS)
b. Verifies the corporate IdP is trusted
c. Maps corporate groups/roles to MCP scopes
d. Issues MCP access token
5. Client uses MCP access token to access MCP Server
Key benefit: No additional user interaction required after initial corporate login. The corporate IdP controls access through policy (e.g., "Engineering team can use Claude to access Jira").
Current Implementation Status
Already Implemented
| Feature | File Location | Description |
|---|---|---|
| Keycloak OAuth2/OIDC | auth_server/providers/keycloak.py |
Full Keycloak integration with JWT validation |
| Amazon Cognito OAuth2 | auth_server/providers/cognito.py |
Cognito user pools and M2M |
| Microsoft Entra ID OAuth2 | auth_server/providers/entra.py |
Azure AD integration |
| M2M Authentication | get_m2m_token() in providers |
Client credentials grant for service-to-service |
| Authorization Code Flow | exchange_code_for_token() |
Standard 3LO user authentication |
| Group-to-Scope Mapping | map_groups_to_scopes() in server.py |
Maps IdP groups to MCP scopes via scopes.yml |
| Fine-Grained Access Control | validate_server_tool_access() |
Per-server, per-tool permission checks |
| Session Cookie Auth | validate_session_cookie() |
Browser session management |
| Self-Signed Tokens | /internal/tokens endpoint |
Generate tokens for authenticated users |
Not Yet Implemented
| Feature | Priority | Implementation Location |
|---|---|---|
| Protected Resource Metadata | High | Auth Server - new endpoint |
| Client ID Metadata Documents | High | Auth Server - new module + endpoint changes |
| Authorization Server Metadata | Medium | Auth Server - new endpoint |
| Token Exchange (ID-JAG) | Medium | Auth Server - new token endpoint |
| Enterprise Policy Integration | Low | Future - depends on corporate IdP capabilities |
Implementation Guide
Task 1: Protected Resource Metadata Endpoint
RFC Reference: RFC 9728 - OAuth 2.0 Protected Resource Metadata
What to build: An endpoint at /.well-known/oauth-protected-resource that returns JSON describing how to authenticate with this MCP Gateway.
Why needed: MCP clients need to discover which authorization server protects the MCP Gateway and what scopes are available. Without this, clients must be pre-configured with this information.
File to modify: auth_server/server.py
Implementation:
@app.get("/.well-known/oauth-protected-resource")
async def protected_resource_metadata(request: Request):
"""
OAuth 2.0 Protected Resource Metadata (RFC 9728).
This endpoint tells OAuth clients:
- Which authorization server(s) can issue tokens for this resource
- What scopes are available
- How to present the bearer token
MCP clients fetch this to discover how to authenticate.
"""
auth_provider = get_auth_provider()
provider_info = auth_provider.get_provider_info()
# Determine the authorization server URL based on configured provider
if provider_info.get('provider_type') == 'keycloak':
# Keycloak authorization server is the realm endpoint
auth_servers = [
f"{provider_info.get('keycloak_url', '')}/realms/{provider_info.get('realm', '')}"
]
elif provider_info.get('provider_type') == 'cognito':
# Cognito authorization server URL format
region = os.environ.get('AWS_REGION', 'us-east-1')
user_pool_id = os.environ.get('COGNITO_USER_POOL_ID', '')
auth_servers = [
f"https://cognito-idp.{region}.amazonaws.com/{user_pool_id}"
]
elif provider_info.get('provider_type') == 'entra':
# Azure AD / Entra ID
tenant_id = os.environ.get('AZURE_TENANT_ID', '')
auth_servers = [
f"https://login.microsoftonline.com/{tenant_id}/v2.0"
]
else:
# Fallback to self
auth_servers = [os.environ.get("AUTH_SERVER_EXTERNAL_URL", str(request.base_url))]
# Get available scopes from our scopes configuration
available_scopes = []
for scope_name in SCOPES_CONFIG.keys():
if scope_name != 'group_mappings': # Skip the group mappings key
available_scopes.append(scope_name)
# Also add scopes from group mappings
for group, scopes in SCOPES_CONFIG.get('group_mappings', {}).items():
available_scopes.extend(scopes)
# Deduplicate
available_scopes = list(set(available_scopes))
return {
# The identifier for this protected resource (the MCP Gateway URL)
"resource": os.environ.get("RESOURCE_URL", str(request.base_url).rstrip('/')),
# Authorization servers that can issue tokens for this resource
"authorization_servers": auth_servers,
# Scopes that this resource understands
"scopes_supported": available_scopes,
# How to present bearer tokens (we only support Authorization header)
"bearer_methods_supported": ["header"],
# Link to documentation
"resource_documentation": os.environ.get(
"RESOURCE_DOCS_URL",
"https://github.com/mcp-gateway/mcp-gateway-registry/blob/main/docs/auth.md"
)
}Environment variables to add:
# The public URL of the MCP Gateway (used in metadata)
RESOURCE_URL=https://registry.mycorp.click
# Link to authentication documentation
RESOURCE_DOCS_URL=https://github.com/your-org/mcp-gateway-registry/docs/auth.mdNginx configuration update (in nginx.conf or gateway config):
# Expose protected resource metadata at the registry domain
location /.well-known/oauth-protected-resource {
proxy_pass http://auth-server:8888/.well-known/oauth-protected-resource;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}Test:
curl https://registry.mycorp.click/.well-known/oauth-protected-resource | jq .Expected response:
{
"resource": "https://registry.mycorp.click",
"authorization_servers": ["https://kc.mycorp.click/realms/mcp-gateway"],
"scopes_supported": [
"mcp-servers-unrestricted/read",
"mcp-servers-unrestricted/execute",
"mcp-servers-restricted/read",
"mcp-servers-restricted/execute",
"mcp-registry-admin",
"mcp-registry-user"
],
"bearer_methods_supported": ["header"],
"resource_documentation": "https://github.com/..."
}Task 2: Authorization Server Metadata Endpoint
RFC Reference: RFC 8414 - OAuth 2.0 Authorization Server Metadata
What to build: An endpoint at /.well-known/oauth-authorization-server that describes the authorization server's capabilities.
Why needed: Clients need to discover endpoints (authorize, token, etc.) and supported features (grant types, PKCE methods) without hardcoding.
Note: If using Keycloak/Cognito, they already expose this endpoint. We may proxy to them or provide our own if we're doing token exchange.
File to modify: auth_server/server.py
Implementation:
@app.get("/.well-known/oauth-authorization-server")
async def authorization_server_metadata(request: Request):
"""
OAuth 2.0 Authorization Server Metadata (RFC 8414).
This endpoint describes the authorization server's capabilities:
- Available endpoints (authorize, token, etc.)
- Supported grant types
- Supported response types
- PKCE support
For deployments using Keycloak/Cognito, consider proxying to their
native metadata endpoints instead.
"""
base_url = os.environ.get("AUTH_SERVER_EXTERNAL_URL", str(request.base_url).rstrip('/'))
return {
# Issuer identifier (must exactly match 'iss' claim in tokens we issue)
"issuer": JWT_ISSUER,
# OAuth endpoints
"authorization_endpoint": f"{base_url}/oauth2/authorize",
"token_endpoint": f"{base_url}/token",
"jwks_uri": f"{base_url}/.well-known/jwks.json",
# Supported authentication methods for token endpoint
# "none" = public clients (no secret)
# "client_secret_basic" = HTTP Basic auth with client_id:client_secret
# "client_secret_post" = client_id and client_secret in POST body
"token_endpoint_auth_methods_supported": [
"none",
"client_secret_basic",
"client_secret_post"
],
# Supported OAuth grant types
"grant_types_supported": [
"authorization_code", # Standard user auth
"client_credentials", # M2M auth
"refresh_token", # Token refresh
"urn:ietf:params:oauth:grant-type:token-exchange" # Enterprise SSO
],
# Supported response types for authorization endpoint
"response_types_supported": ["code"],
# Available scopes
"scopes_supported": ["openid", "profile", "email", "offline_access"],
# PKCE support (S256 is the secure method, plain should only be for legacy)
"code_challenge_methods_supported": ["S256"],
# Token exchange specific (RFC 8693)
"token_exchange_subject_token_types_supported": [
"urn:ietf:params:oauth:token-type:id_token",
"urn:ietf:params:oauth:token-type:access_token"
]
}Task 3: Client ID Metadata Document (CIMD) Support
IETF Draft Reference: Client ID Metadata Document
What to build: A module that fetches, validates, and caches client metadata documents when a client presents a URL as its client_id.
Why needed: Instead of pre-registering clients, this allows any client with a valid metadata document to authenticate. This is the core of the new "decentralized client identity" model.
New file to create: auth_server/cimd.py
"""
Client ID Metadata Document (CIMD) Validator.
This module implements the IETF draft for Client ID Metadata Documents,
which allows OAuth clients to identify themselves using a URL they control
instead of pre-registering with each authorization server.
Reference: https://datatracker.ietf.org/doc/draft-parecki-oauth-client-id-metadata-document/
Security considerations:
1. Only HTTPS URLs are accepted (except localhost for development)
2. The client_id in the document must match the URL it was fetched from
3. redirect_uris are validated to prevent open redirect attacks
4. Documents are cached to prevent DoS via repeated fetches
5. Fetches have timeouts to prevent hanging
"""
import httpx
import logging
import time
from typing import Any, Dict, Optional, List
from urllib.parse import urlparse
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# Cache TTL in seconds (1 hour)
CIMD_CACHE_TTL = int(os.environ.get('CIMD_CACHE_TTL', '3600'))
# Whether to allow http:// for localhost (development only)
CIMD_ALLOW_HTTP_LOCALHOST = os.environ.get('CIMD_ALLOW_HTTP_LOCALHOST', 'true').lower() == 'true'
class ClientMetadataError(Exception):
"""
Raised when client metadata validation fails.
This exception contains a user-safe error message that can be
returned to the client without exposing internal details.
"""
pass
@dataclass
class CachedMetadata:
"""Cached client metadata with timestamp."""
metadata: Dict[str, Any]
fetched_at: float
class ClientMetadataValidator:
"""
Validates Client ID Metadata Documents.
Usage:
validator = ClientMetadataValidator()
# Check if client_id is a URL (CIMD) or traditional ID
if validator.is_url_client_id(client_id):
metadata = await validator.fetch_and_validate(client_id)
if not validator.is_redirect_uri_allowed(metadata, redirect_uri):
raise HTTPException(400, "redirect_uri not allowed")
"""
def __init__(self):
# In-memory cache: client_id -> CachedMetadata
self._cache: Dict[str, CachedMetadata] = {}
def is_url_client_id(self, client_id: str) -> bool:
"""
Check if client_id is a URL (indicating CIMD) or traditional client ID.
Args:
client_id: The client_id from the OAuth request
Returns:
True if client_id is a URL, False if traditional
"""
return client_id.startswith('http://') or client_id.startswith('https://')
def _validate_client_id_url(self, client_id: str) -> None:
"""
Validate that a client_id URL is acceptable.
Security rules:
1. Must be HTTPS (except localhost in development)
2. Must have a path component (not just domain)
3. Must not have query parameters or fragments
Args:
client_id: The URL to validate
Raises:
ClientMetadataError: If URL is invalid
"""
try:
parsed = urlparse(client_id)
except Exception as e:
raise ClientMetadataError(f"Invalid client_id URL: {e}")
# Check scheme
if parsed.scheme == 'https':
pass # Always allowed
elif parsed.scheme == 'http':
if CIMD_ALLOW_HTTP_LOCALHOST and parsed.hostname in ('localhost', '127.0.0.1'):
logger.warning(f"Allowing HTTP client_id for localhost: {client_id}")
else:
raise ClientMetadataError(
"client_id URL must use HTTPS (http:// only allowed for localhost in development)"
)
else:
raise ClientMetadataError(f"client_id URL must use HTTPS, got {parsed.scheme}")
# Must have a path (not just domain)
if not parsed.path or parsed.path == '/':
raise ClientMetadataError(
"client_id URL must have a path component (e.g., https://example.com/client.json)"
)
# Should not have query or fragment (they could be manipulated)
if parsed.query or parsed.fragment:
raise ClientMetadataError(
"client_id URL must not have query parameters or fragments"
)
async def fetch_and_validate(self, client_id: str) -> Dict[str, Any]:
"""
Fetch client metadata document and validate it.
This method:
1. Checks cache first
2. Validates the client_id URL
3. Fetches the document over HTTPS
4. Validates required fields
5. Caches the result
Args:
client_id: URL of the client metadata document
Returns:
Validated client metadata dictionary
Raises:
ClientMetadataError: If validation fails
"""
# Check cache
if client_id in self._cache:
cached = self._cache[client_id]
if time.time() - cached.fetched_at < CIMD_CACHE_TTL:
logger.debug(f"Using cached client metadata for {client_id}")
return cached.metadata
# Validate URL format
self._validate_client_id_url(client_id)
# Fetch the document
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(
client_id,
headers={
"Accept": "application/json",
"User-Agent": "MCP-Gateway-Auth-Server/1.0"
},
follow_redirects=True
)
response.raise_for_status()
metadata = response.json()
except httpx.TimeoutException:
raise ClientMetadataError(f"Timeout fetching client metadata from {client_id}")
except httpx.HTTPStatusError as e:
raise ClientMetadataError(
f"HTTP {e.response.status_code} fetching client metadata from {client_id}"
)
except Exception as e:
logger.error(f"Error fetching client metadata from {client_id}: {e}")
raise ClientMetadataError(f"Failed to fetch client metadata: {e}")
# Validate the document
self._validate_metadata(metadata, client_id)
# Cache it
self._cache[client_id] = CachedMetadata(metadata=metadata, fetched_at=time.time())
logger.info(f"Successfully validated client metadata for {metadata.get('client_name', client_id)}")
return metadata
def _validate_metadata(self, metadata: Dict[str, Any], client_id: str) -> None:
"""
Validate required fields in client metadata document.
Required fields per the spec:
- client_id: Must match the URL we fetched from
- redirect_uris: List of allowed redirect URIs
Optional but validated if present:
- client_name: Human-readable name
- logo_uri: Must be HTTPS
- client_uri: Must be HTTPS
Args:
metadata: The parsed JSON document
client_id: The URL we fetched from
Raises:
ClientMetadataError: If validation fails
"""
# client_id in document must match fetch URL (prevents DNS rebinding attacks)
doc_client_id = metadata.get('client_id')
if doc_client_id != client_id:
raise ClientMetadataError(
f"client_id mismatch: document contains '{doc_client_id}' but was fetched from '{client_id}'. "
"The client_id field in the document must exactly match the URL."
)
# redirect_uris is required
redirect_uris = metadata.get('redirect_uris')
if not redirect_uris:
raise ClientMetadataError("Client metadata must contain 'redirect_uris' field")
if not isinstance(redirect_uris, list):
raise ClientMetadataError("redirect_uris must be an array")
if len(redirect_uris) == 0:
raise ClientMetadataError("redirect_uris must contain at least one URI")
# Validate each redirect URI
for uri in redirect_uris:
if not isinstance(uri, str):
raise ClientMetadataError(f"redirect_uri must be a string, got {type(uri)}")
try:
parsed = urlparse(uri)
if not parsed.scheme or not parsed.netloc:
raise ClientMetadataError(f"Invalid redirect_uri: {uri}")
except Exception:
raise ClientMetadataError(f"Invalid redirect_uri: {uri}")
# Validate optional URIs are HTTPS if present
for field in ['logo_uri', 'client_uri', 'policy_uri', 'tos_uri']:
if field in metadata:
uri = metadata[field]
if not uri.startswith('https://'):
# Allow http for localhost
parsed = urlparse(uri)
if not (CIMD_ALLOW_HTTP_LOCALHOST and parsed.hostname in ('localhost', '127.0.0.1')):
raise ClientMetadataError(f"{field} must use HTTPS: {uri}")
def is_redirect_uri_allowed(self, metadata: Dict[str, Any], redirect_uri: str) -> bool:
"""
Check if a redirect_uri is allowed for this client.
Security note: This implements EXACT matching as required by OAuth 2.0.
No wildcards or partial matching is allowed, as that could enable
open redirect vulnerabilities.
Exception: For localhost URIs, we allow port variations to support
development scenarios where the port may change.
Args:
metadata: Validated client metadata
redirect_uri: The redirect_uri from the OAuth request
Returns:
True if allowed, False otherwise
"""
allowed_uris = metadata.get('redirect_uris', [])
# Exact match (required by OAuth 2.0 security best practices)
if redirect_uri in allowed_uris:
return True
# Development convenience: allow localhost port variations
parsed = urlparse(redirect_uri)
if parsed.hostname in ('localhost', '127.0.0.1'):
for allowed in allowed_uris:
allowed_parsed = urlparse(allowed)
if allowed_parsed.hostname in ('localhost', '127.0.0.1'):
# Same path is sufficient for localhost
if allowed_parsed.path == parsed.path:
logger.debug(
f"Allowing localhost redirect_uri port variation: "
f"{redirect_uri} matches {allowed}"
)
return True
return False
async def get_client_jwks(self, metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
Fetch client's public keys for client authentication.
Some clients (especially confidential clients) publish public keys
that they use to sign requests. This allows the authorization server
to verify that requests actually come from the legitimate client.
Args:
metadata: Validated client metadata
Returns:
JWKS dictionary or None if client doesn't publish keys
"""
# Check for jwks_uri (external key set)
jwks_uri = metadata.get('jwks_uri')
if jwks_uri:
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(jwks_uri)
response.raise_for_status()
return response.json()
except Exception as e:
logger.warning(f"Failed to fetch client JWKS from {jwks_uri}: {e}")
return None
# Check for inline jwks
return metadata.get('jwks')
def get_client_name(self, metadata: Dict[str, Any]) -> str:
"""Get human-readable client name for consent screens."""
return metadata.get('client_name', metadata.get('client_id', 'Unknown Client'))
def get_client_logo(self, metadata: Dict[str, Any]) -> Optional[str]:
"""Get client logo URL for consent screens."""
return metadata.get('logo_uri')
def clear_cache(self, client_id: Optional[str] = None) -> None:
"""
Clear cached client metadata.
Args:
client_id: Specific client to clear, or None for all
"""
if client_id:
self._cache.pop(client_id, None)
else:
self._cache.clear()
# Global validator instance
cimd_validator = ClientMetadataValidator()Integration with OAuth2 authorization endpoint:
Modify the authorization flow in server.py to support URL-based client_id:
# Add import at top
from cimd import cimd_validator, ClientMetadataError
# Modify or create authorization endpoint
@app.get("/oauth2/authorize")
async def oauth2_authorize(
request: Request,
client_id: str,
redirect_uri: str,
response_type: str = "code",
scope: str = "openid",
state: Optional[str] = None,
code_challenge: Optional[str] = None,
code_challenge_method: Optional[str] = None
):
"""
OAuth2 Authorization Endpoint with CIMD support.
If client_id is a URL, it's treated as a Client ID Metadata Document URL.
Otherwise, it's treated as a traditional pre-registered client ID.
"""
# Determine if this is a CIMD client or traditional client
if cimd_validator.is_url_client_id(client_id):
# CIMD flow: fetch and validate client metadata
try:
client_metadata = await cimd_validator.fetch_and_validate(client_id)
except ClientMetadataError as e:
logger.warning(f"CIMD validation failed for {client_id}: {e}")
raise HTTPException(status_code=400, detail=str(e))
# Validate redirect_uri against metadata
if not cimd_validator.is_redirect_uri_allowed(client_metadata, redirect_uri):
raise HTTPException(
status_code=400,
detail=f"redirect_uri '{redirect_uri}' is not registered for this client"
)
# Extract client info for consent screen
client_name = cimd_validator.get_client_name(client_metadata)
client_logo = cimd_validator.get_client_logo(client_metadata)
logger.info(f"CIMD client authorized: {client_name} ({client_id})")
else:
# Traditional pre-registered client
# ... existing validation logic ...
client_name = client_id # Or look up from database
client_logo = None
# Continue with standard OAuth2 authorization flow
# (redirect to IdP, show consent screen, etc.)
# ...Task 4: Token Exchange / ID-JAG Support
RFC Reference: RFC 8693 - OAuth 2.0 Token Exchange
What to build: A token endpoint that accepts ID tokens from trusted enterprise Identity Providers and exchanges them for MCP access tokens.
Why needed: This enables enterprise SSO scenarios where:
- Employee logs into corporate IdP (Azure AD, Okta, etc.) once
- MCP clients can then obtain MCP access tokens without additional user interaction
- Enterprise IT controls which clients can access which MCP servers via IdP policies
New file to create: auth_server/token_exchange.py
"""
OAuth 2.0 Token Exchange (RFC 8693) implementation.
This module implements the token exchange grant type, which allows
exchanging tokens from trusted enterprise Identity Providers for
MCP access tokens.
Use case: Enterprise SSO
1. Employee logs into corporate IdP (Azure AD, Okta, etc.)
2. MCP client receives corporate ID token
3. MCP client exchanges corporate ID token for MCP access token
4. Enterprise IT controls access via corporate IdP policies
Security considerations:
1. Only tokens from explicitly trusted IdPs are accepted
2. IdP trust is established via JWKS verification
3. Token exchange is logged for audit purposes
4. Scopes are mapped from corporate groups, not blindly trusted
Reference: https://datatracker.ietf.org/doc/html/rfc8693
"""
import httpx
import jwt
import logging
import os
import time
import uuid
import yaml
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from functools import lru_cache
logger = logging.getLogger(__name__)
# Token type URIs from RFC 8693
class TokenType:
"""Standard token type URIs from RFC 8693."""
ACCESS_TOKEN = "urn:ietf:params:oauth:token-type:access_token"
REFRESH_TOKEN = "urn:ietf:params:oauth:token-type:refresh_token"
ID_TOKEN = "urn:ietf:params:oauth:token-type:id_token"
JWT = "urn:ietf:params:oauth:token-type:jwt"
@dataclass
class TrustedIdP:
"""
Configuration for a trusted enterprise Identity Provider.
Tokens from this IdP will be accepted for token exchange.
"""
# Issuer claim value (must match 'iss' in tokens)
issuer: str
# URL to fetch JWKS for signature verification
jwks_uri: str
# Human-readable name for logging
name: str
# Optional: restrict to specific client IDs
# If empty, any client_id from this IdP is allowed
allowed_client_ids: List[str]
# Claim name containing user groups (varies by IdP)
# Azure AD: "groups" or "roles"
# Okta: "groups"
# Google: "hd" (hosted domain)
group_claim: str
# Map IdP groups to MCP scopes
# Example: {"Engineering": ["mcp-servers-unrestricted/read"]}
scope_mappings: Dict[str, List[str]]
class TokenExchangeError(Exception):
"""Error during token exchange."""
pass
class TokenExchangeHandler:
"""
Handles OAuth 2.0 Token Exchange requests.
This class validates tokens from trusted enterprise IdPs and
exchanges them for MCP access tokens.
"""
def __init__(self, config_path: Optional[str] = None):
"""
Initialize with trusted IdP configuration.
Args:
config_path: Path to trusted_idps.yml, or None to use env var
"""
self._trusted_idps: Dict[str, TrustedIdP] = {}
self._jwks_cache: Dict[str, tuple[Dict, float]] = {}
self._jwks_cache_ttl = 3600 # 1 hour
self._load_trusted_idps(config_path)
def _load_trusted_idps(self, config_path: Optional[str]) -> None:
"""Load trusted IdP configuration from YAML file."""
path = config_path or os.environ.get('TRUSTED_IDPS_CONFIG', 'trusted_idps.yml')
try:
with open(path, 'r') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
logger.warning(f"Trusted IdPs config not found at {path}. Token exchange disabled.")
return
except Exception as e:
logger.error(f"Failed to load trusted IdPs config: {e}")
return
for idp_name, idp_config in config.get('trusted_idps', {}).items():
try:
idp = TrustedIdP(
issuer=idp_config['issuer'],
jwks_uri=idp_config['jwks_uri'],
name=idp_config.get('name', idp_name),
allowed_client_ids=idp_config.get('allowed_client_ids', []),
group_claim=idp_config.get('group_claim', 'groups'),
scope_mappings=idp_config.get('scope_mappings', {})
)
self._trusted_idps[idp.issuer] = idp
logger.info(f"Loaded trusted IdP: {idp.name} ({idp.issuer})")
except KeyError as e:
logger.error(f"Invalid IdP config for {idp_name}: missing {e}")
logger.info(f"Loaded {len(self._trusted_idps)} trusted IdPs")
async def _get_jwks(self, jwks_uri: str) -> Dict[str, Any]:
"""
Fetch JWKS from IdP with caching.
JWKS (JSON Web Key Set) contains the public keys used to
verify token signatures.
"""
# Check cache
if jwks_uri in self._jwks_cache:
jwks, cached_at = self._jwks_cache[jwks_uri]
if time.time() - cached_at < self._jwks_cache_ttl:
return jwks
# Fetch from IdP
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(jwks_uri)
response.raise_for_status()
jwks = response.json()
except Exception as e:
raise TokenExchangeError(f"Failed to fetch JWKS from {jwks_uri}: {e}")
# Cache
self._jwks_cache[jwks_uri] = (jwks, time.time())
return jwks
async def validate_subject_token(
self,
token: str,
token_type: str
) -> Dict[str, Any]:
"""
Validate a subject token from a trusted IdP.
This method:
1. Decodes the token to find the issuer
2. Verifies the issuer is in our trusted list
3. Fetches the IdP's JWKS
4. Verifies the token signature
5. Validates claims (expiration, etc.)
6. Checks client_id restrictions if configured
Args:
token: The JWT token to validate
token_type: Token type URI (must be id_token or access_token)
Returns:
Dictionary containing:
- claims: Validated token claims
- idp: TrustedIdP configuration
- subject: User identifier
- groups: User's group memberships
Raises:
TokenExchangeError: If validation fails
"""
# Validate token type
if token_type not in (TokenType.ID_TOKEN, TokenType.ACCESS_TOKEN, TokenType.JWT):
raise TokenExchangeError(
f"Unsupported subject_token_type: {token_type}. "
f"Supported types: {TokenType.ID_TOKEN}, {TokenType.ACCESS_TOKEN}"
)
# Decode without verification to get issuer
try:
unverified = jwt.decode(token, options={"verify_signature": False})
except jwt.DecodeError as e:
raise TokenExchangeError(f"Invalid JWT format: {e}")
# Get issuer
issuer = unverified.get('iss')
if not issuer:
raise TokenExchangeError("Token missing 'iss' (issuer) claim")
# Check if issuer is trusted
if issuer not in self._trusted_idps:
raise TokenExchangeError(
f"Token issuer '{issuer}' is not trusted. "
"Contact your administrator to add this IdP to the trusted list."
)
idp = self._trusted_idps[issuer]
# Fetch JWKS
try:
jwks = await self._get_jwks(idp.jwks_uri)
except TokenExchangeError:
raise
except Exception as e:
raise TokenExchangeError(f"Failed to verify token: {e}")
# Find signing key
kid = jwt.get_unverified_header(token).get('kid')
if not kid:
raise TokenExchangeError("Token missing 'kid' (key ID) in header")
signing_key = None
for key in jwks.get('keys', []):
if key.get('kid') == kid:
try:
from jwt import PyJWK
signing_key = PyJWK(key).key
except Exception as e:
raise TokenExchangeError(f"Invalid key format: {e}")
break
if not signing_key:
raise TokenExchangeError(
f"No key found with kid '{kid}'. The IdP may have rotated keys. "
"Try again in a few minutes."
)
# Validate token
try:
claims = jwt.decode(
token,
signing_key,
algorithms=['RS256', 'RS384', 'RS512', 'ES256', 'ES384', 'ES512'],
issuer=issuer,
options={
"verify_aud": False, # Audience will be the original client, not us
"verify_exp": True,
"verify_iat": True
}
)
except jwt.ExpiredSignatureError:
raise TokenExchangeError("Subject token has expired")
except jwt.InvalidIssuerError:
raise TokenExchangeError("Token issuer mismatch")
except jwt.InvalidTokenError as e:
raise TokenExchangeError(f"Invalid token: {e}")
# Check client_id restrictions
if idp.allowed_client_ids:
# azp = authorized party (client that requested the token)
token_client = claims.get('azp') or claims.get('client_id') or claims.get('aud')
if isinstance(token_client, list):
token_client = token_client[0] if token_client else None
if token_client not in idp.allowed_client_ids:
raise TokenExchangeError(
f"Client '{token_client}' is not authorized for token exchange with this IdP"
)
# Extract user groups
groups = claims.get(idp.group_claim, [])
if isinstance(groups, str):
groups = [groups]
# Get user identifier
subject = (
claims.get('preferred_username') or
claims.get('email') or
claims.get('sub')
)
logger.info(
f"Token exchange: validated token from {idp.name} for user {subject} "
f"with groups {groups}"
)
return {
'claims': claims,
'idp': idp,
'subject': subject,
'groups': groups
}
def map_groups_to_scopes(
self,
groups: List[str],
idp: TrustedIdP
) -> List[str]:
"""
Map IdP groups to MCP scopes.
This uses two sources:
1. IdP-specific scope mappings from trusted_idps.yml
2. Global group mappings from scopes.yml
Args:
groups: User's group memberships from IdP
idp: TrustedIdP configuration
Returns:
List of MCP scopes
"""
scopes = set()
# Apply IdP-specific mappings
for group in groups:
if group in idp.scope_mappings:
scopes.update(idp.scope_mappings[group])
# Also apply global group mappings (from scopes.yml)
# Import here to avoid circular dependency
from server import map_groups_to_scopes as global_map
scopes.update(global_map(groups))
return list(scopes)
# Global handler instance
token_exchange_handler = TokenExchangeHandler()Configuration file to create: auth_server/trusted_idps.yml
# Trusted Enterprise Identity Providers
#
# Tokens from these IdPs can be exchanged for MCP access tokens.
# This enables enterprise SSO scenarios where employees authenticate
# once with their corporate IdP and can then access MCP servers
# without additional OAuth flows.
#
# SECURITY NOTE: Only add IdPs that you trust to assert user identity.
# The MCP Gateway will accept any valid token from these issuers.
trusted_idps:
# Microsoft Azure AD / Entra ID
azure_ad_example:
# Issuer URL (must match 'iss' claim in tokens)
# Replace {tenant-id} with your Azure AD tenant ID
issuer: "https://login.microsoftonline.com/{tenant-id}/v2.0"
# JWKS endpoint for signature verification
jwks_uri: "https://login.microsoftonline.com/{tenant-id}/discovery/v2.0/keys"
# Human-readable name (for logging)
name: "Contoso Azure AD"
# Optional: restrict to specific client IDs
# If empty or omitted, any client from this IdP is allowed
allowed_client_ids:
- "00000000-0000-0000-0000-000000000001" # Claude Desktop
- "00000000-0000-0000-0000-000000000002" # VS Code
# Claim containing user groups
# Azure AD uses "groups" by default, but may need app configuration
group_claim: "groups"
# Map Azure AD groups to MCP scopes
# Group can be name or object ID
scope_mappings:
# Azure AD group "Engineering" gets full MCP access
"Engineering":
- "mcp-servers-unrestricted/read"
- "mcp-servers-unrestricted/execute"
# Azure AD group "QA" gets restricted access
"QA":
- "mcp-servers-restricted/read"
- "mcp-servers-restricted/execute"
# Azure AD group "MCP-Admins" gets admin access
"MCP-Admins":
- "mcp-registry-admin"
- "mcp-servers-unrestricted/read"
- "mcp-servers-unrestricted/execute"
# Okta example
okta_example:
issuer: "https://company.okta.com/oauth2/default"
jwks_uri: "https://company.okta.com/oauth2/default/v1/keys"
name: "Company Okta"
group_claim: "groups"
scope_mappings:
"mcp-users":
- "mcp-servers-restricted/read"
"mcp-power-users":
- "mcp-servers-unrestricted/read"
- "mcp-servers-unrestricted/execute"
# Google Workspace example
google_workspace:
issuer: "https://accounts.google.com"
jwks_uri: "https://www.googleapis.com/oauth2/v3/certs"
name: "Google Workspace"
# Google uses "hd" (hosted domain) instead of groups
group_claim: "hd"
scope_mappings:
# Trust all users from company.com domain
"company.com":
- "mcp-servers-restricted/read"
- "mcp-servers-restricted/execute"
# Keycloak example (for multi-Keycloak setups)
external_keycloak:
issuer: "https://auth.partner.com/realms/partner"
jwks_uri: "https://auth.partner.com/realms/partner/protocol/openid-connect/certs"
name: "Partner Keycloak"
group_claim: "groups"
scope_mappings:
"/partner-mcp-users": # Keycloak groups have leading slash
- "mcp-servers-restricted/read"Token endpoint implementation (add to server.py):
from token_exchange import token_exchange_handler, TokenExchangeError, TokenType
@app.post("/token")
async def token_endpoint(
request: Request,
grant_type: str = Form(...),
# Token exchange parameters (RFC 8693)
subject_token: Optional[str] = Form(None),
subject_token_type: Optional[str] = Form(None),
actor_token: Optional[str] = Form(None),
actor_token_type: Optional[str] = Form(None),
resource: Optional[str] = Form(None),
audience: Optional[str] = Form(None),
scope: Optional[str] = Form(None),
requested_token_type: Optional[str] = Form(None),
# Client authentication
client_id: Optional[str] = Form(None),
client_secret: Optional[str] = Form(None),
# Authorization code parameters
code: Optional[str] = Form(None),
redirect_uri: Optional[str] = Form(None),
code_verifier: Optional[str] = Form(None)
):
"""
OAuth 2.0 Token Endpoint.
Supported grant types:
- authorization_code: Exchange auth code for tokens (user authentication)
- client_credentials: M2M authentication
- refresh_token: Refresh an access token
- urn:ietf:params:oauth:grant-type:token-exchange: Enterprise SSO (RFC 8693)
"""
if grant_type == "urn:ietf:params:oauth:grant-type:token-exchange":
# Token Exchange flow (RFC 8693)
# Validate required parameters
if not subject_token:
raise HTTPException(
status_code=400,
detail="subject_token is required for token exchange"
)
if not subject_token_type:
raise HTTPException(
status_code=400,
detail="subject_token_type is required for token exchange"
)
# Validate the subject token
try:
validation = await token_exchange_handler.validate_subject_token(
subject_token,
subject_token_type
)
except TokenExchangeError as e:
logger.warning(f"Token exchange failed: {e}")
raise HTTPException(status_code=401, detail=str(e))
# Map IdP groups to MCP scopes
user_scopes = token_exchange_handler.map_groups_to_scopes(
validation['groups'],
validation['idp']
)
# Filter to requested scopes if specified
if scope:
requested = set(scope.split())
user_scopes = [s for s in user_scopes if s in requested]
if not user_scopes:
raise HTTPException(
status_code=403,
detail="No valid scopes available for this user. Contact your administrator."
)
# Generate access token
current_time = int(time.time())
expires_in = 3600 # 1 hour
access_payload = {
"iss": JWT_ISSUER,
"aud": audience or JWT_AUDIENCE,
"sub": validation['subject'],
"scope": " ".join(user_scopes),
"exp": current_time + expires_in,
"iat": current_time,
"jti": str(uuid.uuid4()),
"token_use": "access",
# Record the token exchange for audit
"act": {
"sub": validation['claims'].get('sub'),
"iss": validation['idp'].issuer
},
"exchange_grant": "token_exchange",
"original_idp": validation['idp'].name
}
access_token = jwt.encode(access_payload, SECRET_KEY, algorithm='HS256')
logger.info(
f"Token exchange successful: issued token for {validation['subject']} "
f"from {validation['idp'].name} with scopes {user_scopes}"
)
return {
"access_token": access_token,
"token_type": "Bearer",
"expires_in": expires_in,
"scope": " ".join(user_scopes),
"issued_token_type": TokenType.ACCESS_TOKEN
}
elif grant_type == "client_credentials":
# Existing M2M flow - delegate to provider
auth_provider = get_auth_provider()
return auth_provider.get_m2m_token(client_id, client_secret, scope)
elif grant_type == "authorization_code":
# Existing auth code flow
# ... existing implementation ...
pass
elif grant_type == "refresh_token":
# Refresh token flow
# ... existing implementation ...
pass
else:
raise HTTPException(
status_code=400,
detail=f"Unsupported grant_type: {grant_type}. "
f"Supported: authorization_code, client_credentials, refresh_token, "
f"urn:ietf:params:oauth:grant-type:token-exchange"
)What Changes Are Needed in MCP Servers?
Short answer: Most MCP servers behind the gateway need no changes.
The gateway handles all authentication. Individual MCP servers just receive authenticated requests with user context in headers.
Exceptions:
-
Direct-access MCP servers (not behind gateway): If an MCP server can be accessed directly (not through the gateway), it should expose
/.well-known/oauth-protected-resourceto help clients discover authentication requirements. -
Nginx/Gateway configuration: The gateway's nginx needs routes to expose the well-known endpoints:
# Add to gateway nginx configuration
# Protected Resource Metadata (for MCP clients to discover auth requirements)
location /.well-known/oauth-protected-resource {
proxy_pass http://auth-server:8888/.well-known/oauth-protected-resource;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-Proto $scheme;
}
# Authorization Server Metadata
location /.well-known/oauth-authorization-server {
proxy_pass http://auth-server:8888/.well-known/oauth-authorization-server;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-Proto $scheme;
}Testing Checklist
Protected Resource Metadata
- Endpoint returns valid JSON
-
authorization_serversarray contains correct IdP URL -
scopes_supportedcontains all scopes from scopes.yml - Works with all configured providers (Keycloak, Cognito, Entra)
CIMD Support
- URL client_id is recognized and fetched
- Invalid URLs are rejected
- HTTP URLs rejected (except localhost)
- client_id mismatch in document rejected
- redirect_uri validation works
- Caching works correctly
- Traditional client_ids still work
Token Exchange
- Valid ID tokens from trusted IdPs are accepted
- Tokens from untrusted IdPs are rejected
- Expired tokens are rejected
- Group-to-scope mapping works
- Issued tokens are valid and contain correct claims
- Rate limiting prevents abuse
Environment Variables Summary
# Protected Resource Metadata
RESOURCE_URL=https://registry.mycorp.click
RESOURCE_DOCS_URL=https://github.com/org/repo/docs/auth.md
# CIMD
CIMD_CACHE_TTL=3600
CIMD_ALLOW_HTTP_LOCALHOST=true # Set to false in production
# Token Exchange
TRUSTED_IDPS_CONFIG=/app/config/trusted_idps.yml
TOKEN_EXCHANGE_ENABLED=trueMigration Notes
Breaking changes: None. All existing authentication flows continue to work.
Recommended rollout:
- Deploy Protected Resource Metadata endpoint (non-breaking)
- Deploy Authorization Server Metadata endpoint (non-breaking)
- Deploy CIMD support (clients must opt-in by using URL client_id)
- Deploy Token Exchange (requires trusted IdP configuration)