-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_client.py
More file actions
91 lines (75 loc) · 2.7 KB
/
mcp_client.py
File metadata and controls
91 lines (75 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import sys
import asyncio
from typing import Optional, Any
from contextlib import AsyncExitStack
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
class MCPClient:
def __init__(
self,
command: str,
args: list[str],
env: Optional[dict] = None,
):
self._command = command
self._args = args
self._env = env
self._session: Optional[ClientSession] = None
self._exit_stack: AsyncExitStack = AsyncExitStack()
async def connect(self):
server_params = StdioServerParameters(
command=self._command,
args=self._args,
env=self._env,
)
stdio_transport = await self._exit_stack.enter_async_context(
stdio_client(server_params)
)
_stdio, _write = stdio_transport
self._session = await self._exit_stack.enter_async_context(
ClientSession(_stdio, _write)
)
await self._session.initialize()
def session(self) -> ClientSession:
if self._session is None:
raise ConnectionError(
"Client session not initialized or cache not populated. Call connect_to_server first."
)
return self._session
async def list_tools(self) -> list[types.Tool]:
result = await self.session().list_tools()
return result.tools
async def call_tool(
self, tool_name: str, tool_input: dict
) -> types.CallToolResult | None:
return await self.session().call_tool(tool_name, tool_input)
async def list_prompts(self) -> list[types.Prompt]:
# TODO: Return a list of prompts defined by the MCP server
return []
async def get_prompt(self, prompt_name, args: dict[str, str]):
# TODO: Get a particular prompt defined by the MCP server
return []
async def read_resource(self, uri: str) -> Any:
# TODO: Read a resource, parse the contents and return it
return []
async def cleanup(self):
await self._exit_stack.aclose()
self._session = None
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.cleanup()
# For testing
async def main():
async with MCPClient(
# If using Python without UV, update command to 'python' and remove "run" from args.
command="uv",
args=["run", "mcp_server.py"],
) as _client:
result = await _client.list_tools()
print(result)
if __name__ == "__main__":
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())