Skip to content

Commit 5a24c8a

Browse files
committed
simple mcp client with sampling capability
1 parent 61399b3 commit 5a24c8a

File tree

8 files changed

+860
-0
lines changed

8 files changed

+860
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
OPENAI_API_KEY=YOUR_OPENAI_API_KEY
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.env
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.13
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Simple Sampling Client Example (MCP)
2+
3+
This example demonstrates how to use the sampling capability of the MCP SDK with an OpenAI-compatible client. It shows how to:
4+
5+
- Connect to an MCP server
6+
- Fetch available tools
7+
- Use OpenAI's API for chat completions
8+
- Call MCP tools from the client
9+
10+
## Prerequisites
11+
12+
- Python 3.13+
13+
- [uv](https://github.com/astral-sh/uv) for dependency management
14+
- An OpenAI API key (set in a `.env` file or as an environment variable)
15+
16+
## Setup
17+
18+
1. Install dependencies:
19+
20+
```sh
21+
cd examples/clients/simple-sampling-client/
22+
uv sync
23+
```
24+
25+
2. Set environment variables in a `.env` file. A sample `.env` file is provided as `.env.example`.
26+
27+
3. Start the MCP server in a separate terminal:
28+
29+
```sh
30+
cd examples/snippets/servers/
31+
uv run server sampling streamable-http
32+
```
33+
34+
4. Run the sampling client in the first terminal:
35+
36+
```sh
37+
uv run mcp-simple-sampling-client
38+
```
39+
40+
## Usage
41+
42+
You will be prompted to enter a message. Type your message and press Enter. The assistant will respond using the sampling capability and may call MCP tools as needed.
43+
44+
Type `exit` or `quit` to stop the client.
45+
46+
---
47+
For more details, see the source code in `mcp_simple_sampling_client/main.py`.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Simple sampling client for MCP."""
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
"""Define a simple MCP client that supports sampling."""
2+
3+
import asyncio
4+
import http
5+
import json
6+
7+
import openai
8+
import pydantic_settings
9+
from openai.types.chat import (
10+
ChatCompletion,
11+
ChatCompletionAssistantMessageParam,
12+
ChatCompletionMessageParam,
13+
ChatCompletionSystemMessageParam,
14+
ChatCompletionToolMessageParam,
15+
ChatCompletionToolParam,
16+
ChatCompletionUserMessageParam,
17+
)
18+
19+
import mcp
20+
from mcp.client.streamable_http import streamablehttp_client
21+
from mcp.shared.context import RequestContext
22+
from mcp.types import CreateMessageRequestParams, CreateMessageResult, ErrorData, TextContent, Tool
23+
24+
25+
class Configurations(pydantic_settings.BaseSettings):
    """Define configurations for the sampling client.

    Field values can be overridden via environment variables or a local
    ``.env`` file (see ``model_config`` below).
    """

    # OpenAI chat model used for every completion request.
    chat_model: str = "gpt-4o-mini"
    # Upper bound on tokens generated per completion.
    max_tokens: int = 1024
    # Host and port of the MCP server's streamable-http endpoint.
    mcp_server_host: str = "localhost"
    mcp_server_port: int = 8000
    # Placeholder default — override with a real key for actual API calls.
    openai_api_key: str = "your_openai_api_key"
    # System prompt used when a sampling request does not supply its own.
    system_prompt: str = "You are a helpful assistant."

    # Read overrides from ".env" in the working directory.
    model_config = pydantic_settings.SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
36+
37+
38+
class SamplingClient:
    """A minimal MCP client that demonstrates the sampling capability.

    Parameters
    ----------
    config : Configurations
        The configurations for the sampling client.
    """

    def __init__(self: "SamplingClient", config: Configurations) -> None:
        self.config = config

        # Streamable-http endpoint of the MCP server to talk to.
        host = self.config.mcp_server_host
        port = self.config.mcp_server_port
        self.server_url = f"http://{host}:{port}/mcp"

        self.openai_client = openai.OpenAI(api_key=self.config.openai_api_key)

        # Running transcript of the user/assistant exchange across turns.
        self.conversation_history: list[ChatCompletionMessageParam] = []
54+
55+
def get_openai_response(
56+
self: "SamplingClient",
57+
chat_history: list[ChatCompletionMessageParam],
58+
system_prompt: str,
59+
max_tokens: int,
60+
tools: list[ChatCompletionToolParam] | None = None,
61+
) -> ChatCompletion:
62+
"""Get a non-streaming response from OpenAI API.
63+
64+
Parameters
65+
----------
66+
chat_history : list[ChatCompletionMessageParam]
67+
The chat history to use for the chat completion.
68+
system_prompt : str
69+
The system prompt to use for the chat completion.
70+
max_tokens : int
71+
The maximum number of tokens to generate in the response.
72+
tools : list[ChatCompletionToolParam] | None, optional
73+
The tools to use for the chat completion, by default None.
74+
75+
Returns
76+
-------
77+
ChatCompletion
78+
The response from the OpenAI API.
79+
"""
80+
updated_chat_history = [
81+
ChatCompletionSystemMessageParam(content=system_prompt, role="system"),
82+
*chat_history,
83+
]
84+
85+
extra_arguments = {} if tools is None else {"tool_choice": "auto", "tools": tools}
86+
87+
chat_completion = self.openai_client.chat.completions.create(
88+
messages=updated_chat_history,
89+
model=self.config.chat_model,
90+
max_completion_tokens=max_tokens,
91+
n=1,
92+
stream=False,
93+
**extra_arguments,
94+
)
95+
96+
return chat_completion
97+
98+
async def fetch_mcp_tools(self: "SamplingClient") -> list[Tool]:
99+
"""List available tools."""
100+
async with streamablehttp_client(self.server_url) as (read_stream, write_stream, _):
101+
async with mcp.ClientSession(read_stream, write_stream) as session:
102+
await session.initialize()
103+
104+
server_tools = await session.list_tools()
105+
106+
return server_tools.tools
107+
108+
@staticmethod
109+
def convert_to_openai_tools(mcp_tools: list[Tool]) -> list[ChatCompletionToolParam]:
110+
"""Convert MCP tools to OpenAI tool call parameters.
111+
112+
Parameters
113+
----------
114+
mcp_tools : list[Tool]
115+
List of MCP tools to convert.
116+
117+
Returns
118+
-------
119+
list[ChatCompletionToolParam]
120+
List of OpenAI tool call parameters.
121+
"""
122+
return [
123+
ChatCompletionToolParam(
124+
function={
125+
"name": tool.name,
126+
"description": tool.description or "",
127+
"parameters": tool.inputSchema,
128+
},
129+
type="function",
130+
)
131+
for tool in mcp_tools
132+
]
133+
134+
async def sampling_handler(
135+
self: "SamplingClient", context: RequestContext, parameters: CreateMessageRequestParams
136+
) -> CreateMessageResult | ErrorData:
137+
"""Handle sampling requests for OpenAI API calls with MCP tools.
138+
139+
Parameters
140+
----------
141+
context : RequestContext
142+
request context containing information about the sampling request
143+
parameters : CreateMessageRequestParams
144+
parameters for the sampling request, including messages and customisations
145+
146+
Returns
147+
-------
148+
CreateMessageResult | ErrorData
149+
result of the sampling request, either a message result or an error data
150+
"""
151+
del context
152+
153+
openai_response = self.get_openai_response(
154+
[
155+
ChatCompletionUserMessageParam(
156+
content=(
157+
message.content.text
158+
if isinstance(message.content, TextContent)
159+
else str(message.content)
160+
),
161+
role="user",
162+
)
163+
for message in parameters.messages
164+
],
165+
parameters.systemPrompt or self.config.system_prompt,
166+
parameters.maxTokens,
167+
)
168+
169+
if not (choices := openai_response.choices):
170+
return ErrorData(
171+
code=http.HTTPStatus.INTERNAL_SERVER_ERROR,
172+
message="No choices returned from OpenAI API.",
173+
)
174+
175+
choice = choices[0]
176+
sampling_response_message = choice.message.content or ""
177+
178+
return CreateMessageResult(
179+
role="assistant",
180+
content=TextContent(type="text", text=sampling_response_message),
181+
model=self.config.chat_model,
182+
stopReason=choice.finish_reason,
183+
)
184+
185+
async def execute_tool_call(self: "SamplingClient", tool_name: str, arguments: dict) -> str:
186+
"""Execute a tool call on an MCP server.
187+
188+
Parameters
189+
----------
190+
tool_name : str
191+
name of the tool to call, formatted as "mcp-{server_name}-{tool_name}"
192+
arguments : dict
193+
arguments to pass to the tool call
194+
195+
Returns
196+
-------
197+
str
198+
JSON string containing the result of the tool call or an error message
199+
"""
200+
async with streamablehttp_client(self.server_url) as (read_stream, write_stream, _):
201+
async with mcp.ClientSession(
202+
read_stream, write_stream, sampling_callback=self.sampling_handler
203+
) as session:
204+
await session.initialize()
205+
206+
tool_result = await session.call_tool(tool_name, arguments=arguments)
207+
208+
if tool_result.isError:
209+
error_message = "".join(
210+
content.text for content in tool_result.content if isinstance(content, TextContent)
211+
)
212+
213+
return json.dumps(
214+
{
215+
"error": (
216+
f"Failed tool call to {tool_name=} with {arguments=}: {error_message}."
217+
)
218+
}
219+
)
220+
221+
if (structured_result := tool_result.structuredContent) is not None:
222+
return json.dumps(structured_result)
223+
224+
return json.dumps([element.model_dump() for element in tool_result.content])
225+
226+
async def orchestrate(self: "SamplingClient", user_message: str) -> None:
227+
"""Orchestrate the sampling client to handle requests."""
228+
self.conversation_history.append(
229+
ChatCompletionUserMessageParam(role="user", content=user_message)
230+
)
231+
232+
self.mcp_server_tools = await self.fetch_mcp_tools()
233+
self.openai_compatible_tools = self.convert_to_openai_tools(self.mcp_server_tools)
234+
235+
openai_response = self.get_openai_response(
236+
self.conversation_history,
237+
self.config.system_prompt,
238+
self.config.max_tokens,
239+
tools=self.openai_compatible_tools,
240+
)
241+
242+
if not (choices := openai_response.choices):
243+
error_message = "No choices returned from OpenAI API."
244+
self.conversation_history.append(
245+
ChatCompletionAssistantMessageParam(role="assistant", content=error_message)
246+
)
247+
248+
print(error_message)
249+
250+
return
251+
252+
choice = choices[0]
253+
254+
while choice.finish_reason == "tool_calls":
255+
for tool_call in choice.message.tool_calls or []:
256+
if tool_call.type != "function":
257+
continue
258+
259+
tool_response = await self.execute_tool_call(
260+
tool_call.function.name, json.loads(tool_call.function.arguments)
261+
)
262+
263+
self.conversation_history.append(
264+
ChatCompletionAssistantMessageParam(
265+
role="assistant",
266+
content=f"Tool {tool_call.id} returned: {tool_response}",
267+
)
268+
)
269+
270+
openai_response = self.get_openai_response(
271+
self.conversation_history,
272+
self.config.system_prompt,
273+
self.config.max_tokens,
274+
tools=self.openai_compatible_tools,
275+
)
276+
277+
if not (choices := openai_response.choices):
278+
error_message = "No choices returned from OpenAI API."
279+
self.conversation_history.append(
280+
ChatCompletionAssistantMessageParam(role="assistant", content=error_message)
281+
)
282+
283+
print(error_message)
284+
285+
return
286+
287+
choice = choices[0]
288+
289+
assistant_message = choice.message.content or ""
290+
self.conversation_history.append({"role": "assistant", "content": assistant_message})
291+
292+
print(f"Assistant: {choice.message.content}")
293+
294+
295+
def main():
    """Run the sampling client as an interactive read-eval loop."""
    config = Configurations()
    sampling_client = SamplingClient(config)

    while True:
        user_message = input("User: ")
        # Stop on the conventional exit commands.
        if user_message.lower() in {"exit", "quit"}:
            break
        asyncio.run(sampling_client.orchestrate(user_message))


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)