From 9fde764ed1d583a93314a23a1ddc697c604c6589 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 21:26:07 +1000 Subject: [PATCH 01/11] Add initial list_resources() method to MCPServer. --- pydantic_ai_slim/pydantic_ai/mcp.py | 11 +++++++++++ tests/test_mcp.py | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index b61f254500..c296d7cf5d 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -275,6 +275,17 @@ def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]: args_validator=TOOL_SCHEMA_VALIDATOR, ) + async def list_resources(self) -> list[mcp_types.Resource]: + """Retrieve resources that are currently active on the server. + + Note: + - We don't cache resources as they might change. + - We also don't subscribe to resource changes to avoid complexity. + """ + async with self: # Ensure server is running + result = await self._client.list_resources() + return result.resources + async def __aenter__(self) -> Self: """Enter the MCP server context. diff --git a/tests/test_mcp.py b/tests/test_mcp.py index b6d99249c7..ed52a2e17b 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -318,6 +318,25 @@ async def test_log_level_unset(run_context: RunContext[int]): assert result == snapshot('unset') +async def test_stdio_server_list_resources(run_context: RunContext[int]): + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + resources = await server.list_resources() + assert len(resources) == snapshot(3) + + assert resources[0].uri == snapshot('resource://kiwi.png') + assert resources[0].mimeType == snapshot('image/png') + assert resources[0].name == snapshot('kiwi_resource') + + assert resources[1].uri == snapshot('resource://marcelo.mp3') + assert resources[1].mimeType == snapshot('audio/mpeg') + assert resources[1].name == snapshot('marcelo_resource') + + assert resources[2].uri == snapshot('resource://product_name.txt') + assert resources[2].mimeType == snapshot('text/plain') + assert resources[2].name == snapshot('product_name_resource') + + async def test_log_level_set(run_context: RunContext[int]): server = MCPServerStdio('python', ['-m', 'tests.mcp_server'], log_level='info') assert server.log_level == 'info' From a7404a2c6a42dbc4dc0bfed10c720ba96ae3bd77 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 21:50:33 +1000 Subject: [PATCH 02/11] Add list_resource_templates() to MCPServer. --- pydantic_ai_slim/pydantic_ai/mcp.py | 8 +++++++- tests/mcp_server.py | 6 ++++++ tests/test_mcp.py | 11 +++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index c296d7cf5d..1f1c8c9438 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -276,7 +276,7 @@ def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]: ) async def list_resources(self) -> list[mcp_types.Resource]: - """Retrieve resources that are currently active on the server. + """Retrieve resources that are currently present on the server. Note: - We don't cache resources as they might change. @@ -286,6 +286,12 @@ async def list_resources(self) -> list[mcp_types.Resource]: result = await self._client.list_resources() return result.resources + async def list_resource_templates(self) -> list[mcp_types.ResourceTemplate]: + """Retrieve resource templates that are currently present on the server.""" + async with self: # Ensure server is running + result = await self._client.list_resource_templates() + return result.resourceTemplates + async def __aenter__(self) -> Self: """Enter the MCP server context. diff --git a/tests/mcp_server.py b/tests/mcp_server.py index e7beef66d8..ef47ca81f3 100644 --- a/tests/mcp_server.py +++ b/tests/mcp_server.py @@ -124,6 +124,12 @@ async def product_name_resource() -> str: return Path(__file__).parent.joinpath('assets/product_name.txt').read_text() +@mcp.resource('resource://greeting/{name}', mime_type='text/plain') +async def greeting_resource_template(name: str) -> str: + """Dynamic greeting resource template.""" + return f'Hello, {name}!' + + @mcp.tool() async def get_image() -> Image: data = Path(__file__).parent.joinpath('assets/kiwi.png').read_bytes() diff --git a/tests/test_mcp.py b/tests/test_mcp.py index ed52a2e17b..bee477eb28 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -337,6 +337,17 @@ async def test_stdio_server_list_resources(run_context: RunContext[int]): assert resources[2].name == snapshot('product_name_resource') +async def test_stdio_server_list_resource_templates(run_context: RunContext[int]): + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + resource_templates = await server.list_resource_templates() + assert len(resource_templates) == snapshot(1) + + assert resource_templates[0].uriTemplate == snapshot('resource://greeting/{name}') + assert resource_templates[0].name == snapshot('greeting_resource_template') + assert resource_templates[0].description == snapshot('Dynamic greeting resource template.') + + async def test_log_level_set(run_context: RunContext[int]): server = MCPServerStdio('python', ['-m', 'tests.mcp_server'], log_level='info') assert server.log_level == 'info' From ecea4665f6d041842bf5992e05f89f8617a28d00 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 22:13:03 +1000 Subject: [PATCH 03/11] Workaround limitations with inline-snapshot & AnyUrl. --- tests/test_mcp.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index bee477eb28..cc3255101c 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -324,15 +324,15 @@ async def test_stdio_server_list_resources(run_context: RunContext[int]): resources = await server.list_resources() assert len(resources) == snapshot(3) - assert resources[0].uri == snapshot('resource://kiwi.png') + assert str(resources[0].uri) == snapshot('resource://kiwi.png') assert resources[0].mimeType == snapshot('image/png') assert resources[0].name == snapshot('kiwi_resource') - assert resources[1].uri == snapshot('resource://marcelo.mp3') + assert str(resources[1].uri) == snapshot('resource://marcelo.mp3') assert resources[1].mimeType == snapshot('audio/mpeg') assert resources[1].name == snapshot('marcelo_resource') - assert resources[2].uri == snapshot('resource://product_name.txt') + assert str(resources[2].uri) == snapshot('resource://product_name.txt') assert resources[2].mimeType == snapshot('text/plain') assert resources[2].name == snapshot('product_name_resource') From 18862997b3bf15446757478bfb4e744b31f6cec9 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 22:41:23 +1000 Subject: [PATCH 04/11] Add basic read_resource() to MCPServer. --- pydantic_ai_slim/pydantic_ai/mcp.py | 15 +++++++- tests/test_mcp.py | 56 ++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index 1f1c8c9438..078e89b6ab 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -16,7 +16,7 @@ import httpx import pydantic_core from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream -from pydantic import BaseModel, Discriminator, Field, Tag +from pydantic import AnyUrl, BaseModel, Discriminator, Field, Tag from pydantic_core import CoreSchema, core_schema from typing_extensions import Self, assert_never, deprecated @@ -292,6 +292,19 @@ async def list_resource_templates(self) -> list[mcp_types.ResourceTemplate]: result = await self._client.list_resource_templates() return result.resourceTemplates + async def read_resource(self, uri: str) -> list[mcp_types.TextResourceContents | mcp_types.BlobResourceContents]: + """Read the contents of a specific resource by URI. + + Args: + uri: The URI of the resource to read. + + Returns: + A list of resource contents (either TextResourceContents or BlobResourceContents). + """ + async with self: # Ensure server is running + result = await self._client.read_resource(AnyUrl(uri)) + return result.contents + async def __aenter__(self) -> Self: """Enter the MCP server context. diff --git a/tests/test_mcp.py b/tests/test_mcp.py index cc3255101c..8b046456a9 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -37,7 +37,15 @@ from mcp import ErrorData, McpError, SamplingMessage from mcp.client.session import ClientSession from mcp.shared.context import RequestContext - from mcp.types import CreateMessageRequestParams, ElicitRequestParams, ElicitResult, ImageContent, TextContent + from mcp.types import ( + BlobResourceContents, + CreateMessageRequestParams, + ElicitRequestParams, + ElicitResult, + ImageContent, + TextContent, + TextResourceContents, + ) from pydantic_ai._mcp import map_from_mcp_params, map_from_model_response from pydantic_ai.mcp import CallToolFunc, MCPServerSSE, MCPServerStdio, ToolResult @@ -1424,6 +1432,52 @@ async def test_elicitation_callback_not_set(run_context: RunContext[int]): await server.direct_call_tool('use_elicitation', {'question': 'Should I continue?'}) +async def test_read_text_resource(run_context: RunContext[int]): + """Test reading a text resource (TextResourceContents).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + contents = await server.read_resource('resource://product_name.txt') + assert len(contents) == snapshot(1) + + content = contents[0] + assert str(content.uri) == snapshot('resource://product_name.txt') + assert content.mimeType == snapshot('text/plain') + assert isinstance(content, TextResourceContents) + assert content.text == snapshot('Pydantic AI\n') + + +async def test_read_blob_resource(run_context: RunContext[int]): + """Test reading a binary resource (BlobResourceContents).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + contents = await server.read_resource('resource://kiwi.png') + assert len(contents) == snapshot(1) + + content = contents[0] + assert str(content.uri) == snapshot('resource://kiwi.png') + assert content.mimeType == snapshot('image/png') + assert isinstance(content, BlobResourceContents) + # blob should be base64 encoded string + assert isinstance(content.blob, str) + # Decode and verify it's PNG data (starts with PNG magic bytes) + decoded_data = base64.b64decode(content.blob) + assert decoded_data[:8] == b'\x89PNG\r\n\x1a\n' # PNG magic bytes + + +async def test_read_resource_template(run_context: RunContext[int]): + """Test reading a resource template with parameters (TextResourceContents).""" + server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) + async with server: + contents = await server.read_resource('resource://greeting/Alice') + assert len(contents) == snapshot(1) + + content = contents[0] + assert str(content.uri) == snapshot('resource://greeting/Alice') + assert content.mimeType == snapshot('text/plain') + assert isinstance(content, TextResourceContents) + assert content.text == snapshot('Hello, Alice!') + + def test_load_mcp_servers(tmp_path: Path): config = tmp_path / 'mcp.json' From dfb54dbb8e345653b023fdc71b466b7168d7f50f Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 23:23:36 +1000 Subject: [PATCH 05/11] Update mcp client docs to add Resources section. --- docs/mcp/client.md | 108 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/docs/mcp/client.md b/docs/mcp/client.md index 668cb91750..ba91b08899 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -322,6 +322,114 @@ agent = Agent('openai:gpt-4o', toolsets=[weather_server, calculator_server]) MCP tools can include metadata that provides additional information about the tool's characteristics, which can be useful when [filtering tools][pydantic_ai.toolsets.FilteredToolset]. The `meta`, `annotations`, and `output_schema` fields can be found on the `metadata` dict on the [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] object that's passed to filter functions. +## Resources + +MCP servers can provide [resources](https://modelcontextprotocol.io/docs/concepts/resources) - files, data, or content that can be accessed by the client. Resources in MCP are designed to be application-driven, with host applications determining how to incorporate context based on their needs. + +Pydantic AI provides methods to discover and read resources from MCP servers: + +- [`list_resources()`][pydantic_ai.mcp.MCPServer.list_resources] - List all available resources on the server +- [`list_resource_templates()`][pydantic_ai.mcp.MCPServer.list_resource_templates] - List resource templates with parameter placeholders +- [`read_resource(uri)`][pydantic_ai.mcp.MCPServer.read_resource] - Read the contents of a specific resource by URI + +Resources can contain either text content ([`TextResourceContents`][mcp.types.TextResourceContents]) or binary content ([`BlobResourceContents`][mcp.types.BlobResourceContents]) encoded as base64. + +```python {title="mcp_resources.py"} +import asyncio +import base64 +from dataclasses import dataclass + +from mcp.types import BlobResourceContents, TextResourceContents +from pydantic_ai import Agent +from pydantic_ai._run_context import RunContext +from pydantic_ai.mcp import MCPServerStdio +from pydantic_ai.models.test import TestModel + + +@dataclass +class Deps: + product_name: str + + +agent = Agent( + model=TestModel(), + deps_type=Deps, + instructions='Be sure to give advice related to the users product.', +) + + +@agent.instructions +def add_the_users_product(ctx: RunContext[Deps]) -> str: + return f"The user's product is {ctx.deps.product_name}." + + +async def main(): + server = MCPServerStdio('python', args=['-m', 'tests.mcp_server']) + + async with server: + # List all available resources + resources = await server.list_resources() + print(f'Found {len(resources)} resources:') + for resource in resources: + print(f' - {resource.name}: {resource.uri} ({resource.mimeType})') + + # List resource templates (with parameters) + templates = await server.list_resource_templates() + print(f'\nFound {len(templates)} resource templates:') + for template in templates: + print(f' - {template.name}: {template.uriTemplate}') + + # Read a text resource + text_contents = await server.read_resource('resource://product_name.txt') + for content in text_contents: + if isinstance(content, TextResourceContents): + print(f'\nText content from {content.uri}: {content.text.strip()}') + + # Read a binary resource + binary_contents = await server.read_resource('resource://kiwi.png') + for content in binary_contents: + if isinstance(content, BlobResourceContents): + binary_data = base64.b64decode(content.blob) + print(f'\nBinary content from {content.uri}: {len(binary_data)} bytes') + + # Read from a resource template with parameters + greeting_contents = await server.read_resource('resource://greeting/Alice') + for content in greeting_contents: + if isinstance(content, TextResourceContents): + print(f'\nTemplate content: {content.text}') + + # Use resources in dependencies + async with agent: + product_name = text_contents[0].text + deps = Deps(product_name=product_name) + print(f'\nDeps: {deps}') + result = await agent.run('Can you help me with my product?', deps=deps) + print(result.output) + + +if __name__ == '__main__': + asyncio.run(main()) + + #> Found 3 resources: + #> - kiwi_resource: resource://kiwi.png (image/png) + #> - marcelo_resource: resource://marcelo.mp3 (audio/mpeg) + #> - product_name_resource: resource://product_name.txt (text/plain) + #> + #> Found 1 resource templates: + #> - greeting_resource_template: resource://greeting/{name} + #> + #> Text content from resource://product_name.txt: Pydantic AI + #> + #> Binary content from resource://kiwi.png: 2084609 bytes + #> + #> Template content: Hello, Alice! + #> + #> Deps: Deps(product_name='Pydantic AI\n') +``` + +_(This example is complete, it can be run "as is")_ + + ## Custom TLS / SSL configuration In some environments you need to tweak how HTTPS connections are established – From f52bb71a71f6b6a72fbc5e05925dcbcc2f3eb73b Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 29 Sep 2025 23:35:22 +1000 Subject: [PATCH 06/11] Organize imports in doc examples. --- docs/mcp/client.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/mcp/client.md b/docs/mcp/client.md index ba91b08899..c880a354f3 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -340,6 +340,7 @@ import base64 from dataclasses import dataclass from mcp.types import BlobResourceContents, TextResourceContents + from pydantic_ai import Agent from pydantic_ai._run_context import RunContext from pydantic_ai.mcp import MCPServerStdio From 232ff04da09228e61acbeb7d6d771a1d7ba58406 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Tue, 30 Sep 2025 00:23:53 +1000 Subject: [PATCH 07/11] Update MCP resource example docs. --- docs/mcp/client.md | 93 +++++++++++++++++----------------------------- 1 file changed, 34 insertions(+), 59 deletions(-) diff --git a/docs/mcp/client.md b/docs/mcp/client.md index c880a354f3..0cf62dee55 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -334,98 +334,73 @@ Pydantic AI provides methods to discover and read resources from MCP servers: Resources can contain either text content ([`TextResourceContents`][mcp.types.TextResourceContents]) or binary content ([`BlobResourceContents`][mcp.types.BlobResourceContents]) encoded as base64. -```python {title="mcp_resources.py"} +Before consuming resources, we need to run a server that exposes some: + +```python {title="mcp_resource_server.py"} +from mcp.server.fastmcp import FastMCP + +mcp = FastMCP('Pydantic AI MCP Server') +log_level = 'unset' + + +@mcp.resource('resource://user_name.txt', mime_type='text/plain') +async def user_name_resource() -> str: + return 'Alice' + + +if __name__ == '__main__': + mcp.run() +``` + +Then we can create the client: + +```python {title="mcp_resources.py", requires="mcp_resource_server.py"} import asyncio -import base64 -from dataclasses import dataclass -from mcp.types import BlobResourceContents, TextResourceContents +from mcp.types import TextResourceContents from pydantic_ai import Agent from pydantic_ai._run_context import RunContext from pydantic_ai.mcp import MCPServerStdio from pydantic_ai.models.test import TestModel - -@dataclass -class Deps: - product_name: str - - agent = Agent( model=TestModel(), - deps_type=Deps, - instructions='Be sure to give advice related to the users product.', + deps_type=str, + instructions="Use the customer's name while replying to them.", ) @agent.instructions -def add_the_users_product(ctx: RunContext[Deps]) -> str: - return f"The user's product is {ctx.deps.product_name}." +def add_the_users_name(ctx: RunContext[str]) -> str: + return f"The user's name is {ctx.deps}." async def main(): - server = MCPServerStdio('python', args=['-m', 'tests.mcp_server']) + server = MCPServerStdio('python', args=['-m', 'mcp_resource_server']) async with server: # List all available resources resources = await server.list_resources() - print(f'Found {len(resources)} resources:') for resource in resources: - print(f' - {resource.name}: {resource.uri} ({resource.mimeType})') - - # List resource templates (with parameters) - templates = await server.list_resource_templates() - print(f'\nFound {len(templates)} resource templates:') - for template in templates: - print(f' - {template.name}: {template.uriTemplate}') + print(f' - {resource.name}: {resource.uri} ({resource.mimeType})') + #> - user_name_resource: resource://user_name.txt (text/plain) # Read a text resource - text_contents = await server.read_resource('resource://product_name.txt') + text_contents = await server.read_resource('resource://user_name.txt') for content in text_contents: if isinstance(content, TextResourceContents): - print(f'\nText content from {content.uri}: {content.text.strip()}') - - # Read a binary resource - binary_contents = await server.read_resource('resource://kiwi.png') - for content in binary_contents: - if isinstance(content, BlobResourceContents): - binary_data = base64.b64decode(content.blob) - print(f'\nBinary content from {content.uri}: {len(binary_data)} bytes') - - # Read from a resource template with parameters - greeting_contents = await server.read_resource('resource://greeting/Alice') - for content in greeting_contents: - if isinstance(content, TextResourceContents): - print(f'\nTemplate content: {content.text}') + print(f'Text content from {content.uri}: {content.text.strip()}') + #> Text content from resource://user_name.txt: Alice # Use resources in dependencies async with agent: - product_name = text_contents[0].text - deps = Deps(product_name=product_name) - print(f'\nDeps: {deps}') - result = await agent.run('Can you help me with my product?', deps=deps) - print(result.output) + user_name = text_contents[0].text + _ = await agent.run('Can you help me with my product?', deps=user_name) if __name__ == '__main__': asyncio.run(main()) - - #> Found 3 resources: - #> - kiwi_resource: resource://kiwi.png (image/png) - #> - marcelo_resource: resource://marcelo.mp3 (audio/mpeg) - #> - product_name_resource: resource://product_name.txt (text/plain) - #> - #> Found 1 resource templates: - #> - greeting_resource_template: resource://greeting/{name} - #> - #> Text content from resource://product_name.txt: Pydantic AI - #> - #> Binary content from resource://kiwi.png: 2084609 bytes - #> - #> Template content: Hello, Alice! - #> - #> Deps: Deps(product_name='Pydantic AI\n') ``` _(This example is complete, it can be run "as is")_ From 317416c0566058e303d7a5b548113e7f42ef22f1 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 6 Oct 2025 22:03:27 +1100 Subject: [PATCH 08/11] Create native Resource and ResourceTemplate types and update MCPServer to return these in preference to `mcp` upstream types. --- docs/mcp/client.md | 2 +- pydantic_ai_slim/pydantic_ai/_mcp.py | 77 +++++++++++++++++++++++++++- pydantic_ai_slim/pydantic_ai/mcp.py | 8 +-- tests/test_mcp.py | 14 ++--- 4 files changed, 87 insertions(+), 14 deletions(-) diff --git a/docs/mcp/client.md b/docs/mcp/client.md index 4d9e558e6f..fcc1b6f510 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -379,7 +379,7 @@ async def main(): # List all available resources resources = await server.list_resources() for resource in resources: - print(f' - {resource.name}: {resource.uri} ({resource.mimeType})') + print(f' - {resource.name}: {resource.uri} ({resource.mime_type})') #> - user_name_resource: resource://user_name.txt (text/plain) # Read a text resource diff --git a/pydantic_ai_slim/pydantic_ai/_mcp.py b/pydantic_ai_slim/pydantic_ai/_mcp.py index 1e09246ccc..9475b407c9 100644 --- a/pydantic_ai_slim/pydantic_ai/_mcp.py +++ b/pydantic_ai_slim/pydantic_ai/_mcp.py @@ -1,8 +1,10 @@ import base64 +from abc import ABC from collections.abc import Sequence -from typing import Literal +from dataclasses import dataclass +from typing import Any, Literal -from . import exceptions, messages +from . import _utils, exceptions, messages try: from mcp import types as mcp_types @@ -13,6 +15,50 @@ ) from _import_error +@dataclass(repr=False, kw_only=True) +class BaseResource(ABC): + """Base class for MCP resources.""" + + name: str + """The programmatic name of the resource.""" + + title: str | None = None + """Human-readable title for UI contexts.""" + + description: str | None = None + """A description of what this resource represents.""" + + mime_type: str | None = None + """The MIME type of the resource, if known.""" + + annotations: dict[str, Any] | None = None + """Optional annotations for the resource.""" + + meta: dict[str, Any] | None = None + """Optional metadata for the resource.""" + + __repr__ = _utils.dataclasses_no_defaults_repr + + +@dataclass(repr=False, kw_only=True) +class Resource(BaseResource): + """A resource that can be read from an MCP server.""" + + uri: str + """The URI of the resource.""" + + size: int | None = None + """The size of the raw resource content in bytes (before base64 encoding), if known.""" + + +@dataclass(repr=False, kw_only=True) +class ResourceTemplate(BaseResource): + """A template for parameterized resources on an MCP server.""" + + uri_template: str + """URI template (RFC 6570) for constructing resource URIs.""" + + def map_from_mcp_params(params: mcp_types.CreateMessageRequestParams) -> list[messages.ModelMessage]: """Convert from MCP create message request parameters to pydantic-ai messages.""" pai_messages: list[messages.ModelMessage] = [] @@ -121,3 +167,30 @@ def map_from_sampling_content( return messages.TextPart(content=content.text) else: raise NotImplementedError('Image and Audio responses in sampling are not yet supported') + + +def map_from_mcp_resource(mcp_resource: mcp_types.Resource) -> Resource: + """Convert from MCP Resource to native Pydantic AI Resource.""" + return Resource( + uri=str(mcp_resource.uri), + name=mcp_resource.name, + title=mcp_resource.title, + description=mcp_resource.description, + mime_type=mcp_resource.mimeType, + size=mcp_resource.size, + annotations=mcp_resource.annotations.model_dump() if mcp_resource.annotations else None, + meta=mcp_resource.meta, + ) + + +def map_from_mcp_resource_template(mcp_template: mcp_types.ResourceTemplate) -> ResourceTemplate: + """Convert from MCP ResourceTemplate to native Pydantic AI ResourceTemplate.""" + return ResourceTemplate( + uri_template=mcp_template.uriTemplate, + name=mcp_template.name, + title=mcp_template.title, + description=mcp_template.description, + mime_type=mcp_template.mimeType, + annotations=mcp_template.annotations.model_dump() if mcp_template.annotations else None, + meta=mcp_template.meta, + ) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index e39ce6f354..b40fc59ce8 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -303,7 +303,7 @@ def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]: args_validator=TOOL_SCHEMA_VALIDATOR, ) - async def list_resources(self) -> list[mcp_types.Resource]: + async def list_resources(self) -> list[_mcp.Resource]: """Retrieve resources that are currently present on the server. Note: @@ -312,13 +312,13 @@ async def list_resources(self) -> list[mcp_types.Resource]: """ async with self: # Ensure server is running result = await self._client.list_resources() - return result.resources + return [_mcp.map_from_mcp_resource(r) for r in result.resources] - async def list_resource_templates(self) -> list[mcp_types.ResourceTemplate]: + async def list_resource_templates(self) -> list[_mcp.ResourceTemplate]: """Retrieve resource templates that are currently present on the server.""" async with self: # Ensure server is running result = await self._client.list_resource_templates() - return result.resourceTemplates + return [_mcp.map_from_mcp_resource_template(t) for t in result.resourceTemplates] async def read_resource(self, uri: str) -> list[mcp_types.TextResourceContents | mcp_types.BlobResourceContents]: """Read the contents of a specific resource by URI. diff --git a/tests/test_mcp.py b/tests/test_mcp.py index e925f5db14..735082e6e1 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -328,16 +328,16 @@ async def test_stdio_server_list_resources(run_context: RunContext[int]): resources = await server.list_resources() assert len(resources) == snapshot(3) - assert str(resources[0].uri) == snapshot('resource://kiwi.png') - assert resources[0].mimeType == snapshot('image/png') + assert resources[0].uri == snapshot('resource://kiwi.png') + assert resources[0].mime_type == snapshot('image/png') assert resources[0].name == snapshot('kiwi_resource') - assert str(resources[1].uri) == snapshot('resource://marcelo.mp3') - assert resources[1].mimeType == snapshot('audio/mpeg') + assert resources[1].uri == snapshot('resource://marcelo.mp3') + assert resources[1].mime_type == snapshot('audio/mpeg') assert resources[1].name == snapshot('marcelo_resource') - assert str(resources[2].uri) == snapshot('resource://product_name.txt') - assert resources[2].mimeType == snapshot('text/plain') + assert resources[2].uri == snapshot('resource://product_name.txt') + assert resources[2].mime_type == snapshot('text/plain') assert resources[2].name == snapshot('product_name_resource') @@ -347,7 +347,7 @@ async def test_stdio_server_list_resource_templates(run_context: RunContext[int] resource_templates = await server.list_resource_templates() assert len(resource_templates) == snapshot(1) - assert resource_templates[0].uriTemplate == snapshot('resource://greeting/{name}') + assert resource_templates[0].uri_template == snapshot('resource://greeting/{name}') assert resource_templates[0].name == snapshot('greeting_resource_template') assert resource_templates[0].description == snapshot('Dynamic greeting resource template.') From 67f9b077c616d2e12a733bdbf2e20c6b6b44f389 Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 6 Oct 2025 22:30:06 +1100 Subject: [PATCH 09/11] Update MCPServer.read_resource() to decode/return native types. --- docs/mcp/client.md | 13 +++----- pydantic_ai_slim/pydantic_ai/mcp.py | 18 +++++------ tests/test_mcp.py | 47 +++++++++-------------------- 3 files changed, 27 insertions(+), 51 deletions(-) diff --git a/docs/mcp/client.md b/docs/mcp/client.md index fcc1b6f510..b10d3ddaf1 100644 --- a/docs/mcp/client.md +++ b/docs/mcp/client.md @@ -328,7 +328,7 @@ Pydantic AI provides methods to discover and read resources from MCP servers: - [`list_resource_templates()`][pydantic_ai.mcp.MCPServer.list_resource_templates] - List resource templates with parameter placeholders - [`read_resource(uri)`][pydantic_ai.mcp.MCPServer.read_resource] - Read the contents of a specific resource by URI -Resources can contain either text content ([`TextResourceContents`][mcp.types.TextResourceContents]) or binary content ([`BlobResourceContents`][mcp.types.BlobResourceContents]) encoded as base64. +Resources are automatically converted: text content is returned as `str`, and binary content is returned as [`BinaryContent`][pydantic_ai.messages.BinaryContent]. Before consuming resources, we need to run a server that exposes some: @@ -353,8 +353,6 @@ Then we can create the client: ```python {title="mcp_resources.py", requires="mcp_resource_server.py"} import asyncio -from mcp.types import TextResourceContents - from pydantic_ai import Agent from pydantic_ai._run_context import RunContext from pydantic_ai.mcp import MCPServerStdio @@ -383,15 +381,12 @@ async def main(): #> - user_name_resource: resource://user_name.txt (text/plain) # Read a text resource - text_contents = await server.read_resource('resource://user_name.txt') - for content in text_contents: - if isinstance(content, TextResourceContents): - print(f'Text content from {content.uri}: {content.text.strip()}') - #> Text content from resource://user_name.txt: Alice + user_name = await server.read_resource('resource://user_name.txt') + print(f'Text content: {user_name}') + #> Text content: Alice # Use resources in dependencies async with agent: - user_name = text_contents[0].text _ = await agent.run('Can you help me with my product?', deps=user_name) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index b40fc59ce8..cde87f9298 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -320,18 +320,23 @@ async def list_resource_templates(self) -> list[_mcp.ResourceTemplate]: result = await self._client.list_resource_templates() return [_mcp.map_from_mcp_resource_template(t) for t in result.resourceTemplates] - async def read_resource(self, uri: str) -> list[mcp_types.TextResourceContents | mcp_types.BlobResourceContents]: + async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: """Read the contents of a specific resource by URI. Args: uri: The URI of the resource to read. Returns: - A list of resource contents (either TextResourceContents or BlobResourceContents). + The resource contents. If the resource has a single content item, returns that item directly. + If the resource has multiple content items, returns a list of items. """ async with self: # Ensure server is running result = await self._client.read_resource(AnyUrl(uri)) - return result.contents + return ( + self._get_content(result.contents[0]) + if len(result.contents) == 1 + else [self._get_content(resource) for resource in result.contents] + ) async def __aenter__(self) -> Self: """Enter the MCP server context. @@ -427,12 +432,7 @@ async def _map_tool_result_part( resource = part.resource return self._get_content(resource) elif isinstance(part, mcp_types.ResourceLink): - resource_result: mcp_types.ReadResourceResult = await self._client.read_resource(part.uri) - return ( - self._get_content(resource_result.contents[0]) - if len(resource_result.contents) == 1 - else [self._get_content(resource) for resource in resource_result.contents] - ) + return await self.read_resource(str(part.uri)) else: assert_never(part) diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 735082e6e1..8d4591697b 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -38,13 +38,11 @@ from mcp.client.session import ClientSession from mcp.shared.context import RequestContext from mcp.types import ( - BlobResourceContents, CreateMessageRequestParams, ElicitRequestParams, ElicitResult, ImageContent, TextContent, - TextResourceContents, ) from pydantic_ai._mcp import map_from_mcp_params, map_from_model_response @@ -1499,49 +1497,32 @@ async def test_elicitation_callback_not_set(run_context: RunContext[int]): async def test_read_text_resource(run_context: RunContext[int]): - """Test reading a text resource (TextResourceContents).""" + """Test reading a text resource (converted to string).""" server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) async with server: - contents = await server.read_resource('resource://product_name.txt') - assert len(contents) == snapshot(1) - - content = contents[0] - assert str(content.uri) == snapshot('resource://product_name.txt') - assert content.mimeType == snapshot('text/plain') - assert isinstance(content, TextResourceContents) - assert content.text == snapshot('Pydantic AI\n') + content = await server.read_resource('resource://product_name.txt') + assert isinstance(content, str) + assert content == snapshot('Pydantic AI\n') async def test_read_blob_resource(run_context: RunContext[int]): - """Test reading a binary resource (BlobResourceContents).""" + """Test reading a binary resource (converted to BinaryContent).""" server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) async with server: - contents = await server.read_resource('resource://kiwi.png') - assert len(contents) == snapshot(1) - - content = contents[0] - assert str(content.uri) == snapshot('resource://kiwi.png') - assert content.mimeType == snapshot('image/png') - assert isinstance(content, BlobResourceContents) - # blob should be base64 encoded string - assert isinstance(content.blob, str) - # Decode and verify it's PNG data (starts with PNG magic bytes) - decoded_data = base64.b64decode(content.blob) - assert decoded_data[:8] == b'\x89PNG\r\n\x1a\n' # PNG magic bytes + content = await server.read_resource('resource://kiwi.png') + assert isinstance(content, BinaryContent) + assert content.media_type == snapshot('image/png') + # Verify it's PNG data (starts with PNG magic bytes) + assert content.data[:8] == b'\x89PNG\r\n\x1a\n' # PNG magic bytes async def test_read_resource_template(run_context: RunContext[int]): - """Test reading a resource template with parameters (TextResourceContents).""" + """Test reading a resource template with parameters (converted to string).""" server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) async with server: - contents = await server.read_resource('resource://greeting/Alice') - assert len(contents) == snapshot(1) - - content = contents[0] - assert str(content.uri) == snapshot('resource://greeting/Alice') - assert content.mimeType == snapshot('text/plain') - assert isinstance(content, TextResourceContents) - assert content.text == snapshot('Hello, Alice!') + content = await server.read_resource('resource://greeting/Alice') + assert isinstance(content, str) + assert content == snapshot('Hello, Alice!') def test_load_mcp_servers(tmp_path: Path): From e6cb0864e821502c6dc9c554f92b445f775e349b Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 6 Oct 2025 23:12:23 +1100 Subject: [PATCH 10/11] Add native MCP ResourceAnnotations type. --- pydantic_ai_slim/pydantic_ai/_mcp.py | 31 ++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/_mcp.py b/pydantic_ai_slim/pydantic_ai/_mcp.py index 9475b407c9..9dfef5f8aa 100644 --- a/pydantic_ai_slim/pydantic_ai/_mcp.py +++ b/pydantic_ai_slim/pydantic_ai/_mcp.py @@ -2,7 +2,9 @@ from abc import ABC from collections.abc import Sequence from dataclasses import dataclass -from typing import Any, Literal +from typing import Annotated, Any, Literal + +from pydantic import Field from . import _utils, exceptions, messages @@ -15,6 +17,19 @@ ) from _import_error +@dataclass(repr=False, kw_only=True) +class ResourceAnnotations: + """Additional properties describing MCP entities.""" + + audience: list[mcp_types.Role] | None = None + """Intended audience for this entity.""" + + priority: Annotated[float, Field(ge=0.0, le=1.0)] | None = None + """Priority level for this entity, ranging from 0.0 to 1.0.""" + + __repr__ = _utils.dataclasses_no_defaults_repr + + @dataclass(repr=False, kw_only=True) class BaseResource(ABC): """Base class for MCP resources.""" @@ -31,7 +46,7 @@ class BaseResource(ABC): mime_type: str | None = None """The MIME type of the resource, if known.""" - annotations: dict[str, Any] | None = None + annotations: ResourceAnnotations | None = None """Optional annotations for the resource.""" meta: dict[str, Any] | None = None @@ -178,7 +193,11 @@ def map_from_mcp_resource(mcp_resource: mcp_types.Resource) -> Resource: description=mcp_resource.description, mime_type=mcp_resource.mimeType, size=mcp_resource.size, - annotations=mcp_resource.annotations.model_dump() if mcp_resource.annotations else None, + annotations=( + ResourceAnnotations(audience=mcp_resource.annotations.audience, priority=mcp_resource.annotations.priority) + if mcp_resource.annotations + else None + ), meta=mcp_resource.meta, ) @@ -191,6 +210,10 @@ def map_from_mcp_resource_template(mcp_template: mcp_types.ResourceTemplate) -> title=mcp_template.title, description=mcp_template.description, mime_type=mcp_template.mimeType, - annotations=mcp_template.annotations.model_dump() if mcp_template.annotations else None, + annotations=( + ResourceAnnotations(audience=mcp_template.annotations.audience, priority=mcp_template.annotations.priority) + if mcp_template.annotations + else None + ), meta=mcp_template.meta, ) From b8424d80eeeb46aef3ad8ae49ae29b1ef820c7af Mon Sep 17 00:00:00 2001 From: Fenn Bailey Date: Mon, 6 Oct 2025 23:34:40 +1100 Subject: [PATCH 11/11] Allow MCPServer.read_resource() to read resources by Resource. --- pydantic_ai_slim/pydantic_ai/mcp.py | 19 +++++++++++++++---- tests/test_mcp.py | 8 ++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/pydantic_ai_slim/pydantic_ai/mcp.py b/pydantic_ai_slim/pydantic_ai/mcp.py index cde87f9298..7b237a34a3 100644 --- a/pydantic_ai_slim/pydantic_ai/mcp.py +++ b/pydantic_ai_slim/pydantic_ai/mcp.py @@ -10,7 +10,7 @@ from dataclasses import field, replace from datetime import timedelta from pathlib import Path -from typing import Annotated, Any +from typing import Annotated, Any, overload import anyio import httpx @@ -320,18 +320,29 @@ async def list_resource_templates(self) -> list[_mcp.ResourceTemplate]: result = await self._client.list_resource_templates() return [_mcp.map_from_mcp_resource_template(t) for t in result.resourceTemplates] - async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: + @overload + async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ... + + @overload + async def read_resource( + self, uri: _mcp.Resource + ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ... + + async def read_resource( + self, uri: str | _mcp.Resource + ) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: """Read the contents of a specific resource by URI. Args: - uri: The URI of the resource to read. + uri: The URI of the resource to read, or a Resource object. Returns: The resource contents. If the resource has a single content item, returns that item directly. If the resource has multiple content items, returns a list of items. """ + resource_uri = uri if isinstance(uri, str) else uri.uri async with self: # Ensure server is running - result = await self._client.read_resource(AnyUrl(uri)) + result = await self._client.read_resource(AnyUrl(resource_uri)) return ( self._get_content(result.contents[0]) if len(result.contents) == 1 diff --git a/tests/test_mcp.py b/tests/test_mcp.py index 8d4591697b..b43ca94aa5 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -23,6 +23,7 @@ ToolReturnPart, UserPromptPart, ) +from pydantic_ai._mcp import Resource from pydantic_ai.agent import Agent from pydantic_ai.exceptions import ModelRetry, UnexpectedModelBehavior, UserError from pydantic_ai.mcp import MCPServerStreamableHTTP, load_mcp_servers @@ -1500,10 +1501,17 @@ async def test_read_text_resource(run_context: RunContext[int]): """Test reading a text resource (converted to string).""" server = MCPServerStdio('python', ['-m', 'tests.mcp_server']) async with server: + # Test reading by URI string content = await server.read_resource('resource://product_name.txt') assert isinstance(content, str) assert content == snapshot('Pydantic AI\n') + # Test reading by Resource object + resource = Resource(uri='resource://product_name.txt', name='product_name_resource') + content_from_resource = await server.read_resource(resource) + assert isinstance(content_from_resource, str) + assert content_from_resource == snapshot('Pydantic AI\n') + async def test_read_blob_resource(run_context: RunContext[int]): """Test reading a binary resource (converted to BinaryContent)."""