1
+ import asyncio
2
+ import json
3
+ from mcp_transport import McpHttpTransport
4
+
5
+ class MCPClient :
6
+ """A simple client to interact with an MCP server."""
7
+ def __init__ (self , base_url : str ):
8
+ self ._transport = McpHttpTransport (base_url = base_url )
9
+
10
+ async def list_all_tools (self ):
11
+ print ("--> Attempting to list tools..." )
12
+ response = await self ._transport .tools_list ()
13
+ return response .get ("result" , {}).get ("tools" , [])
14
+
15
+ async def invoke_a_tool (self , tool_name : str , args : dict ):
16
+ print (f"\n --> Attempting to invoke tool: '{ tool_name } '..." )
17
+ response = await self ._transport .tool_invoke (tool_name , args )
18
+ return response .get ("result" , {})
19
+
20
+ async def close (self ):
21
+ await self ._transport .close ()
22
+
23
+
24
+ async def main ():
25
+ server_url = "http://127.0.0.1:5000"
26
+ client = MCPClient (base_url = server_url )
27
+
28
+ try :
29
+ # 1. List all available tools
30
+ tools = await client .list_all_tools ()
31
+ print ("\n ✅ Tools listed successfully:" )
32
+ print (json .dumps (tools , indent = 2 ))
33
+
34
+ # 2. Invoke a specific tool
35
+ tool_to_invoke = "get-n-rows"
36
+ arguments = {"num_rows" : "2" }
37
+ invocation_result = await client .invoke_a_tool (tool_to_invoke , arguments )
38
+
39
+ print ("\n ✅ Tool invoked successfully:" )
40
+ print (json .dumps (invocation_result , indent = 2 ))
41
+
42
+ except Exception as e :
43
+ print (f"\n ❌ An error occurred: { e } " )
44
+ finally :
45
+ await client .close ()
46
+
47
+
48
+ if __name__ == "__main__" :
49
+ asyncio .run (main ())
0 commit comments