Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions docs/mcp/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,85 @@ agent = Agent('openai:gpt-4o', toolsets=[weather_server, calculator_server])

MCP tools can include metadata that provides additional information about the tool's characteristics, which can be useful when [filtering tools][pydantic_ai.toolsets.FilteredToolset]. The `meta`, `annotations`, and `output_schema` fields can be found on the `metadata` dict on the [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] object that's passed to filter functions.

## Resources

MCP servers can provide [resources](https://modelcontextprotocol.io/docs/concepts/resources) - files, data, or content that can be accessed by the client. Resources in MCP are designed to be application-driven, with host applications determining how to incorporate context based on their needs.

Pydantic AI provides methods to discover and read resources from MCP servers:

- [`list_resources()`][pydantic_ai.mcp.MCPServer.list_resources] - List all available resources on the server
- [`list_resource_templates()`][pydantic_ai.mcp.MCPServer.list_resource_templates] - List resource templates with parameter placeholders
- [`read_resource(uri)`][pydantic_ai.mcp.MCPServer.read_resource] - Read the contents of a specific resource by URI

Resources are automatically converted: text content is returned as `str`, and binary content is returned as [`BinaryContent`][pydantic_ai.messages.BinaryContent].

Before consuming resources, we need to run a server that exposes some:

```python {title="mcp_resource_server.py"}
from mcp.server.fastmcp import FastMCP

mcp = FastMCP('Pydantic AI MCP Server')
log_level = 'unset'


@mcp.resource('resource://user_name.txt', mime_type='text/plain')
async def user_name_resource() -> str:
return 'Alice'


if __name__ == '__main__':
mcp.run()
```

Then we can create the client:

```python {title="mcp_resources.py", requires="mcp_resource_server.py"}
import asyncio

from pydantic_ai import Agent
from pydantic_ai._run_context import RunContext
from pydantic_ai.mcp import MCPServerStdio
from pydantic_ai.models.test import TestModel

agent = Agent(
model=TestModel(),
deps_type=str,
instructions="Use the customer's name while replying to them.",
)


@agent.instructions
def add_the_users_name(ctx: RunContext[str]) -> str:
return f"The user's name is {ctx.deps}."


async def main():
server = MCPServerStdio('python', args=['-m', 'mcp_resource_server'])

async with server:
# List all available resources
resources = await server.list_resources()
for resource in resources:
print(f' - {resource.name}: {resource.uri} ({resource.mime_type})')
#> - user_name_resource: resource://user_name.txt (text/plain)

# Read a text resource
user_name = await server.read_resource('resource://user_name.txt')
print(f'Text content: {user_name}')
#> Text content: Alice

# Use resources in dependencies
async with agent:
_ = await agent.run('Can you help me with my product?', deps=user_name)


if __name__ == '__main__':
asyncio.run(main())
```

_(This example is complete, it can be run "as is")_


## Custom TLS / SSL configuration

In some environments you need to tweak how HTTPS connections are established –
Expand Down
100 changes: 98 additions & 2 deletions pydantic_ai_slim/pydantic_ai/_mcp.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import base64
from abc import ABC
from collections.abc import Sequence
from typing import Literal
from dataclasses import dataclass
from typing import Annotated, Any, Literal

from . import exceptions, messages
from pydantic import Field

from . import _utils, exceptions, messages

try:
from mcp import types as mcp_types
Expand All @@ -13,6 +17,63 @@
) from _import_error


@dataclass(repr=False, kw_only=True)
class ResourceAnnotations:
"""Additional properties describing MCP entities."""

audience: list[mcp_types.Role] | None = None
"""Intended audience for this entity."""

priority: Annotated[float, Field(ge=0.0, le=1.0)] | None = None
"""Priority level for this entity, ranging from 0.0 to 1.0."""

__repr__ = _utils.dataclasses_no_defaults_repr


@dataclass(repr=False, kw_only=True)
class BaseResource(ABC):
"""Base class for MCP resources."""

name: str
"""The programmatic name of the resource."""

title: str | None = None
"""Human-readable title for UI contexts."""

description: str | None = None
"""A description of what this resource represents."""

mime_type: str | None = None
"""The MIME type of the resource, if known."""

annotations: ResourceAnnotations | None = None
"""Optional annotations for the resource."""

meta: dict[str, Any] | None = None
"""Optional metadata for the resource."""

__repr__ = _utils.dataclasses_no_defaults_repr


@dataclass(repr=False, kw_only=True)
class Resource(BaseResource):
"""A resource that can be read from an MCP server."""

uri: str
"""The URI of the resource."""

size: int | None = None
"""The size of the raw resource content in bytes (before base64 encoding), if known."""


@dataclass(repr=False, kw_only=True)
class ResourceTemplate(BaseResource):
"""A template for parameterized resources on an MCP server."""

uri_template: str
"""URI template (RFC 6570) for constructing resource URIs."""


