Skip to content

Commit 4539c59

Browse files
committed
Add basic documentation for async tools
1 parent e6a12e1 commit 4539c59

File tree

1 file changed

+284
-0
lines changed

1 file changed

+284
-0
lines changed

README.md

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,290 @@ def get_temperature(city: str) -> float:
487487
_Full example: [examples/snippets/servers/structured_output.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/structured_output.py)_
488488
<!-- /snippet-source -->
489489

490+
#### Async Tools
491+
492+
Tools can be configured to run asynchronously, allowing for long-running operations that execute in the background while clients poll for status and results. Async tools currently require protocol version `next` and support operation tokens for tracking execution state.
493+
494+
Tools can specify their invocation mode: `sync` (default), `async`, or `["sync", "async"]` for hybrid tools that support both patterns. Async tools can provide immediate feedback while continuing to execute, and support configurable keep-alive duration for result availability.
495+
496+
<!-- snippet-source examples/snippets/servers/async_tools.py -->
497+
```python
498+
"""
499+
FastMCP async tools example showing different invocation modes.
500+
501+
cd to the `examples/snippets/clients` directory and run:
502+
uv run server async_tools stdio
503+
"""
504+
505+
import asyncio
506+
507+
from pydantic import BaseModel, Field
508+
509+
from mcp import types
510+
from mcp.server.fastmcp import Context, FastMCP
511+
512+
# Create an MCP server with async operations support
513+
mcp = FastMCP("Async Tools Demo")
514+
515+
516+
class UserPreferences(BaseModel):
517+
"""Schema for collecting user preferences."""
518+
519+
continue_processing: bool = Field(description="Should we continue with the operation?")
520+
priority_level: str = Field(
521+
default="normal",
522+
description="Priority level: low, normal, high",
523+
)
524+
525+
526+
@mcp.tool(invocation_modes=["async"])
527+
async def async_elicitation_tool(operation: str, ctx: Context) -> str: # type: ignore[type-arg]
528+
"""An async tool that uses elicitation to get user input."""
529+
await ctx.info(f"Starting operation: {operation}")
530+
531+
# Simulate some initial processing
532+
await asyncio.sleep(0.5)
533+
await ctx.report_progress(0.3, 1.0, "Initial processing complete")
534+
535+
# Ask user for preferences
536+
result = await ctx.elicit(
537+
message=f"Operation '{operation}' requires user input. How should we proceed?",
538+
schema=UserPreferences,
539+
)
540+
541+
if result.action == "accept" and result.data:
542+
if result.data.continue_processing:
543+
await ctx.info(f"Continuing with {result.data.priority_level} priority")
544+
# Simulate processing based on user choice
545+
processing_time = {"low": 0.5, "normal": 1.0, "high": 1.5}.get(result.data.priority_level, 1.0)
546+
await asyncio.sleep(processing_time)
547+
await ctx.report_progress(1.0, 1.0, "Operation complete")
548+
return f"Operation '{operation}' completed successfully with {result.data.priority_level} priority"
549+
else:
550+
await ctx.warning("User chose not to continue")
551+
return f"Operation '{operation}' cancelled by user"
552+
else:
553+
await ctx.error("User declined or cancelled the operation")
554+
return f"Operation '{operation}' aborted"
555+
556+
557+
@mcp.tool()
558+
def sync_tool(x: int) -> str:
559+
"""An implicitly-synchronous tool."""
560+
return f"Sync result: {x * 2}"
561+
562+
563+
@mcp.tool(invocation_modes=["async"])
564+
async def async_only_tool(data: str, ctx: Context) -> str: # type: ignore[type-arg]
565+
"""An async-only tool that takes time to complete."""
566+
await ctx.info("Starting long-running analysis...")
567+
568+
# Simulate long-running work with progress updates
569+
for i in range(5):
570+
await asyncio.sleep(0.5)
571+
progress = (i + 1) / 5
572+
await ctx.report_progress(progress, 1.0, f"Processing step {i + 1}/5")
573+
574+
await ctx.info("Analysis complete!")
575+
return f"Async analysis result for: {data}"
576+
577+
578+
@mcp.tool(invocation_modes=["sync", "async"])
579+
def hybrid_tool(message: str, ctx: Context | None = None) -> str: # type: ignore[type-arg]
580+
"""A hybrid tool that works both sync and async."""
581+
if ctx:
582+
# Async mode - we have context for progress reporting
583+
import asyncio
584+
585+
async def async_work():
586+
await ctx.info(f"Processing '{message}' asynchronously...")
587+
await asyncio.sleep(0.5) # Simulate some work
588+
await ctx.debug("Async processing complete")
589+
590+
# Run the async work (this is a bit of a hack for demo purposes)
591+
try:
592+
loop = asyncio.get_event_loop()
593+
loop.create_task(async_work())
594+
except RuntimeError:
595+
pass # No event loop running
596+
597+
# Both sync and async modes return the same result
598+
return f"Hybrid result: {message.upper()}"
599+
600+
601+
async def immediate_feedback(operation: str) -> list[types.ContentBlock]:
602+
"""Provide immediate feedback for long-running operations."""
603+
return [types.TextContent(type="text", text=f"🚀 Starting {operation}... This may take a moment.")]
604+
605+
606+
@mcp.tool(invocation_modes=["async"], immediate_result=immediate_feedback)
607+
async def long_running_analysis(operation: str, ctx: Context) -> str: # type: ignore[type-arg]
608+
"""Perform analysis with immediate user feedback."""
609+
await ctx.info(f"Beginning {operation} analysis")
610+
611+
# Simulate long-running work with progress updates
612+
for i in range(5):
613+
await asyncio.sleep(1)
614+
progress = (i + 1) / 5
615+
await ctx.report_progress(progress, 1.0, f"Step {i + 1}/5 complete")
616+
617+
await ctx.info(f"Analysis '{operation}' completed successfully!")
618+
return f"Analysis '{operation}' completed successfully with detailed results!"
619+
620+
621+
@mcp.tool(invocation_modes=["async"], keep_alive=1800)
622+
async def long_running_task(task_name: str, ctx: Context) -> str: # type: ignore[type-arg]
623+
"""A long-running task with custom keep_alive duration."""
624+
await ctx.info(f"Starting long-running task: {task_name}")
625+
626+
# Simulate extended processing
627+
await asyncio.sleep(2)
628+
await ctx.report_progress(0.5, 1.0, "Halfway through processing")
629+
await asyncio.sleep(2)
630+
631+
await ctx.info(f"Task '{task_name}' completed successfully")
632+
return f"Long-running task '{task_name}' finished with 30-minute keep_alive"
633+
634+
635+
if __name__ == "__main__":
636+
mcp.run()
637+
```
638+
639+
_Full example: [examples/snippets/servers/async_tools.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/servers/async_tools.py)_
640+
<!-- /snippet-source -->
641+
642+
Clients using protocol version `next` can interact with async tools by polling operation status and retrieving results:
643+
644+
<!-- snippet-source examples/snippets/clients/async_tools_client.py -->
645+
```python
646+
"""
647+
Client example showing how to use async tools, including immediate result functionality.
648+
649+
cd to the `examples/snippets` directory and run:
650+
uv run async-tools-client
651+
uv run async-tools-client --protocol=latest # backwards compatible mode
652+
uv run async-tools-client --protocol=next # async tools mode
653+
"""
654+
655+
import asyncio
656+
import os
657+
import sys
658+
659+
from mcp import ClientSession, StdioServerParameters, types
660+
from mcp.client.stdio import stdio_client
661+
from mcp.shared.context import RequestContext
662+
663+
# Create server parameters for stdio connection
664+
server_params = StdioServerParameters(
665+
command="uv", # Using uv to run the server
666+
args=["run", "server", "async_tools", "stdio"],
667+
env={"UV_INDEX": os.environ.get("UV_INDEX", "")},
668+
)
669+
670+
671+
async def demonstrate_async_tool(session: ClientSession):
672+
"""Demonstrate calling an async-only tool."""
673+
print("\n=== Asynchronous Tool Demo ===")
674+
675+
# Call the async tool
676+
result = await session.call_tool("async_only_tool", arguments={"data": "sample dataset"})
677+
678+
if result.operation:
679+
token = result.operation.token
680+
print(f"Async operation started with token: {token}")
681+
682+
# Poll for status updates
683+
while True:
684+
status = await session.get_operation_status(token)
685+
print(f"Status: {status.status}")
686+
687+
if status.status == "completed":
688+
# Get the final result
689+
final_result = await session.get_operation_result(token)
690+
for content in final_result.result.content:
691+
if isinstance(content, types.TextContent):
692+
print(f"Final result: {content.text}")
693+
break
694+
elif status.status == "failed":
695+
print(f"Operation failed: {status.error}")
696+
break
697+
elif status.status in ("canceled", "unknown"):
698+
print(f"Operation ended with status: {status.status}")
699+
break
700+
701+
# Wait before polling again
702+
await asyncio.sleep(1)
703+
704+
705+
async def test_immediate_result_tool(session: ClientSession):
706+
"""Test calling async tool with immediate result functionality."""
707+
print("\n=== Immediate Result Tool Demo ===")
708+
709+
# Call the async tool with immediate_result functionality
710+
result = await session.call_tool("long_running_analysis", arguments={"operation": "data_processing"})
711+
712+
# Display immediate feedback (should be available immediately)
713+
print("Immediate response received:")
714+
if result.content:
715+
for content in result.content:
716+
if isinstance(content, types.TextContent):
717+
print(f" 📋 {content.text}")
718+
719+
# Check if there's an async operation to poll
720+
if result.operation:
721+
token = result.operation.token
722+
print(f"\nAsync operation started with token: {token}")
723+
print("Polling for final results...")
724+
725+
# Poll for status updates and final result
726+
while True:
727+
status = await session.get_operation_status(token)
728+
print(f" Status: {status.status}")
729+
730+
if status.status == "completed":
731+
# Get the final result
732+
final_result = await session.get_operation_result(token)
733+
print("\nFinal result received:")
734+
for content in final_result.result.content:
735+
if isinstance(content, types.TextContent):
736+
print(f"{content.text}")
737+
break
738+
elif status.status == "failed":
739+
print(f" ❌ Operation failed: {status.error}")
740+
break
741+
742+
# Wait before polling again
743+
await asyncio.sleep(1)
744+
745+
746+
async def run():
747+
"""Run async tool demonstrations."""
748+
protocol_version = "next" # Required for async tools support
749+
750+
async with stdio_client(server_params) as (read, write):
751+
async with ClientSession(read, write, protocol_version=protocol_version) as session:
752+
await session.initialize()
753+
754+
# List available tools to see invocation modes
755+
tools = await session.list_tools()
756+
print("Available tools:")
757+
for tool in tools.tools:
758+
invocation_mode = getattr(tool, "invocationMode", "sync")
759+
print(f" - {tool.name}: {tool.description} (mode: {invocation_mode})")
760+
761+
await demonstrate_async_tool(session)
762+
await test_immediate_result_tool(session)
763+
764+
765+
if __name__ == "__main__":
766+
asyncio.run(run())
767+
```
768+
769+
_Full example: [examples/snippets/clients/async_tools_client.py](https://github.com/modelcontextprotocol/python-sdk/blob/main/examples/snippets/clients/async_tools_client.py)_
770+
<!-- /snippet-source -->
771+
772+
The `@mcp.tool()` decorator accepts `invocation_modes` to specify supported execution patterns, `immediate_result` to provide instant feedback for async tools, and `keep_alive` to set how long operation results remain available (default: 300 seconds).
773+
490774
### Prompts
491775

492776
Prompts are reusable templates that help LLMs interact with your server effectively:

0 commit comments

Comments
 (0)