-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathmcp.py
More file actions
76 lines (57 loc) · 2.44 KB
/
mcp.py
File metadata and controls
76 lines (57 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""Utility functions for MCP in ACP implementation."""
from collections.abc import Sequence
from typing import Any
from acp.schema import (
HttpMcpServer,
McpServerStdio,
SseMcpServer,
)
# Union of the ACP MCP server model classes handled in this module
# (mapped to the "stdio", "http", and "sse" transports respectively).
ACPMCPServerType = McpServerStdio | HttpMcpServer | SseMcpServer
def _convert_env_to_dict(env: Sequence[dict[str, str]]) -> dict[str, str]:
    """
    Flatten serialized EnvVariable entries into a plain name -> value mapping.

    Each entry is expected to be a dict carrying 'name' and 'value' keys, which
    is the shape EnvVariable objects take once a Pydantic model has been dumped
    to a dict.

    Args:
        env: Sequence of dicts, each with 'name' and 'value' keys

    Returns:
        Dictionary mapping each environment variable name to its value. When
        the same name appears more than once, the last entry wins.

    Raises:
        KeyError: If an entry lacks a 'name' or 'value' key.
    """
    return {entry["name"]: entry["value"] for entry in env}
def convert_acp_mcp_servers_to_agent_format(
    mcp_servers: Sequence[ACPMCPServerType],
) -> dict[str, dict[str, Any]]:
    """
    Translate ACP MCP server models into the Agent's dict-based configuration.

    The two sides describe MCP servers differently:
    - ACP: a list of Pydantic server models, each carrying a 'name' field and
      (where present) env as an array of EnvVariable entries
    - Agent: a dict keyed by server name, with env as a dict and an added
      'transport' field

    Args:
        mcp_servers: MCP server Pydantic models received from ACP

    Returns:
        Dictionary keyed by server name; each value holds the remaining dumped
        fields, with 'env' flattened to a dict and 'transport' set to
        "stdio", "http", or "sse" according to the model class. If two servers
        share a name, the later one replaces the earlier one.
    """
    # Checked in this order; the first class the server is an instance of
    # decides the transport:
    # McpServerStdio -> stdio, HttpMcpServer -> http, SseMcpServer -> sse
    transport_by_class = (
        (McpServerStdio, "stdio"),
        (HttpMcpServer, "http"),
        (SseMcpServer, "sse"),
    )

    agent_servers: dict[str, dict[str, Any]] = {}
    for server in mcp_servers:
        # Work on a copy of the dumped fields; the name becomes the outer key
        # and is removed from the per-server config.
        config: dict[str, Any] = dict(server.model_dump())
        name: str = config.pop("name")

        # ACP sends env as an array of EnvVariable objects, while the Agent
        # side expects a plain dict, so flatten it when the field exists.
        if "env" in config:
            config["env"] = _convert_env_to_dict(config["env"])

        for server_cls, transport in transport_by_class:
            if isinstance(server, server_cls):
                config["transport"] = transport
                break

        agent_servers[name] = config

    return agent_servers