def map_from_mcp_params(params: mcp_types.CreateMessageRequestParams) -> list[messages.ModelMessage]:
"""Convert from MCP create message request parameters to pydantic-ai messages."""
pai_messages: list[messages.ModelMessage] = []
Expand Down Expand Up @@ -121,3 +182,38 @@ def map_from_sampling_content(
return messages.TextPart(content=content.text)
else:
raise NotImplementedError('Image and Audio responses in sampling are not yet supported')


def map_from_mcp_resource(mcp_resource: mcp_types.Resource) -> Resource:
"""Convert from MCP Resource to native Pydantic AI Resource."""
return Resource(
uri=str(mcp_resource.uri),
name=mcp_resource.name,
title=mcp_resource.title,
description=mcp_resource.description,
mime_type=mcp_resource.mimeType,
size=mcp_resource.size,
annotations=(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To my frustration, there is no support for annotations in mcp.server.fastmcp.server.FastMCP.resource: https://github.com/modelcontextprotocol/python-sdk/blob/main/src/mcp/server/fastmcp/server.py#L480

This is what we use for testing in mcp_server.py, which means I couldn't actually write an integration test (at least using that pattern) to test it all end to end.

Point me in the right direction if there's something else I should do.

ResourceAnnotations(audience=mcp_resource.annotations.audience, priority=mcp_resource.annotations.priority)
if mcp_resource.annotations
else None
),
meta=mcp_resource.meta,
)


def map_from_mcp_resource_template(mcp_template: mcp_types.ResourceTemplate) -> ResourceTemplate:
"""Convert from MCP ResourceTemplate to native Pydantic AI ResourceTemplate."""
return ResourceTemplate(
uri_template=mcp_template.uriTemplate,
name=mcp_template.name,
title=mcp_template.title,
description=mcp_template.description,
mime_type=mcp_template.mimeType,
annotations=(
ResourceAnnotations(audience=mcp_template.annotations.audience, priority=mcp_template.annotations.priority)
if mcp_template.annotations
else None
),
meta=mcp_template.meta,
)
57 changes: 49 additions & 8 deletions pydantic_ai_slim/pydantic_ai/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from dataclasses import field, replace
from datetime import timedelta
from pathlib import Path
from typing import Annotated, Any
from typing import Annotated, Any, overload

import anyio
import httpx
import pydantic_core
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from pydantic import BaseModel, Discriminator, Field, Tag
from pydantic import AnyUrl, BaseModel, Discriminator, Field, Tag
from pydantic_core import CoreSchema, core_schema
from typing_extensions import Self, assert_never, deprecated

Expand Down Expand Up @@ -303,6 +303,52 @@ def tool_for_tool_def(self, tool_def: ToolDefinition) -> ToolsetTool[Any]:
args_validator=TOOL_SCHEMA_VALIDATOR,
)

async def list_resources(self) -> list[_mcp.Resource]:
"""Retrieve resources that are currently present on the server.

Note:
- We don't cache resources as they might change.
- We also don't subscribe to resource changes to avoid complexity.
"""
async with self: # Ensure server is running
result = await self._client.list_resources()
return [_mcp.map_from_mcp_resource(r) for r in result.resources]

async def list_resource_templates(self) -> list[_mcp.ResourceTemplate]:
"""Retrieve resource templates that are currently present on the server."""
async with self: # Ensure server is running
result = await self._client.list_resource_templates()
return [_mcp.map_from_mcp_resource_template(t) for t in result.resourceTemplates]

@overload
async def read_resource(self, uri: str) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

@overload
async def read_resource(
self, uri: _mcp.Resource
) -> str | messages.BinaryContent | list[str | messages.BinaryContent]: ...

async def read_resource(
self, uri: str | _mcp.Resource
) -> str | messages.BinaryContent | list[str | messages.BinaryContent]:
"""Read the contents of a specific resource by URI.

Args:
uri: The URI of the resource to read, or a Resource object.

Returns:
The resource contents. If the resource has a single content item, returns that item directly.
If the resource has multiple content items, returns a list of items.
"""
resource_uri = uri if isinstance(uri, str) else uri.uri
async with self: # Ensure server is running
result = await self._client.read_resource(AnyUrl(resource_uri))
return (
self._get_content(result.contents[0])
if len(result.contents) == 1
else [self._get_content(resource) for resource in result.contents]
)

async def __aenter__(self) -> Self:
"""Enter the MCP server context.

Expand Down Expand Up @@ -397,12 +443,7 @@ async def _map_tool_result_part(
resource = part.resource
return self._get_content(resource)
elif isinstance(part, mcp_types.ResourceLink):
resource_result: mcp_types.ReadResourceResult = await self._client.read_resource(part.uri)
return (
self._get_content(resource_result.contents[0])
if len(resource_result.contents) == 1
else [self._get_content(resource) for resource in resource_result.contents]
)
return await self.read_resource(str(part.uri))
else:
assert_never(part)

Expand Down
6 changes: 6 additions & 0 deletions tests/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ async def product_name_resource() -> str:
return Path(__file__).parent.joinpath('assets/product_name.txt').read_text()


@mcp.resource('resource://greeting/{name}', mime_type='text/plain')
async def greeting_resource_template(name: str) -> str:
"""Dynamic greeting resource template."""
return f'Hello, {name}!'


@mcp.tool()
async def get_image() -> Image:
data = Path(__file__).parent.joinpath('assets/kiwi.png').read_bytes()
Expand Down
Loading