Skip to content

Commit 07a2821

Browse files
committed
Support configuring immediate LRO result
1 parent 047664f commit 07a2821

File tree

10 files changed

+1026
-13
lines changed

10 files changed

+1026
-13
lines changed

examples/snippets/clients/async_tools_client.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
"""
2-
Client example showing how to use async tools.
2+
Client example showing how to use async tools, including immediate result functionality.
3+
4+
This example demonstrates:
5+
- Synchronous tools (immediate response)
6+
- Hybrid tools (sync/async modes)
7+
- Async-only tools (background execution with polling)
8+
- Batch processing with progress updates
9+
- Data processing pipelines
10+
- Elicitation (user input during async execution)
11+
- Immediate result tools (instant feedback + async execution)
312
413
cd to the `examples/snippets` directory and run:
514
uv run async-tools-client
@@ -222,6 +231,61 @@ async def demonstrate_elicitation(session: ClientSession):
222231
await asyncio.sleep(0.5)
223232

224233

234+
async def test_immediate_result_tool(session: ClientSession):
235+
"""Test calling async tool with immediate result functionality.
236+
237+
This demonstrates the immediate_result feature where async tools can provide
238+
instant feedback while continuing to execute in the background.
239+
"""
240+
print("\n=== Immediate Result Tool Demo ===")
241+
242+
# Call the async tool with immediate_result functionality
243+
result = await session.call_tool("long_running_analysis", arguments={"operation": "data_processing"})
244+
245+
# Display immediate feedback (should be available immediately)
246+
print("Immediate response received:")
247+
if result.content:
248+
for content in result.content:
249+
if isinstance(content, types.TextContent):
250+
print(f" 📋 {content.text}")
251+
else:
252+
print(" (No immediate content received)")
253+
254+
# Check if there's an async operation to poll
255+
if result.operation:
256+
token = result.operation.token
257+
print(f"\nAsync operation started with token: {token}")
258+
print("Polling for final results...")
259+
260+
# Poll for status updates and final result
261+
while True:
262+
status = await session.get_operation_status(token)
263+
print(f" Status: {status.status}")
264+
265+
if status.status == "completed":
266+
# Get the final result
267+
final_result = await session.get_operation_result(token)
268+
print("\nFinal result received:")
269+
for content in final_result.result.content:
270+
if isinstance(content, types.TextContent):
271+
print(f" ✅ {content.text}")
272+
break
273+
elif status.status == "failed":
274+
print(f" ❌ Operation failed: {status.error}")
275+
break
276+
elif status.status in ("canceled", "unknown"):
277+
print(f" ⚠️ Operation ended with status: {status.status}")
278+
break
279+
280+
# Wait before polling again
281+
await asyncio.sleep(1)
282+
else:
283+
# This shouldn't happen for async tools, but handle gracefully
284+
print("⚠️ Unexpected: tool returned synchronous result instead of async operation")
285+
286+
print("Immediate result demonstration complete!")
287+
288+
225289
async def run():
226290
"""Run all async tool demonstrations."""
227291
# Determine protocol version from command line
@@ -261,6 +325,7 @@ async def run():
261325
await demonstrate_batch_processing(session)
262326
await demonstrate_data_processing(session)
263327
await demonstrate_elicitation(session)
328+
await test_immediate_result_tool(session)
264329

265330
print("\n=== All demonstrations complete! ===")
266331

examples/snippets/servers/async_tools.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from pydantic import BaseModel, Field
1111

12+
from mcp import types
1213
from mcp.server.fastmcp import Context, FastMCP
1314

1415
# Create an MCP server with async operations support
@@ -206,5 +207,25 @@ async def quick_expiry_task(message: str, ctx: Context) -> str: # type: ignore[
206207
return f"Quick task completed: {message} (expires in 2 seconds)"
207208

208209

210+
async def immediate_feedback(operation: str) -> list[types.ContentBlock]:
211+
"""Provide immediate feedback for long-running operations."""
212+
return [types.TextContent(type="text", text=f"🚀 Starting {operation}... This may take a moment.")]
213+
214+
215+
@mcp.tool(invocation_modes=["async"], immediate_result=immediate_feedback)
216+
async def long_running_analysis(operation: str, ctx: Context) -> str: # type: ignore[type-arg]
217+
"""Perform analysis with immediate user feedback."""
218+
await ctx.info(f"Beginning {operation} analysis")
219+
220+
# Simulate long-running work with progress updates
221+
for i in range(5):
222+
await asyncio.sleep(1)
223+
progress = (i + 1) / 5
224+
await ctx.report_progress(progress, 1.0, f"Step {i + 1}/5 complete")
225+
226+
await ctx.info(f"Analysis '{operation}' completed successfully!")
227+
return f"Analysis '{operation}' completed successfully with detailed results!"
228+
229+
209230
if __name__ == "__main__":
210231
mcp.run()

src/mcp/server/fastmcp/server.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from starlette.routing import Mount, Route
2222
from starlette.types import Receive, Scope, Send
2323

24+
import mcp.types as types
2425
from mcp.server.auth.middleware.auth_context import AuthContextMiddleware
2526
from mcp.server.auth.middleware.bearer_auth import BearerAuthBackend, RequireAuthMiddleware
2627
from mcp.server.auth.provider import OAuthAuthorizationServerProvider, ProviderTokenVerifier, TokenVerifier
@@ -364,6 +365,10 @@ async def list_tools(self) -> list[MCPTool]:
364365
annotations=info.annotations,
365366
invocationMode=self._get_invocation_mode(info, client_supports_async),
366367
_meta=info.meta,
368+
internal=types.InternalToolProperties(
369+
immediate_result=info.immediate_result,
370+
keepalive=info.meta.get("_keep_alive") if info.meta else None,
371+
),
367372
)
368373
for info in tools
369374
if client_supports_async or info.invocation_modes != ["async"]
@@ -438,6 +443,7 @@ def add_tool(
438443
structured_output: bool | None = None,
439444
invocation_modes: list[InvocationMode] | None = None,
440445
keep_alive: int | None = None,
446+
immediate_result: Callable[..., Awaitable[list[ContentBlock]]] | None = None,
441447
) -> None:
442448
"""Add a tool to the server.
443449
@@ -458,6 +464,8 @@ def add_tool(
458464
- If None, defaults to ["sync"] for backwards compatibility
459465
keep_alive: How long (in seconds) async operation results should be kept available.
460466
Only applies to async tools.
467+
immediate_result: Optional async function that returns immediate feedback content
468+
for async tools. Must return list[ContentBlock]. Only valid for async-compatible tools.
461469
"""
462470
self._tool_manager.add_tool(
463471
fn,
@@ -468,6 +476,7 @@ def add_tool(
468476
structured_output=structured_output,
469477
invocation_modes=invocation_modes,
470478
keep_alive=keep_alive,
479+
immediate_result=immediate_result,
471480
)
472481

473482
def tool(
@@ -479,6 +488,7 @@ def tool(
479488
structured_output: bool | None = None,
480489
invocation_modes: list[InvocationMode] | None = None,
481490
keep_alive: int | None = None,
491+
immediate_result: Callable[..., Awaitable[list[ContentBlock]]] | None = None,
482492
) -> Callable[[AnyFunction], AnyFunction]:
483493
"""Decorator to register a tool.
484494
@@ -501,6 +511,8 @@ def tool(
501511
- Tools with "async" mode will be hidden from clients that don't support async execution
502512
keep_alive: How long (in seconds) async operation results should be kept available.
503513
Only applies to async tools.
514+
immediate_result: Optional async function that returns immediate feedback content
515+
for async tools. Must return list[ContentBlock]. Only valid for async-compatible tools.
504516
505517
Example:
506518
@server.tool()
@@ -527,6 +539,15 @@ async def async_only_tool(data: str, ctx: Context) -> str:
527539
def hybrid_tool(x: int) -> str:
528540
# This tool supports both sync and async execution
529541
return str(x)
542+
543+
async def immediate_feedback(operation: str) -> list[ContentBlock]:
544+
return [TextContent(type="text", text=f"Starting {operation}...")]
545+
546+
@server.tool(invocation_modes=["async"], immediate_result=immediate_feedback)
547+
async def long_running_tool(operation: str, ctx: Context) -> str:
548+
# This tool provides immediate feedback while running asynchronously
549+
await ctx.info(f"Processing {operation}")
550+
return f"Completed {operation}"
530551
"""
531552
# Check if user passed function directly instead of calling decorator
532553
if callable(name):
@@ -544,6 +565,7 @@ def decorator(fn: AnyFunction) -> AnyFunction:
544565
structured_output=structured_output,
545566
invocation_modes=invocation_modes,
546567
keep_alive=keep_alive,
568+
immediate_result=immediate_result,
547569
)
548570
return fn
549571

src/mcp/server/fastmcp/tools/base.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from mcp.server.fastmcp.exceptions import ToolError
1212
from mcp.server.fastmcp.utilities.context_injection import find_context_parameter
1313
from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, func_metadata
14-
from mcp.types import ToolAnnotations
14+
from mcp.types import ContentBlock, ToolAnnotations
1515

1616
if TYPE_CHECKING:
1717
from mcp.server.fastmcp.server import Context
@@ -38,10 +38,10 @@ class Tool(BaseModel):
3838
invocation_modes: list[InvocationMode] = Field(
3939
default=["sync"], description="Supported invocation modes (sync/async)"
4040
)
41-
meta: dict[str, Any] | None = Field(description="Optional additional tool information.", default=None)
42-
immediate_result: Callable[..., Awaitable[list[Any]]] | None = Field(
41+
immediate_result: Callable[..., Awaitable[list[ContentBlock]]] | None = Field(
4342
None, exclude=True, description="Optional immediate result function for async tools"
4443
)
44+
meta: dict[str, Any] | None = Field(description="Optional additional tool information.", default=None)
4545

4646
@cached_property
4747
def output_schema(self) -> dict[str, Any] | None:
@@ -59,8 +59,8 @@ def from_function(
5959
structured_output: bool | None = None,
6060
invocation_modes: list[InvocationMode] | None = None,
6161
keep_alive: int | None = None,
62-
meta: dict[str, Any] | None = None,
6362
immediate_result: Callable[..., Awaitable[list[Any]]] | None = None,
63+
meta: dict[str, Any] | None = None,
6464
) -> Tool:
6565
"""Create a Tool from a function."""
6666
func_name = name or fn.__name__
@@ -129,8 +129,8 @@ def from_function(
129129
context_kwarg=context_kwarg,
130130
annotations=annotations,
131131
invocation_modes=invocation_modes,
132-
meta=meta,
133132
immediate_result=immediate_result,
133+
meta=meta,
134134
)
135135

136136
async def run(

src/mcp/server/fastmcp/tools/tool_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
from __future__ import annotations as _annotations
22

3-
from collections.abc import Callable
3+
from collections.abc import Awaitable, Callable
44
from typing import TYPE_CHECKING, Any
55

66
from mcp.server.fastmcp.exceptions import ToolError
77
from mcp.server.fastmcp.tools.base import InvocationMode, Tool
88
from mcp.server.fastmcp.utilities.logging import get_logger
99
from mcp.shared.context import LifespanContextT, RequestT
10-
from mcp.types import ToolAnnotations
10+
from mcp.types import ContentBlock, ToolAnnotations
1111

1212
if TYPE_CHECKING:
1313
from mcp.server.fastmcp.server import Context
@@ -52,6 +52,7 @@ def add_tool(
5252
structured_output: bool | None = None,
5353
invocation_modes: list[InvocationMode] | None = None,
5454
keep_alive: int | None = None,
55+
immediate_result: Callable[..., Awaitable[list[ContentBlock]]] | None = None,
5556
meta: dict[str, Any] | None = None,
5657
) -> Tool:
5758
"""Add a tool to the server."""
@@ -64,6 +65,7 @@ def add_tool(
6465
structured_output=structured_output,
6566
invocation_modes=invocation_modes,
6667
keep_alive=keep_alive,
68+
immediate_result=immediate_result,
6769
meta=meta,
6870
)
6971
existing = self._tools.get(tool.name)

src/mcp/server/lowlevel/server.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,23 @@ async def handler(req: types.CallToolRequest):
469469
# Check for async execution
470470
if tool and self.async_operations and self._should_execute_async(tool):
471471
keep_alive = self._get_tool_keep_alive(tool)
472+
immediate_content: list[types.ContentBlock] = []
473+
474+
# Execute immediate result if available
475+
if self._has_immediate_result(tool):
476+
try:
477+
immediate_content = await self._execute_immediate_result(tool, arguments)
478+
logger.debug(f"Executed immediate result for {tool_name}")
479+
except McpError:
480+
# Re-raise McpError as-is
481+
raise
482+
except Exception as e:
483+
raise McpError(
484+
types.ErrorData(
485+
code=types.INTERNAL_ERROR,
486+
message=f"Immediate result execution failed: {str(e)}",
487+
)
488+
)
472489

473490
# Create async operation
474491
operation = self.async_operations.create_operation(
@@ -499,11 +516,11 @@ async def execute_async():
499516

500517
asyncio.create_task(execute_async())
501518

502-
# Return operation result immediately
519+
# Return operation result with immediate content
503520
logger.info(f"Returning async operation result for {tool_name}")
504521
return types.ServerResult(
505522
types.CallToolResult(
506-
content=[],
523+
content=immediate_content,
507524
operation=types.AsyncResultProperties(
508525
token=operation.token,
509526
keepAlive=operation.keep_alive,
@@ -588,9 +605,34 @@ def _should_execute_async(self, tool: types.Tool) -> bool:
588605

589606
def _get_tool_keep_alive(self, tool: types.Tool) -> int:
590607
"""Get the keepalive value for an async tool."""
591-
if not tool.meta or "_keep_alive" not in tool.meta:
592-
raise ValueError(f"_keep_alive not defined for tool {tool.name}")
593-
return cast(int, tool.meta["_keep_alive"])
608+
if tool.internal.keepalive is None:
609+
raise ValueError(f"keepalive not defined for tool {tool.name}")
610+
return tool.internal.keepalive
611+
612+
def _has_immediate_result(self, tool: types.Tool) -> bool:
613+
"""Check if tool has immediate_result function."""
614+
return tool.internal.immediate_result is not None and callable(tool.internal.immediate_result)
615+
616+
async def _execute_immediate_result(self, tool: types.Tool, arguments: dict[str, Any]) -> list[types.ContentBlock]:
617+
"""Execute immediate result function and return content blocks."""
618+
immediate_fn = tool.internal.immediate_result
619+
620+
if immediate_fn is None:
621+
raise ValueError(f"No immediate_result function found for tool {tool.name}")
622+
623+
# Validate function signature and execute
624+
try:
625+
result = await immediate_fn(**arguments)
626+
if not isinstance(result, list):
627+
raise ValueError("immediate_result must return list[ContentBlock]")
628+
return cast(list[types.ContentBlock], result)
629+
except McpError:
630+
# Re-raise McpError as-is
631+
raise
632+
except Exception as e:
633+
raise McpError(
634+
types.ErrorData(code=types.INTERNAL_ERROR, message=f"Immediate result execution error: {str(e)}")
635+
)
594636

595637
def progress_notification(self):
596638
def decorator(

src/mcp/types.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,18 @@ class ToolAnnotations(BaseModel):
858858
model_config = ConfigDict(extra="allow")
859859

860860

861+
class InternalToolProperties(BaseModel):
862+
"""
863+
Internal properties for tools that are not serialized in the MCP protocol.
864+
"""
865+
866+
immediate_result: Any = Field(default=None)
867+
"""Function to execute for immediate results in async operations."""
868+
869+
keepalive: int | None = Field(default=None)
870+
"""Keepalive duration in seconds for async operations."""
871+
872+
861873
class Tool(BaseMetadata):
862874
"""Definition for a tool the client can call."""
863875

@@ -883,6 +895,10 @@ class Tool(BaseMetadata):
883895
See [MCP specification](https://github.com/modelcontextprotocol/modelcontextprotocol/blob/47339c03c143bb4ec01a26e721a1b8fe66634ebe/docs/specification/draft/basic/index.mdx#general-fields)
884896
for notes on _meta usage.
885897
"""
898+
internal: InternalToolProperties = Field(default_factory=InternalToolProperties, exclude=True)
899+
"""
900+
Internal properties not serialized in MCP protocol.
901+
"""
886902
model_config = ConfigDict(extra="allow")
887903

888904

0 commit comments

Comments
 (0)