5
5
uv run server async_tool_elicitation stdio
6
6
"""
7
7
8
- import asyncio
9
-
8
+ import anyio
10
9
from pydantic import BaseModel , Field
11
10
12
11
from mcp .server .fastmcp import Context , FastMCP
12
+ from mcp .server .session import ServerSession
13
13
14
14
mcp = FastMCP ("Async Tool Elicitation" )
15
15
@@ -32,12 +32,12 @@ class FileOperationChoice(BaseModel):
32
32
33
33
34
34
@mcp .tool (invocation_modes = ["async" ])
35
- async def process_with_confirmation (operation : str , ctx : Context ) -> str : # type: ignore[type-arg]
35
+ async def process_with_confirmation (operation : str , ctx : Context [ ServerSession , None ] ) -> str :
36
36
"""Process an operation that requires user confirmation."""
37
37
await ctx .info (f"Starting operation: { operation } " )
38
38
39
39
# Simulate some initial processing
40
- await asyncio .sleep (0.5 )
40
+ await anyio .sleep (0.5 )
41
41
await ctx .report_progress (0.3 , 1.0 , "Initial processing complete" )
42
42
43
43
# Ask user for preferences
@@ -51,7 +51,7 @@ async def process_with_confirmation(operation: str, ctx: Context) -> str: # typ
51
51
await ctx .info (f"Continuing with { result .data .priority_level } priority" )
52
52
# Simulate processing based on user choice
53
53
processing_time = {"low" : 0.5 , "normal" : 1.0 , "high" : 1.5 }.get (result .data .priority_level , 1.0 )
54
- await asyncio .sleep (processing_time )
54
+ await anyio .sleep (processing_time )
55
55
await ctx .report_progress (1.0 , 1.0 , "Operation complete" )
56
56
return f"Operation '{ operation } ' completed successfully with { result .data .priority_level } priority"
57
57
else :
@@ -63,12 +63,12 @@ async def process_with_confirmation(operation: str, ctx: Context) -> str: # typ
63
63
64
64
65
65
@mcp .tool (invocation_modes = ["async" ])
66
- async def file_operation (file_path : str , operation_type : str , ctx : Context ) -> str : # type: ignore[type-arg]
66
+ async def file_operation (file_path : str , operation_type : str , ctx : Context [ ServerSession , None ] ) -> str :
67
67
"""Perform file operation with user confirmation."""
68
68
await ctx .info (f"Analyzing file: { file_path } " )
69
69
70
70
# Simulate initial analysis
71
- await asyncio .sleep (1 )
71
+ await anyio .sleep (1 )
72
72
await ctx .report_progress (0.3 , 1.0 , "File analysis complete" )
73
73
74
74
# Simulate finding something that requires user confirmation
@@ -84,11 +84,11 @@ async def file_operation(file_path: str, operation_type: str, ctx: Context) -> s
84
84
if result .data .confirm_operation :
85
85
if result .data .backup_first :
86
86
await ctx .info ("Creating backup first..." )
87
- await asyncio .sleep (0.5 )
87
+ await anyio .sleep (0.5 )
88
88
await ctx .report_progress (0.7 , 1.0 , "Backup created" )
89
89
90
90
await ctx .info (f"Performing { operation_type } operation..." )
91
- await asyncio .sleep (1 )
91
+ await anyio .sleep (1 )
92
92
await ctx .report_progress (1.0 , 1.0 , "Operation complete" )
93
93
94
94
backup_msg = " (with backup)" if result .data .backup_first else " (no backup)"
0 commit comments