Replies: 4 comments 3 replies
-
|
@jspahrsummers any guidancefor the Python SDK ? |
Beta Was this translation helpful? Give feedback.
0 replies
-
|
I'm also interested in how this works! |
Beta Was this translation helpful? Give feedback.
0 replies
-
from mcp.server.session import ServerSession
ServerSession.send_tool_list_changed() |
Beta Was this translation helpful? Give feedback.
3 replies
-
|
I also encountered this and since it's far from trivial to get the import anyio
import janus
from mcp import stdio_server, types
from mcp.server.fastmcp import server
from mcp.server.fastmcp.server import FastMCP, Settings
from mcp.server.fastmcp.tools.base import Tool as MCPTool
from mcp.shared.message import ServerMessageMetadata, SessionMessage
from mcp.shared.session import RequestId, SendNotificationT
from mcp.types import JSONRPCMessage, JSONRPCNotification
class FastMCPWithExternalCommandExecution(FastMCP):
"""
Extends FastMCP to support executing tools in an external process.
"""
@dataclass
class ExternalCall:
method_name: str
args: dict[str, Any]
def run(
self,
transport: Literal["stdio", "sse", "streamable-http"] = "stdio",
mount_path: str | None = None,
) -> None:
"""Run the FastMCP server with cross-thread queue support.
We need this to be able to run tool executions and other operations in
the main thread.
"""
self._external_calls_queue: janus.Queue[FastMCPWithExternalCommandExecution.ExternalCall] = janus.Queue()
async def with_external_calls_queue() -> None:
# Start background queue consumer
async with anyio.create_task_group() as tg:
tg.start_soon(self._process_external_calls_queue_loop)
# Run the normal transport startup
match transport:
case "stdio":
await self.run_stdio_async()
case "sse":
await self.run_sse_async(mount_path)
case "streamable-http":
await self.run_streamable_http_async()
anyio.run(with_external_calls_queue)
async def _process_external_calls_queue_loop(self) -> None:
"""Continuously process items from the external calls queue."""
while True:
external_call: FastMCPWithExternalCommandExecution.ExternalCall = await self._external_calls_queue.async_q.get()
try:
method = getattr(self, external_call.method_name)
await method(**external_call.args)
except Exception as e:
log.error(f"Error running external call {external_call}: {e}")
def update_tool_list(self) -> None:
"""Sync method to request tool list update from other threads."""
update_tool_list_call = FastMCPWithExternalCommandExecution.ExternalCall(
method_name="send_tool_list_changed",
args={},
)
self._external_calls_queue.sync_q.put(update_tool_list_call)
# For demo purposes, we aren't using this now
def call_tool_sync(self, tool_name: str, parameters: dict[str, Any]) -> None:
"""Sync method to call a tool from other threads."""
call_tool = FastMCPWithExternalCommandExecution.ExternalCall(
method_name="call_tool",
args={
"tool_name": tool_name,
"parameters": parameters,
},
)
self._external_calls_queue.sync_q.put(call_tool)
# Copied from mcp.server.session.Session since we don't have access to it here, but
# we hacked our way to have access to the write stream.
async def send_notification(
self,
notification: SendNotificationT,
related_request_id: RequestId | None = None,
) -> None:
"""
Emits a notification, which is a one-way message that does not expect
a response.
"""
# Some transport implementations may need to set the related_request_id
# to attribute to the notifications to the request that triggered them.
jsonrpc_notification = JSONRPCNotification(
jsonrpc="2.0",
**notification.model_dump(by_alias=True, mode="json", exclude_none=True),
)
session_message = SessionMessage(
message=JSONRPCMessage(jsonrpc_notification),
metadata=ServerMessageMetadata(related_request_id=related_request_id) if related_request_id else None,
)
await self._write_stream.send(session_message)
# also copied from mcp.server.session.Session
async def send_tool_list_changed(self) -> None:
"""Send a tool list changed notification."""
await self.send_notification(
types.ServerNotification(
types.ToolListChangedNotification(
method="notifications/tools/list_changed",
)
)
)
# override to save the read and write streams, which permits talking to (stdio) clients outside of request handling
# by using the external calls queue
async def run_stdio_async(self) -> None:
"""Run the server using stdio transport."""
async with stdio_server() as (read_stream, write_stream):
self._read_stream = read_stream
self._write_stream = write_stream
await self._mcp_server.run(
read_stream,
write_stream,
self._mcp_server.create_initialization_options(),
) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
Pre-submission Checklist
Question Category
Your Question
I want to use the Python SDK functionality that allows a server sending
notifications/tools/list_changedto a client and have the client process the notification and re-request the tool list.I have experimented with what its available in the repo and the user guides (only shows how it is done with JavaScript).
It seems the only way to do this is to create a subclass of ClientSession and provide an override to
Any help will be gladly appreciated
Beta Was this translation helpful? Give feedback.
All reactions