Commit 4e14428

simple mcp client with sampling capability

1 parent 61399b3

File tree

8 files changed: +846 -0 lines changed
.env.example
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
OPENAI_API_KEY=YOUR_OPENAI_API_KEY
.gitignore
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
.env
.python-version
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
3.13
README.md
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
# Simple Sampling Client Example (MCP)

This example demonstrates how to use the sampling capability of the MCP SDK with an OpenAI-compatible client (a sketch of the server side that triggers sampling follows the list below). It shows how to:

- Connect to an MCP server
- Fetch available tools
- Use OpenAI's API for chat completions
- Call MCP tools from the client
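
For context, "sampling" means the *server* asks the connected *client* to run an LLM call on its behalf. The actual example server lives in `examples/snippets/servers/`; the following is only a minimal, hypothetical sketch of such a server (the `summarize` tool name and its prompt are illustrative assumptions), showing that a call to `ctx.session.create_message` is what invokes the client's sampling handler:

```python
# Hypothetical server-side counterpart (illustrative only, not part of this commit).
from mcp.server.fastmcp import Context, FastMCP
from mcp.types import SamplingMessage, TextContent

mcp = FastMCP(name="sampling-demo")


@mcp.tool()
async def summarize(text: str, ctx: Context) -> str:
    """Ask the connected client's LLM to summarize the given text."""
    # This request is forwarded to the client and handled by its sampling callback.
    result = await ctx.session.create_message(
        messages=[
            SamplingMessage(
                role="user",
                content=TextContent(type="text", text=f"Summarize this text: {text}"),
            )
        ],
        max_tokens=256,
    )
    return result.content.text if isinstance(result.content, TextContent) else str(result.content)


mcp.run(transport="streamable-http")
```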

## Prerequisites

- Python 3.13+
- [uv](https://github.com/astral-sh/uv) for dependency management
- An OpenAI API key (set in a `.env` file or as an environment variable)

## Setup

1. Install dependencies:

```sh
cd examples/clients/simple-sampling-client/
uv sync
```

2. Set environment variables in a `.env` file. A sample `.env` file is provided as `.env.example`; a filled-in sketch follows.
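
The variable names below mirror the fields of the `Configurations` class in `main.py` (pydantic-settings matches environment variables to field names case-insensitively). Only `OPENAI_API_KEY` is required; the key value shown is a placeholder and the other lines are optional overrides of the defaults:

```sh
OPENAI_API_KEY=sk-your-key-here
# Optional overrides (defaults shown):
CHAT_MODEL=gpt-4o-mini
MCP_SERVER_HOST=localhost
MCP_SERVER_PORT=8000
```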

3. Start the MCP server in a separate terminal:

```sh
cd examples/snippets/servers/
uv run server sampling streamable-http
```

4. Run the sampling client in the previous terminal:

```sh
uv run mcp-simple-sampling-client
```

## Usage

You will be prompted to enter a message. Type your message and press Enter. The assistant will respond using the sampling capability and may call MCP tools as needed; a hypothetical exchange is sketched below.

Type `exit` or `quit` to stop the client.
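
A hypothetical session might look like this (the actual responses depend on the model and on the server's tools):

```
User: Please summarize this paragraph for me: ...
Assistant: <model response, possibly produced after one or more MCP tool calls>
User: exit
```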
45+
46+
## Code Overview
47+
48+
For more details, see the source code in `mcp_simple_sampling_client/main.py`.
mcp_simple_sampling_client/__init__.py
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
"""Simple sampling client for MCP."""
mcp_simple_sampling_client/main.py
Lines changed: 294 additions & 0 deletions
@@ -0,0 +1,294 @@
"""Define a simple MCP client that supports sampling."""

import asyncio
import http
import json

import openai
import pydantic_settings
from openai.types.chat import (
    ChatCompletion,
    ChatCompletionAssistantMessageParam,
    ChatCompletionMessageParam,
    ChatCompletionSystemMessageParam,
    ChatCompletionToolParam,
    ChatCompletionUserMessageParam,
)

import mcp
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.context import RequestContext
from mcp.types import CreateMessageRequestParams, CreateMessageResult, ErrorData, TextContent, Tool


class Configurations(pydantic_settings.BaseSettings):
    """Define configurations for the sampling client."""

    chat_model: str = "gpt-4o-mini"
    max_tokens: int = 1024
    mcp_server_host: str = "localhost"
    mcp_server_port: int = 8000
    openai_api_key: str = "your_openai_api_key"
    system_prompt: str = "You are a helpful assistant."

    model_config = pydantic_settings.SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")


class SamplingClient:
    """Define a simple MCP client that supports sampling.

    Parameters
    ----------
    config : Configurations
        The configurations for the sampling client.
    """

    def __init__(self: "SamplingClient", config: Configurations) -> None:
        self.config = config

        self.server_url = f"http://{self.config.mcp_server_host}:{self.config.mcp_server_port}/mcp"
        self.openai_client = openai.OpenAI(api_key=self.config.openai_api_key)

        self.conversation_history: list[ChatCompletionMessageParam] = []

    def get_openai_response(
        self: "SamplingClient",
        chat_history: list[ChatCompletionMessageParam],
        system_prompt: str,
        max_tokens: int,
        tools: list[ChatCompletionToolParam] | None = None,
    ) -> ChatCompletion:
        """Get a non-streaming response from the OpenAI API.

        Parameters
        ----------
        chat_history : list[ChatCompletionMessageParam]
            The chat history to use for the chat completion.
        system_prompt : str
            The system prompt to use for the chat completion.
        max_tokens : int
            The maximum number of tokens to generate in the response.
        tools : list[ChatCompletionToolParam] | None, optional
            The tools to use for the chat completion, by default None.

        Returns
        -------
        ChatCompletion
            The response from the OpenAI API.
        """
        updated_chat_history = [
            ChatCompletionSystemMessageParam(content=system_prompt, role="system"),
            *chat_history,
        ]

        extra_arguments = {} if tools is None else {"tool_choice": "auto", "tools": tools}

        chat_completion = self.openai_client.chat.completions.create(
            messages=updated_chat_history,
            model=self.config.chat_model,
            max_completion_tokens=max_tokens,
            n=1,
            stream=False,
            **extra_arguments,
        )

        return chat_completion

    async def fetch_mcp_tools(self: "SamplingClient") -> list[Tool]:
        """List available tools."""
        async with streamablehttp_client(self.server_url) as (read_stream, write_stream, _):
            async with mcp.ClientSession(read_stream, write_stream) as session:
                await session.initialize()

                server_tools = await session.list_tools()

                return server_tools.tools

    @staticmethod
    def convert_to_openai_tools(mcp_tools: list[Tool]) -> list[ChatCompletionToolParam]:
        """Convert MCP tools to OpenAI tool call parameters.

        Parameters
        ----------
        mcp_tools : list[Tool]
            List of MCP tools to convert.

        Returns
        -------
        list[ChatCompletionToolParam]
            List of OpenAI tool call parameters.
        """
        return [
            ChatCompletionToolParam(
                function={
                    "name": tool.name,
                    "description": tool.description or "",
                    "parameters": tool.inputSchema,
                },
                type="function",
            )
            for tool in mcp_tools
        ]
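
    # Illustrative note (not in the original commit): an MCP tool such as
    #     Tool(name="get_weather", description="Get the weather",
    #          inputSchema={"type": "object", "properties": {"city": {"type": "string"}}})
    # converts to the OpenAI tool parameter
    #     {"type": "function",
    #      "function": {"name": "get_weather", "description": "Get the weather",
    #                   "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}}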

    async def sampling_handler(
        self: "SamplingClient", context: RequestContext, parameters: CreateMessageRequestParams
    ) -> CreateMessageResult | ErrorData:
        """Handle sampling requests for OpenAI API calls with MCP tools.

        Parameters
        ----------
        context : RequestContext
            Request context containing information about the sampling request.
        parameters : CreateMessageRequestParams
            Parameters for the sampling request, including messages and customisations.

        Returns
        -------
        CreateMessageResult | ErrorData
            Result of the sampling request, either a message result or an error data.
        """
        del context

        openai_response = self.get_openai_response(
            [
                ChatCompletionUserMessageParam(
                    content=(
                        message.content.text if isinstance(message.content, TextContent) else str(message.content)
                    ),
                    role="user",
                )
                for message in parameters.messages
            ],
            parameters.systemPrompt or self.config.system_prompt,
            parameters.maxTokens,
        )

        if not (choices := openai_response.choices):
            return ErrorData(
                code=http.HTTPStatus.INTERNAL_SERVER_ERROR,
                message="No choices returned from OpenAI API.",
            )

        choice = choices[0]
        sampling_response_message = choice.message.content or ""

        return CreateMessageResult(
            role="assistant",
            content=TextContent(type="text", text=sampling_response_message),
            model=self.config.chat_model,
            stopReason=choice.finish_reason,
        )

    async def execute_tool_call(self: "SamplingClient", tool_name: str, arguments: dict) -> str:
        """Execute a tool call on an MCP server.

        Parameters
        ----------
        tool_name : str
            Name of the tool to call, formatted as "mcp-{server_name}-{tool_name}".
        arguments : dict
            Arguments to pass to the tool call.

        Returns
        -------
        str
            JSON string containing the result of the tool call or an error message.
        """
        async with streamablehttp_client(self.server_url) as (read_stream, write_stream, _):
            async with mcp.ClientSession(read_stream, write_stream, sampling_callback=self.sampling_handler) as session:
                await session.initialize()

                tool_result = await session.call_tool(tool_name, arguments=arguments)

                if tool_result.isError:
                    error_message = "".join(content.text for content in tool_result.content if isinstance(content, TextContent))

                    return json.dumps({"error": f"Failed tool call to {tool_name=} with {arguments=}: {error_message}."})

                if (structured_result := tool_result.structuredContent) is not None:
                    return json.dumps(structured_result)

                return json.dumps([element.model_dump() for element in tool_result.content])

    async def orchestrate(self: "SamplingClient", user_message: str) -> None:
        """Orchestrate the sampling client to handle requests."""
        self.conversation_history.append(ChatCompletionUserMessageParam(role="user", content=user_message))

        self.mcp_server_tools = await self.fetch_mcp_tools()
        self.openai_compatible_tools = self.convert_to_openai_tools(self.mcp_server_tools)

        openai_response = self.get_openai_response(
            self.conversation_history,
            self.config.system_prompt,
            self.config.max_tokens,
            tools=self.openai_compatible_tools,
        )

        if not (choices := openai_response.choices):
            error_message = "No choices returned from OpenAI API."
            self.conversation_history.append(
                ChatCompletionAssistantMessageParam(role="assistant", content=error_message)
            )

            print(error_message)

            return

        choice = choices[0]

        while choice.finish_reason == "tool_calls":
            for tool_call in choice.message.tool_calls or []:
                if tool_call.type != "function":
                    continue

                tool_response = await self.execute_tool_call(
                    tool_call.function.name, json.loads(tool_call.function.arguments)
                )

                self.conversation_history.append(
                    ChatCompletionAssistantMessageParam(
                        role="assistant",
                        content=f"Tool {tool_call.id} returned: {tool_response}",
                    )
                )

            openai_response = self.get_openai_response(
                self.conversation_history,
                self.config.system_prompt,
                self.config.max_tokens,
                tools=self.openai_compatible_tools,
            )

            if not (choices := openai_response.choices):
                error_message = "No choices returned from OpenAI API."
                self.conversation_history.append(
                    ChatCompletionAssistantMessageParam(role="assistant", content=error_message)
                )

                print(error_message)

                return

            choice = choices[0]

        assistant_message = choice.message.content or ""
        self.conversation_history.append(ChatCompletionAssistantMessageParam(role="assistant", content=assistant_message))

        print(f"Assistant: {assistant_message}")


def main():
    """Run the sampling client."""
    config = Configurations()

    sampling_client = SamplingClient(config)

    user_message = input("User: ")
    while user_message.lower() not in {"exit", "quit"}:
        asyncio.run(sampling_client.orchestrate(user_message))

        user_message = input("User: ")


if __name__ == "__main__":
    main()
