Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
Expand Down
15 changes: 12 additions & 3 deletions src/mcp/client/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,18 @@ async def _handle_oauth_metadata_response(self, response: httpx.Response) -> Non
content = await response.aread()
metadata = OAuthMetadata.model_validate_json(content)
self.context.oauth_metadata = metadata
# Apply default scope if needed
if self.context.client_metadata.scope is None and metadata.scopes_supported is not None:
self.context.client_metadata.scope = " ".join(metadata.scopes_supported)

# Only set scope if client_metadata.scope is None
if self.context.client_metadata.scope is None:
# Priority 1: Use PRM's scopes_supported if available
if (
self.context.protected_resource_metadata is not None
and self.context.protected_resource_metadata.scopes_supported is not None
):
self.context.client_metadata.scope = " ".join(self.context.protected_resource_metadata.scopes_supported)
# Priority 2: Fall back to OAuth metadata scopes if available
elif metadata.scopes_supported is not None:
self.context.client_metadata.scope = " ".join(metadata.scopes_supported)

async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]:
"""HTTPX auth flow integration."""
Expand Down
133 changes: 133 additions & 0 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,63 @@ async def callback_handler() -> tuple[str, str | None]:
)


@pytest.fixture
def oauth_provider_without_scope(oauth_provider: OAuthClientProvider) -> OAuthClientProvider:
"""Create OAuth provider without predefined scope."""
oauth_provider.context.client_metadata.scope = None
return oauth_provider


@pytest.fixture
def oauth_metadata_response():
"""Common OAuth metadata response with scopes."""
return httpx.Response(
200,
content=(
b'{"issuer": "https://auth.example.com", '
b'"authorization_endpoint": "https://auth.example.com/authorize", '
b'"token_endpoint": "https://auth.example.com/token", '
b'"registration_endpoint": "https://auth.example.com/register", '
b'"scopes_supported": ["read", "write", "admin"]}'
),
)


@pytest.fixture
def prm_metadata():
"""PRM metadata with scopes."""
return ProtectedResourceMetadata(
resource=AnyHttpUrl("https://api.example.com/v1/mcp"),
authorization_servers=[AnyHttpUrl("https://auth.example.com")],
scopes_supported=["resource:read", "resource:write"],
)


@pytest.fixture
def prm_metadata_without_scopes():
"""PRM metadata without scopes."""
return ProtectedResourceMetadata(
resource=AnyHttpUrl("https://api.example.com/v1/mcp"),
authorization_servers=[AnyHttpUrl("https://auth.example.com")],
scopes_supported=None,
)


@pytest.fixture
def oauth_metadata_response_without_scopes():
"""OAuth metadata response without scopes."""
return httpx.Response(
200,
content=(
b'{"issuer": "https://auth.example.com", '
b'"authorization_endpoint": "https://auth.example.com/authorize", '
b'"token_endpoint": "https://auth.example.com/token", '
b'"registration_endpoint": "https://auth.example.com/register"}'
# No scopes_supported field
),
)


class TestPKCEParameters:
"""Test PKCE parameter generation."""

Expand Down Expand Up @@ -391,6 +448,82 @@ async def test_handle_metadata_response_success(self, oauth_provider: OAuthClien
assert oauth_provider.context.oauth_metadata is not None
assert str(oauth_provider.context.oauth_metadata.issuer) == "https://auth.example.com/"

@pytest.mark.anyio
async def test_prioritize_prm_scopes_over_oauth_metadata(
self,
oauth_provider_without_scope: OAuthClientProvider,
oauth_metadata_response: httpx.Response,
prm_metadata: ProtectedResourceMetadata,
):
"""Test that PRM scopes are prioritized over auth server metadata scopes."""
provider = oauth_provider_without_scope

# Set up PRM metadata with specific scopes
provider.context.protected_resource_metadata = prm_metadata

# Process the OAuth metadata
await provider._handle_oauth_metadata_response(oauth_metadata_response)

# Verify that PRM scopes are used (not OAuth metadata scopes)
assert provider.context.client_metadata.scope == "resource:read resource:write"

@pytest.mark.anyio
async def test_fallback_to_oauth_metadata_scopes_when_no_prm_scopes(
self,
oauth_provider_without_scope: OAuthClientProvider,
oauth_metadata_response: httpx.Response,
prm_metadata_without_scopes: ProtectedResourceMetadata,
):
"""Test fallback to OAuth metadata scopes when PRM has no scopes."""
provider = oauth_provider_without_scope

# Set up PRM metadata without scopes
provider.context.protected_resource_metadata = prm_metadata_without_scopes

# Process the OAuth metadata
await provider._handle_oauth_metadata_response(oauth_metadata_response)

# Verify that OAuth metadata scopes are used as fallback
assert provider.context.client_metadata.scope == "read write admin"

@pytest.mark.anyio
async def test_no_scope_changes_when_both_missing(
self,
oauth_provider_without_scope: OAuthClientProvider,
prm_metadata_without_scopes: ProtectedResourceMetadata,
oauth_metadata_response_without_scopes: httpx.Response,
):
"""Test that no scope changes occur when both PRM and OAuth metadata lack scopes."""
provider = oauth_provider_without_scope

# Set up PRM metadata without scopes
provider.context.protected_resource_metadata = prm_metadata_without_scopes

# Process the OAuth metadata
await provider._handle_oauth_metadata_response(oauth_metadata_response_without_scopes)

# Verify that scope remains None
assert provider.context.client_metadata.scope is None

@pytest.mark.anyio
async def test_preserve_existing_client_scope(
self,
oauth_provider: OAuthClientProvider,
oauth_metadata_response: httpx.Response,
prm_metadata: ProtectedResourceMetadata,
):
"""Test that existing client scope is preserved regardless of metadata."""
provider = oauth_provider

# Set up PRM metadata with scopes
provider.context.protected_resource_metadata = prm_metadata

# Process the OAuth metadata
await provider._handle_oauth_metadata_response(oauth_metadata_response)

# Verify that predefined scope is preserved
assert provider.context.client_metadata.scope == "read write"

@pytest.mark.anyio
async def test_register_client_request(self, oauth_provider: OAuthClientProvider):
"""Test client registration request building."""
Expand Down
Loading