diff --git a/.gitignore b/.gitignore index c316541b4..575388c8b 100644 --- a/.gitignore +++ b/.gitignore @@ -88,7 +88,7 @@ ipython_config.py # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: -# .python-version +.python-version # pipenv # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. diff --git a/src/mcp/client/auth.py b/src/mcp/client/auth.py index 376036e8c..57faa0f48 100644 --- a/src/mcp/client/auth.py +++ b/src/mcp/client/auth.py @@ -478,9 +478,18 @@ async def _handle_oauth_metadata_response(self, response: httpx.Response) -> Non content = await response.aread() metadata = OAuthMetadata.model_validate_json(content) self.context.oauth_metadata = metadata - # Apply default scope if needed - if self.context.client_metadata.scope is None and metadata.scopes_supported is not None: - self.context.client_metadata.scope = " ".join(metadata.scopes_supported) + + # Only set scope if client_metadata.scope is None + if self.context.client_metadata.scope is None: + # Priority 1: Use PRM's scopes_supported if available + if ( + self.context.protected_resource_metadata is not None + and self.context.protected_resource_metadata.scopes_supported is not None + ): + self.context.client_metadata.scope = " ".join(self.context.protected_resource_metadata.scopes_supported) + # Priority 2: Fall back to OAuth metadata scopes if available + elif metadata.scopes_supported is not None: + self.context.client_metadata.scope = " ".join(metadata.scopes_supported) async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]: """HTTPX auth flow integration.""" diff --git a/tests/client/test_auth.py b/tests/client/test_auth.py index 6e58e496d..b921b9147 100644 --- a/tests/client/test_auth.py +++ b/tests/client/test_auth.py @@ -79,6 +79,63 @@ async def callback_handler() -> tuple[str, str | None]: ) +@pytest.fixture +def oauth_provider_without_scope(oauth_provider: OAuthClientProvider) -> OAuthClientProvider: + """Create OAuth provider without predefined scope.""" + oauth_provider.context.client_metadata.scope = None + return oauth_provider + + +@pytest.fixture +def oauth_metadata_response(): + """Common OAuth metadata response with scopes.""" + return httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com", ' + b'"authorization_endpoint": "https://auth.example.com/authorize", ' + b'"token_endpoint": "https://auth.example.com/token", ' + b'"registration_endpoint": "https://auth.example.com/register", ' + b'"scopes_supported": ["read", "write", "admin"]}' + ), + ) + + +@pytest.fixture +def prm_metadata(): + """PRM metadata with scopes.""" + return ProtectedResourceMetadata( + resource=AnyHttpUrl("https://api.example.com/v1/mcp"), + authorization_servers=[AnyHttpUrl("https://auth.example.com")], + scopes_supported=["resource:read", "resource:write"], + ) + + +@pytest.fixture +def prm_metadata_without_scopes(): + """PRM metadata without scopes.""" + return ProtectedResourceMetadata( + resource=AnyHttpUrl("https://api.example.com/v1/mcp"), + authorization_servers=[AnyHttpUrl("https://auth.example.com")], + scopes_supported=None, + ) + + +@pytest.fixture +def oauth_metadata_response_without_scopes(): + """OAuth metadata response without scopes.""" + return httpx.Response( + 200, + content=( + b'{"issuer": "https://auth.example.com", ' + b'"authorization_endpoint": "https://auth.example.com/authorize", ' + b'"token_endpoint": "https://auth.example.com/token", ' + b'"registration_endpoint": "https://auth.example.com/register"}' + # No scopes_supported field + ), + ) + + class TestPKCEParameters: """Test PKCE parameter generation.""" @@ -391,6 +448,82 @@ async def test_handle_metadata_response_success(self, oauth_provider: OAuthClien assert oauth_provider.context.oauth_metadata is not None assert str(oauth_provider.context.oauth_metadata.issuer) == "https://auth.example.com/" + @pytest.mark.anyio + async def test_prioritize_prm_scopes_over_oauth_metadata( + self, + oauth_provider_without_scope: OAuthClientProvider, + oauth_metadata_response: httpx.Response, + prm_metadata: ProtectedResourceMetadata, + ): + """Test that PRM scopes are prioritized over auth server metadata scopes.""" + provider = oauth_provider_without_scope + + # Set up PRM metadata with specific scopes + provider.context.protected_resource_metadata = prm_metadata + + # Process the OAuth metadata + await provider._handle_oauth_metadata_response(oauth_metadata_response) + + # Verify that PRM scopes are used (not OAuth metadata scopes) + assert provider.context.client_metadata.scope == "resource:read resource:write" + + @pytest.mark.anyio + async def test_fallback_to_oauth_metadata_scopes_when_no_prm_scopes( + self, + oauth_provider_without_scope: OAuthClientProvider, + oauth_metadata_response: httpx.Response, + prm_metadata_without_scopes: ProtectedResourceMetadata, + ): + """Test fallback to OAuth metadata scopes when PRM has no scopes.""" + provider = oauth_provider_without_scope + + # Set up PRM metadata without scopes + provider.context.protected_resource_metadata = prm_metadata_without_scopes + + # Process the OAuth metadata + await provider._handle_oauth_metadata_response(oauth_metadata_response) + + # Verify that OAuth metadata scopes are used as fallback + assert provider.context.client_metadata.scope == "read write admin" + + @pytest.mark.anyio + async def test_no_scope_changes_when_both_missing( + self, + oauth_provider_without_scope: OAuthClientProvider, + prm_metadata_without_scopes: ProtectedResourceMetadata, + oauth_metadata_response_without_scopes: httpx.Response, + ): + """Test that no scope changes occur when both PRM and OAuth metadata lack scopes.""" + provider = oauth_provider_without_scope + + # Set up PRM metadata without scopes + provider.context.protected_resource_metadata = prm_metadata_without_scopes + + # Process the OAuth metadata + await provider._handle_oauth_metadata_response(oauth_metadata_response_without_scopes) + + # Verify that scope remains None + assert provider.context.client_metadata.scope is None + + @pytest.mark.anyio + async def test_preserve_existing_client_scope( + self, + oauth_provider: OAuthClientProvider, + oauth_metadata_response: httpx.Response, + prm_metadata: ProtectedResourceMetadata, + ): + """Test that existing client scope is preserved regardless of metadata.""" + provider = oauth_provider + + # Set up PRM metadata with scopes + provider.context.protected_resource_metadata = prm_metadata + + # Process the OAuth metadata + await provider._handle_oauth_metadata_response(oauth_metadata_response) + + # Verify that predefined scope is preserved + assert provider.context.client_metadata.scope == "read write" + @pytest.mark.anyio async def test_register_client_request(self, oauth_provider: OAuthClientProvider): """Test client registration request building."""