Concurrent MCP discovery + robust schema normalization for Harmony tool injection #101

Open · wants to merge 2 commits into base: main
232 changes: 132 additions & 100 deletions gpt-oss-mcp-server/build-system-prompt.py
@@ -1,115 +1,147 @@
import datetime
import asyncio
import copy
import datetime as dt
import logging
from typing import Any, Dict, List, Optional, Tuple

from gpt_oss.tokenizer import tokenizer

from openai_harmony import (
Conversation,
DeveloperContent,
HarmonyEncodingName,
Message,
ReasoningEffort,
Role,
SystemContent,
ToolNamespaceConfig,
ToolDescription,
Conversation, DeveloperContent, HarmonyEncodingName, Message,
ReasoningEffort, Role, SystemContent, ToolNamespaceConfig, ToolDescription,
load_harmony_encoding,
)

from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.types import ListToolsResult

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

async def list_server_and_tools(server_url: str):
async with sse_client(url=server_url) as streams, ClientSession(
*streams) as session:
initialize_response = await session.initialize()
list_tools_response = await session.list_tools()
return initialize_response, list_tools_response


def trim_schema(schema: dict) -> dict:
    # Turn the MCP-generated JSON Schema into Harmony's variant.
if "title" in schema:
del schema["title"]
if "default" in schema and schema["default"] is None:
del schema["default"]
if "anyOf" in schema:
# Turn "anyOf": [{"type": "type-1"}, {"type": "type-2"}] into "type": ["type-1", "type-2"]
# if there's more than 1 types, also remove "null" type as Harmony will just ignore it
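        # e.g. {"anyOf": [{"type": "string"}, {"type": "null"}]} -> {"type": ["string"]}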
types = [
type_dict["type"] for type_dict in schema["anyOf"]
if type_dict["type"] != 'null'
]
schema["type"] = types
del schema["anyOf"]
if "properties" in schema:
schema["properties"] = {
k: trim_schema(v)
for k, v in schema["properties"].items()
}
return schema


def post_process_tools_description(
list_tools_result: ListToolsResult) -> ListToolsResult:
# Adapt the MCP tool result for Harmony
for tool in list_tools_result.tools:
tool.inputSchema = trim_schema(tool.inputSchema)

    # Some tool schemas don't need to be part of the prompt (e.g. simple text-in/text-out for Python)
list_tools_result.tools = [
tool for tool in list_tools_result.tools
if getattr(tool.annotations, "include_in_prompt", True)
]

return list_tools_result


tools_urls = [
TOOL_SERVER_URLS = [
"http://localhost:8001/sse", # browser
"http://localhost:8000/sse", # python
]
harmony_tool_descriptions = []
for tools_url in tools_urls:

initialize_response, list_tools_response = asyncio.run(
list_server_and_tools(tools_url))

list_tools_response = post_process_tools_description(list_tools_response)

tool_from_mcp = ToolNamespaceConfig(
name=initialize_response.serverInfo.name,
description=initialize_response.instructions,
tools=[
ToolDescription.new(name=tool.name,
description=tool.description,
parameters=tool.inputSchema)
for tool in list_tools_response.tools
])
harmony_tool_descriptions.append(tool_from_mcp)

encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)

system_message_content = (SystemContent.new().with_reasoning_effort(
ReasoningEffort.LOW).with_conversation_start_date(
datetime.datetime.now().strftime("%Y-%m-%d")))

for tool_description in harmony_tool_descriptions:
system_message_content = system_message_content.with_tools(
tool_description)

system_message = Message.from_role_and_content(Role.SYSTEM,
system_message_content)

developer_message_content = DeveloperContent.new().with_instructions("")
developer_message = Message.from_role_and_content(Role.DEVELOPER,
developer_message_content)

messages = [system_message, developer_message]

conversation = Conversation.from_messages(messages)
tokens = encoding.render_conversation(conversation)
system_message = tokenizer.decode(tokens)
print(system_message)
def _strip_none_default(d: Dict[str, Any]) -> None:
if "default" in d and d["default"] is None:
d.pop("default", None)

def _flatten_type_list(types: List[str]) -> List[str]:
# remove duplicates and "null" (Harmony ignores it)
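    # e.g. ["string", "null", "string"] -> ["string"]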
return sorted({t for t in types if t != "null"})

def _normalize_schema(schema: Dict[str, Any]) -> Dict[str, Any]:
"""Return a deep-copied Harmony-friendly variant of a JSON Schema."""
s = copy.deepcopy(schema)
s.pop("title", None)
_strip_none_default(s)

# Handle nullable (OpenAPI) → type list
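    # e.g. {"type": "string", "nullable": True} -> {"type": ["string"]}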
if s.get("nullable") is True:
t = s.get("type")
if isinstance(t, str):
s["type"] = _flatten_type_list([t, "null"])
elif isinstance(t, list):
s["type"] = _flatten_type_list(t + ["null"])
s.pop("nullable", None)

# anyOf/oneOf → type union when they’re simple type unions
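    # e.g. {"anyOf": [{"type": "integer"}, {"type": "string"}]} -> {"type": ["integer", "string"]}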
for key in ("anyOf", "oneOf"):
if key in s:
variants = s[key]
if all(isinstance(v, dict) and "type" in v for v in variants):
s["type"] = _flatten_type_list([v["type"] for v in variants])
s.pop(key, None)

# allOf – naive merge for common simple cases
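    # e.g. {"type": "string", "allOf": [{"minLength": 1}]} -> {"type": "string", "minLength": 1}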
if "allOf" in s:
merged: Dict[str, Any] = {}
for part in s.pop("allOf"):
merged.update(part)
        # Recurse on the merged schema; "allOf" was already popped above, so this terminates.
s = _normalize_schema({**s, **merged})

# Recurse into properties/items
if "properties" in s and isinstance(s["properties"], dict):
s["properties"] = {k: _normalize_schema(v) for k, v in s["properties"].items()}

if "items" in s and isinstance(s["items"], dict):
s["items"] = _normalize_schema(s["items"])

# Keep description/enum/const/format if present; Harmony tolerates these
return s

def _filter_tools(list_tools: ListToolsResult) -> ListToolsResult:
# Guard annotations (MCP servers differ)
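    # Tools without annotations, or without an include_in_prompt field, default to being included.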
kept = []
for t in list_tools.tools:
include = True
ann = getattr(t, "annotations", None)
if ann is not None:
include = getattr(ann, "include_in_prompt", True)
if include:
kept.append(t)
else:
log.info("Excluding tool from prompt: %s", getattr(t, "name", "<unnamed>"))
list_tools.tools = kept
return list_tools

async def fetch_server(tools_url: str) -> Optional[Tuple[Any, ListToolsResult]]:
try:
async with sse_client(url=tools_url, timeout=10) as streams, ClientSession(*streams) as session:
init = await session.initialize()
tools: ListToolsResult = await session.list_tools()
return init, tools
except Exception as e:
log.warning("Failed to fetch tools from %s: %s", tools_url, e)
return None

async def gather_servers(urls: List[str]) -> List[Tuple[Any, ListToolsResult]]:
results = await asyncio.gather(*(fetch_server(u) for u in urls))
return [r for r in results if r is not None]

def build_harmony_system_content(
server_results: List[Tuple[Any, ListToolsResult]],
conversation_start_date: str,
reasoning_effort: ReasoningEffort = ReasoningEffort.LOW,
) -> SystemContent:
sc = SystemContent.new().with_reasoning_effort(reasoning_effort).with_conversation_start_date(conversation_start_date)
for init, tools_result in server_results:
tools_result = _filter_tools(tools_result)
namespace = ToolNamespaceConfig(
name=init.serverInfo.name,
description=init.instructions,
tools=[
ToolDescription.new(
name=t.name,
description=t.description,
parameters=_normalize_schema(t.inputSchema),
)
for t in tools_result.tools
],
)
sc = sc.with_tools(namespace)
return sc

def main(tool_urls: List[str]) -> str:
# Fetch tools concurrently
server_results = asyncio.run(gather_servers(tool_urls))
if not server_results:
raise RuntimeError("No tool servers available; cannot build system message.")

encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
start_date = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d")

system_content = build_harmony_system_content(server_results, start_date)
system_msg = Message.from_role_and_content(Role.SYSTEM, system_content)

dev_msg = Message.from_role_and_content(Role.DEVELOPER, DeveloperContent.new().with_instructions(""))

convo = Conversation.from_messages([system_msg, dev_msg])
token_ids = encoding.render_conversation(convo)

rendered_system_text = tokenizer.decode(token_ids)
return rendered_system_text

if __name__ == "__main__":
print(main(TOOL_SERVER_URLS))
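
# Illustrative only (not part of this diff): what _normalize_schema produces for a
# representative MCP input schema. The schema value below is made up for illustration;
# it is not taken from the browser or python tool servers.
#
#   schema = {
#       "title": "SearchArgs",
#       "type": "object",
#       "properties": {
#           "query": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": None},
#           "limit": {"type": "integer", "nullable": True},
#       },
#   }
#   _normalize_schema(schema)
#   # -> {'type': 'object', 'properties': {'query': {'type': ['string']},
#   #                                      'limit': {'type': ['integer']}}}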