diff --git a/docs/mcp/client.md b/docs/mcp/client.md index 6c47dcf4e3..b10d3ddaf1 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -318,6 +318,85 @@ agent = Agent('openai:gpt-4o', toolsets=[weather_server, calculator_server]) MCP tools can include metadata that provides additional information about the tool's characteristics, which can be useful when [filtering tools][pydantic_ai.toolsets.FilteredToolset]. The `meta`, `annotations`, and `output_schema` fields can be found on the `metadata` dict on the [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] object that's passed to filter functions. +## Resources + +MCP servers can provide [resources](https://modelcontextprotocol.io/docs/concepts/resources) - files, data, or content that can be accessed by the client. Resources in MCP are designed to be application-driven, with host applications determining how to incorporate context based on their needs. + +Pydantic AI provides methods to discover and read resources from MCP servers: + +- [`list_resources()`][pydantic_ai.mcp.MCPServer.list_resources] - List all available resources on the server +- [`list_resource_templates()`][pydantic_ai.mcp.MCPServer.list_resource_templates] - List resource templates with parameter placeholders +- [`read_resource(uri)`][pydantic_ai.mcp.MCPServer.read_resource] - Read the contents of a specific resource by URI + +Resources are automatically converted: text content is returned as `str`, and binary content is returned as [`BinaryContent`][pydantic_ai.messages.BinaryContent]. + +Before consuming resources, we need to run a server that exposes some: + +```python {title="mcp_resource_server.py"} +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP('Pydantic AI MCP Server') +log_level = 'unset' + + +@mcp.resource('resource://user_name.txt', mime_type='text/plain') +async def user_name_resource() -> str: + return 'Alice' + + +if __name__ == '__main__': + mcp.run() +``` + +Then we can create the client: + +```python {title="mcp_resources.py", requires="mcp_resource_server.py"} +import asyncio + +from pydantic_ai import Agent +from pydantic_ai._run_context import RunContext +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.test import TestModel + +agent = Agent( + model=TestModel(), + deps_type=str, + instructions="Use the customer's name while replying to them.", +) + + +@agent.instructions +def add_the_users_name(ctx: RunContext[str]) -> str: + return f"The user's name is {ctx.deps}." + + +async def main(): + server = MCPServerStdio('python', args=['-m', 'mcp_resource_server']) + + async with server: + # List all available resources + resources = await server.list_resources() + for resource in resources: + print(f' - {resource.name}: {resource.uri} ({resource.mime_type})') + #> - user_name_resource: resource://user_name.txt (text/plain) + + # Read a text resource + user_name = await server.read_resource('resource://user_name.txt') + print(f'Text content: {user_name}') + #> Text content: Alice + + # Use resources in dependencies + async with agent: + _ = await agent.run('Can you help me with my product?', deps=user_name) + + +if __name__ == '__main__': + asyncio.run(main()) +``` + +_(This example is complete, it can be run "as is")_ + + ## Custom TLS / SSL configuration In some environments you need to tweak how HTTPS connections are established – diff --git a/pydantic_ai_slim/pydantic_ai/_mcp.py b/pydantic_ai_slim/pydantic_ai/_mcp.py index 1e09246ccc..9dfef5f8aa 100644 --- a/pydantic_ai_slim/pydantic_ai/_mcp.py +++ b/pydantic_ai_slim/pydantic_ai/_mcp.py @@ -1,8 +1,12 @@ import base64 +from abc import ABC from collections.abc import Sequence -from typing import Literal +from dataclasses import dataclass +from typing import Annotated, Any, Literal -from . import exceptions, messages +from pydantic import Field + +from . import _utils, exceptions, messages try: from mcp import types as mcp_types @@ -13,6 +17,63 @@ ) from _import_error +@dataclass(repr=False, kw_only=True) +class ResourceAnnotations: + """Additional properties describing MCP entities.""" + + audience: list[mcp_types.Role] | None = None + """Intended audience for this entity.""" + + priority: Annotated[float, Field(ge=0.0, le=1.0)] | None = None + """Priority level for this entity, ranging from 0.0 to 1.0.""" + + __repr__ = _utils.dataclasses_no_defaults_repr + + +@dataclass(repr=False, kw_only=True) +class BaseResource(ABC): + """Base class for MCP resources.""" + + name: str + """The programmatic name of the resource.""" + + title: str | None = None + """Human-readable title for UI contexts.""" + + description: str | None = None + """A description of what this resource represents.""" + + mime_type: str | None = None + """The MIME type of the resource, if known.""" + + annotations: ResourceAnnotations | None = None + """Optional annotations for the resource.""" + + meta: dict[str, Any] | None = None + """Optional metadata for the resource.""" + + __repr__ = _utils.dataclasses_no_defaults_repr + + +@dataclass(repr=False, kw_only=True) +class Resource(BaseResource): + """A resource that can be read from an MCP server.""" + + uri: str + """The URI of the resource.""" + + size: int | None = None + """The size of the raw resource content in bytes (before base64 encoding), if known.""" + + +@dataclass(repr=False, kw_only=True) +class ResourceTemplate(BaseResource): + """A template for parameterized resources on an MCP server.""" + + uri_template: str + """URI template (RFC 6570) for constructing resource URIs.""" + + def map_from_mcp_params(params: mcp_types.CreateMessageRequestParams) -> list[messages.ModelMessage]: """Convert from MCP create message request parameters to pydantic-ai messages.""" pai_messages: list[messages.ModelMessage] = [] @@ -121,3 +182,38 @@ def map_from_sampling_content( return messages.TextPart(content=content.text) else: raise NotImplementedError('Image and Audio responses in sampling are not yet supported') + + +def map_from_mcp_resource(mcp_resource: mcp_types.Resource) -> Resource: + """Convert from MCP Resource to native Pydantic AI Resource.""" + return Resource( + uri=str(mcp_resource.uri), + name=mcp_resource.name, + title=mcp_resource.title, + description=mcp_resource.description, + mime_type=mcp_resource.mimeType, + size=mcp_resource.size, + annotations=( + ResourceAnnotations(audience=mcp_resource.annotations.audience, priority=mcp_resource.annotations.priority) + if mcp_resource.annotations + else None + ), + meta=mcp_resource.meta, + ) + + +def map_from_mcp_resource_template(mcp_template: mcp_types.ResourceTemplate) -> ResourceTemplate: + """Convert from MCP ResourceTemplate to native Pydantic AI ResourceTemplate.""" + return ResourceTemplate( + uri_template=mcp_template.uriTemplate, + name=mcp_template.name, + title=mcp_template.title, + description=mcp_template.description, + mime_type=mcp_template.mimeType, + annotations=( + ResourceAnnotations(audience=mcp_template.annotations.audience, priority=mcp_template.annotations.priority) + if mcp_template.annotations + else None + ), + meta=mcp_template.meta, + ) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index 0fbcbee39f..7b237a34a3 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -10,13 +10,13 @@ from dataclasses import field, replace from datetime import timedelta from pathlib import Path -from typing import Annotated, Any +from typing import Annotated, Any, overload import anyio import httpx import pydantic_core from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream -from pydantic import BaseModel, Discriminator, Field, Tag +from pydantic import AnyUrl, BaseModel, Discriminator, Field, Tag from pydantic_core import CoreSchema, core_schema from typing_extensions import Self, assert_never, deprecated @@ -303,6 +303,52 @@ def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]: args_validator=TOOL_SCHEMA_VALIDATOR, ) + async def list_resources(self) -> list[_mcp.Resource]: + """Retrieve resources that are currently present on the server. + + Note: + - We don't cache resources as they might change. + - We also don't subscribe to resource changes to avoid complexity. + """ + async with self: # Ensure server is running + result = await self._client.list_resources() + return [_mcp.map_from_mcp_resource(r) for r in result.resources] + + async def list_resource_templates(self) -> list[_mcp.ResourceTemplate]: + """Retrieve resource templates that are currently present on the server.""" + async with self: # Ensure server is running + result = await self._client.list_resource_templates() + return [_mcp.map_from_mcp_resource_template(t) for t in result.resourceTemplates] + + @overload + async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ... + + @overload + async def read_resource( + self, uri: _mcp.Resource + ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ... + + async def read_resource( + self, uri: str | _mcp.Resource + ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: + """Read the contents of a specific resource by URI. + + Args: + uri: The URI of the resource to read, or a Resource object. + + Returns: + The resource contents. If the resource has a single content item, returns that item directly. + If the resource has multiple content items, returns a list of items. + """ + resource_uri = uri if isinstance(uri, str) else uri.uri + async with self: # Ensure server is running + result = await self._client.read_resource(AnyUrl(resource_uri)) + return ( + self._get_content(result.contents[0]) + if len(result.contents) == 1 + else [self._get_content(resource) for resource in result.contents] + ) + async def __aenter__(self) -> Self: """Enter the MCP server context. @@ -397,12 +443,7 @@ async def _map_tool_result_part( resource = part.resource return self._get_content(resource) elif isinstance(part, mcp_types.ResourceLink): - resource_result: mcp_types.ReadResourceResult = await self._client.read_resource(part.uri) - return ( - self._get_content(resource_result.contents[0]) - if len(resource_result.contents) == 1 - else [self._get_content(resource) for resource in resource_result.contents] - ) + return await self.read_resource(str(part.uri)) else: assert_never(part) diff --git a/tests/mcp_server.py b/tests/mcp_server.py index 54b105ab29..13ef2410c2 100644 --- a/tests/mcp_server.py +++ b/tests/mcp_server.py @@ -125,6 +125,12 @@ async def product_name_resource() -> str: return Path(__file__).parent.joinpath('assets/product_name.txt').read_text() +@mcp.resource('resource://greeting/{name}', mime_type='text/plain') +async def greeting_resource_template(name: str) -> str: + """Dynamic greeting resource template.""" + return f'Hello, {name}!' + + @mcp.tool() async def get_image() -> Image: data = Path(__file__).parent.joinpath('assets/kiwi.png').read_bytes() diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 9342f49f10..b43ca94aa5 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -23,6 +23,7 @@ ToolReturnPart, UserPromptPart, ) +from pydantic_ai._mcp import Resource from pydantic_ai.agent import Agent from pydantic_ai.exceptions import ModelRetry, UnexpectedModelBehavior, UserError from pydantic_ai.mcp import MCPServerStreamableHTTP, load_mcp_servers @@ -37,7 +38,13 @@ from mcp import ErrorData, McpError, SamplingMessage from mcp.client.session import ClientSession from mcp.shared.context import RequestContext - from mcp.types import CreateMessageRequestParams, ElicitRequestParams, ElicitResult, ImageContent, TextContent + from mcp.types import ( + CreateMessageRequestParams, + ElicitRequestParams, + ElicitResult, + ImageContent, + TextContent, + ) from pydantic_ai._mcp import map_from_mcp_params, map_from_model_response from pydantic_ai.mcp import CallToolFunc, MCPServerSSE, MCPServerStdio, ToolResult @@ -314,6 +321,36 @@ async def test_log_level_unset(run_context: RunContext[int]): assert result == snapshot('unset') +async def test_stdio_server_list_resources(run_context: RunContext[int]): + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + resources = await server.list_resources() + assert len(resources) == snapshot(3) + + assert resources[0].uri == snapshot('resource://kiwi.png') + assert resources[0].mime_type == snapshot('image/png') + assert resources[0].name == snapshot('kiwi_resource') + + assert resources[1].uri == snapshot('resource://marcelo.mp3') + assert resources[1].mime_type == snapshot('audio/mpeg') + assert resources[1].name == snapshot('marcelo_resource') + + assert resources[2].uri == snapshot('resource://product_name.txt') + assert resources[2].mime_type == snapshot('text/plain') + assert resources[2].name == snapshot('product_name_resource') + + +async def test_stdio_server_list_resource_templates(run_context: RunContext[int]): + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + resource_templates = await server.list_resource_templates() + assert len(resource_templates) == snapshot(1) + + assert resource_templates[0].uri_template == snapshot('resource://greeting/{name}') + assert resource_templates[0].name == snapshot('greeting_resource_template') + assert resource_templates[0].description == snapshot('Dynamic greeting resource template.') + + async def test_log_level_set(run_context: RunContext[int]): server = MCPServerStdio('python', ['-m', 'tests.mcp_server'], log_level='info') assert server.log_level == 'info' @@ -1460,6 +1497,42 @@ async def test_elicitation_callback_not_set(run_context: RunContext[int]): await server.direct_call_tool('use_elicitation', {'question': 'Should I continue?'}) +async def test_read_text_resource(run_context: RunContext[int]): + """Test reading a text resource (converted to string).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + # Test reading by URI string + content = await server.read_resource('resource://product_name.txt') + assert isinstance(content, str) + assert content == snapshot('Pydantic AI\n') + + # Test reading by Resource object + resource = Resource(uri='resource://product_name.txt', name='product_name_resource') + content_from_resource = await server.read_resource(resource) + assert isinstance(content_from_resource, str) + assert content_from_resource == snapshot('Pydantic AI\n') + + +async def test_read_blob_resource(run_context: RunContext[int]): + """Test reading a binary resource (converted to BinaryContent).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + content = await server.read_resource('resource://kiwi.png') + assert isinstance(content, BinaryContent) + assert content.media_type == snapshot('image/png') + # Verify it's PNG data (starts with PNG magic bytes) + assert content.data[:8] == b'\x89PNG\r\n\x1a\n' # PNG magic bytes + + +async def test_read_resource_template(run_context: RunContext[int]): + """Test reading a resource template with parameters (converted to string).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + content = await server.read_resource('resource://greeting/Alice') + assert isinstance(content, str) + assert content == snapshot('Hello, Alice!') + + def test_load_mcp_servers(tmp_path: Path): config = tmp_path / 'mcp.json'