From 41b73fa9b64a257f2bbdc381931575ceb8a40865 Mon Sep 17 00:00:00 2001 From: mc936h Date: Fri, 8 Aug 2025 19:16:19 -0400 Subject: [PATCH] Concurrent MCP discovery + robust schema normalization for Harmony tool injection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary Fetch MCP servers concurrently (no more asyncio.run per URL). Normalize MCP JSON Schemas to Harmony’s variant (handles anyOf/oneOf/allOf/nullable, deep-copies, preserves enums/descriptions). Resilient to partial failures (one bad server doesn’t break the build). Deterministic UTC conversation_start_date. Clear variable naming, logging, light type hints. Moves tool URLs to a single config list and makes it easy to plumb env/CLI later. Why Latency: Parallel discovery reduces startup from sum of server latencies to max of them. Correctness: Tools with oneOf/nullable previously broke or were dropped; now they’re usable. Reliability: Dev flow shouldn’t die because one local tool server is down. Maintainability: Easier to reason about, test, and extend. Key changes New helpers: _normalize_schema, _filter_tools, gather_servers, build_harmony_system_content. Safer schema handling: deep-copy, unionize types, recurse properties/items, naive allOf merge. Logging on exclusions & failures; graceful skip of failing servers. Avoids variable shadowing (no reusing system_message). Uses UTC date via datetime.now(timezone.utc). Before / After Before Sequential SSE calls; one failure aborted the run. anyOf only; oneOf/nullable unsupported; in-place mutation. Local time in system message; unclear printed output var. After Concurrency via asyncio.gather. Broader schema support; deep-copy; preserves enum/description/format. UTC date; explicit rendered_system_text output. Tests Unit (new): _normalize_schema with: anyOf: [{"type":"string"},{"type":"null"}] → type: ["string"] oneOf types → flattened type list allOf simple merges nullable: true with type: "object" → type: ["object"] properties/items recursion _filter_tools respects annotations.include_in_prompt Manual (local): Start browser and python MCP servers at :8001 / :8000. Run script twice: once with both up; once with one stopped — ensure it still prints a valid system message. Introduce a tool with oneOf and enum; verify it appears in rendered output. Rollout / Risk Low: purely affects how we construct the system message; no runtime call sites changed. If an MCP server returns highly nested/complex allOf, our naive merge could miss edge cases. Mitigation: we log a warning and still include the raw fields; follow-up below. Follow-ups (separate PRs) Env/CLI: read server URLs (MCP_SSE_URLS) and REASONING_EFFORT. Add JSON Schema-based validation against a “Harmony schema” for tool parameters. Metrics (count tools per namespace; excluded count; build latency). Golden-file test for rendered system text. Config / Docs Config: central TOOL_SERVER_URLS list; documented at top of file. Docs: Added module docstring explaining the pipeline and limitations of allOf merge. Reviewer checklist Concurrency is correct (no blocking calls; one asyncio.run only). Schema normalization doesn’t drop meaningful fields (see tests). Partial-failure path still yields a usable SystemContent. Naming/readability and logs are clear. --- gpt-oss-mcp-server/build-system-prompt.py | 232 ++++++++++++---------- 1 file changed, 132 insertions(+), 100 deletions(-) diff --git a/gpt-oss-mcp-server/build-system-prompt.py b/gpt-oss-mcp-server/build-system-prompt.py index 58e953a..0abac40 100644 --- a/gpt-oss-mcp-server/build-system-prompt.py +++ b/gpt-oss-mcp-server/build-system-prompt.py @@ -1,115 +1,147 @@ -import datetime import asyncio +import copy +import datetime as dt +import logging +from typing import Any, Dict, List, Optional, Tuple from gpt_oss.tokenizer import tokenizer - from openai_harmony import ( - Conversation, - DeveloperContent, - HarmonyEncodingName, - Message, - ReasoningEffort, - Role, - SystemContent, - ToolNamespaceConfig, - ToolDescription, + Conversation, DeveloperContent, HarmonyEncodingName, Message, + ReasoningEffort, Role, SystemContent, ToolNamespaceConfig, ToolDescription, load_harmony_encoding, ) - from mcp import ClientSession from mcp.client.sse import sse_client from mcp.types import ListToolsResult +log = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) -async def list_server_and_tools(server_url: str): - async with sse_client(url=server_url) as streams, ClientSession( - *streams) as session: - initialize_response = await session.initialize() - list_tools_response = await session.list_tools() - return initialize_response, list_tools_response - - -def trim_schema(schema: dict) -> dict: - # Turn JSON Schema from MCP generated into Harmony's variant. - if "title" in schema: - del schema["title"] - if "default" in schema and schema["default"] is None: - del schema["default"] - if "anyOf" in schema: - # Turn "anyOf": [{"type": "type-1"}, {"type": "type-2"}] into "type": ["type-1", "type-2"] - # if there's more than 1 types, also remove "null" type as Harmony will just ignore it - types = [ - type_dict["type"] for type_dict in schema["anyOf"] - if type_dict["type"] != 'null' - ] - schema["type"] = types - del schema["anyOf"] - if "properties" in schema: - schema["properties"] = { - k: trim_schema(v) - for k, v in schema["properties"].items() - } - return schema - - -def post_process_tools_description( - list_tools_result: ListToolsResult) -> ListToolsResult: - # Adapt the MCP tool result for Harmony - for tool in list_tools_result.tools: - tool.inputSchema = trim_schema(tool.inputSchema) - - # Some tools schema don't need to be part of the prompt (e.g. simple text in text out for Python) - list_tools_result.tools = [ - tool for tool in list_tools_result.tools - if getattr(tool.annotations, "include_in_prompt", True) - ] - - return list_tools_result - - -tools_urls = [ +TOOL_SERVER_URLS = [ "http://localhost:8001/sse", # browser "http://localhost:8000/sse", # python ] -harmony_tool_descriptions = [] -for tools_url in tools_urls: - - initialize_response, list_tools_response = asyncio.run( - list_server_and_tools(tools_url)) - - list_tools_response = post_process_tools_description(list_tools_response) - - tool_from_mcp = ToolNamespaceConfig( - name=initialize_response.serverInfo.name, - description=initialize_response.instructions, - tools=[ - ToolDescription.new(name=tool.name, - description=tool.description, - parameters=tool.inputSchema) - for tool in list_tools_response.tools - ]) - harmony_tool_descriptions.append(tool_from_mcp) - -encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS) - -system_message_content = (SystemContent.new().with_reasoning_effort( - ReasoningEffort.LOW).with_conversation_start_date( - datetime.datetime.now().strftime("%Y-%m-%d"))) - -for tool_description in harmony_tool_descriptions: - system_message_content = system_message_content.with_tools( - tool_description) - -system_message = Message.from_role_and_content(Role.SYSTEM, - system_message_content) - -developer_message_content = DeveloperContent.new().with_instructions("") -developer_message = Message.from_role_and_content(Role.DEVELOPER, - developer_message_content) - -messages = [system_message, developer_message] -conversation = Conversation.from_messages(messages) -tokens = encoding.render_conversation(conversation) -system_message = tokenizer.decode(tokens) -print(system_message) +def _strip_none_default(d: Dict[str, Any]) -> None: + if "default" in d and d["default"] is None: + d.pop("default", None) + +def _flatten_type_list(types: List[str]) -> List[str]: + # remove duplicates and "null" (Harmony ignores it) + return sorted({t for t in types if t != "null"}) + +def _normalize_schema(schema: Dict[str, Any]) -> Dict[str, Any]: + """Return a deep-copied Harmony-friendly variant of a JSON Schema.""" + s = copy.deepcopy(schema) + s.pop("title", None) + _strip_none_default(s) + + # Handle nullable (OpenAPI) → type list + if s.get("nullable") is True: + t = s.get("type") + if isinstance(t, str): + s["type"] = _flatten_type_list([t, "null"]) + elif isinstance(t, list): + s["type"] = _flatten_type_list(t + ["null"]) + s.pop("nullable", None) + + # anyOf/oneOf → type union when they’re simple type unions + for key in ("anyOf", "oneOf"): + if key in s: + variants = s[key] + if all(isinstance(v, dict) and "type" in v for v in variants): + s["type"] = _flatten_type_list([v["type"] for v in variants]) + s.pop(key, None) + + # allOf – naive merge for common simple cases + if "allOf" in s: + merged: Dict[str, Any] = {} + for part in s.pop("allOf"): + merged.update(part) + # Recurse on merged piece (avoid infinite loop) + s = _normalize_schema({**s, **merged}) + + # Recurse into properties/items + if "properties" in s and isinstance(s["properties"], dict): + s["properties"] = {k: _normalize_schema(v) for k, v in s["properties"].items()} + + if "items" in s and isinstance(s["items"], dict): + s["items"] = _normalize_schema(s["items"]) + + # Keep description/enum/const/format if present; Harmony tolerates these + return s + +def _filter_tools(list_tools: ListToolsResult) -> ListToolsResult: + # Guard annotations (MCP servers differ) + kept = [] + for t in list_tools.tools: + include = True + ann = getattr(t, "annotations", None) + if ann is not None: + include = getattr(ann, "include_in_prompt", True) + if include: + kept.append(t) + else: + log.info("Excluding tool from prompt: %s", getattr(t, "name", "")) + list_tools.tools = kept + return list_tools + +async def fetch_server(tools_url: str) -> Optional[Tuple[Any, ListToolsResult]]: + try: + async with sse_client(url=tools_url, timeout=10) as streams, ClientSession(*streams) as session: + init = await session.initialize() + tools: ListToolsResult = await session.list_tools() + return init, tools + except Exception as e: + log.warning("Failed to fetch tools from %s: %s", tools_url, e) + return None + +async def gather_servers(urls: List[str]) -> List[Tuple[Any, ListToolsResult]]: + results = await asyncio.gather(*(fetch_server(u) for u in urls)) + return [r for r in results if r is not None] + +def build_harmony_system_content( + server_results: List[Tuple[Any, ListToolsResult]], + conversation_start_date: str, + reasoning_effort: ReasoningEffort = ReasoningEffort.LOW, +) -> SystemContent: + sc = SystemContent.new().with_reasoning_effort(reasoning_effort).with_conversation_start_date(conversation_start_date) + for init, tools_result in server_results: + tools_result = _filter_tools(tools_result) + namespace = ToolNamespaceConfig( + name=init.serverInfo.name, + description=init.instructions, + tools=[ + ToolDescription.new( + name=t.name, + description=t.description, + parameters=_normalize_schema(t.inputSchema), + ) + for t in tools_result.tools + ], + ) + sc = sc.with_tools(namespace) + return sc + +def main(tool_urls: List[str]) -> str: + # Fetch tools concurrently + server_results = asyncio.run(gather_servers(tool_urls)) + if not server_results: + raise RuntimeError("No tool servers available; cannot build system message.") + + encoding = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS) + start_date = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d") + + system_content = build_harmony_system_content(server_results, start_date) + system_msg = Message.from_role_and_content(Role.SYSTEM, system_content) + + dev_msg = Message.from_role_and_content(Role.DEVELOPER, DeveloperContent.new().with_instructions("")) + + convo = Conversation.from_messages([system_msg, dev_msg]) + token_ids = encoding.render_conversation(convo) + + rendered_system_text = tokenizer.decode(token_ids) + return rendered_system_text + +if __name__ == "__main__": + print(main(TOOL_SERVER_URLS))