Skip to content
This repository was archived by the owner on Mar 16, 2026. It is now read-only.

Commit 50a8a62

Browse files
committed
MCP and A2A protocol integration (v1.10.5)
- MCP client (stdio/HTTP/SSE), MCPToolAdapter, discovery and config - A2A client, server, A2AAgentTool, discovery and config - CLI: hivemind mcp list|test|add, hivemind a2a serve|discover|call - Doctor checks for MCP servers and A2A agents - Protocol version constants in hivemind/protocols/ - Optional deps: mcp, a2a in pyproject.toml - Tests: tests/test_protocols.py Made-with: Cursor
1 parent 3562fd4 commit 50a8a62

File tree

23 files changed

+1326
-3
lines changed

23 files changed

+1326
-3
lines changed

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [1.10.5] - 2026-03-10
11+
12+
### Added
13+
14+
- MCP (Model Context Protocol) client supporting stdio, HTTP, and SSE transports
15+
- MCPToolAdapter: MCP server tools appear in hivemind tool registry automatically
16+
- MCP server config via `[[mcp.servers]]` in hivemind.toml
17+
- A2A (Agent-to-Agent) client for calling external A2A-compliant agents as tools
18+
- A2AServer: exposes hivemind as an A2A endpoint with AgentCard and task endpoints
19+
- A2AAgentTool: external A2A agent skills registered as local tools
20+
- `hivemind mcp list|test|add` CLI commands
21+
- `hivemind a2a serve|discover|call` CLI commands
22+
- Protocol version constants in `hivemind/protocols/`
23+
24+
### Changed
25+
26+
- Tool category "mcp" and "a2a" added to scoring and selection pipeline
27+
1028
## [1.10.0] - 2026-03-10
1129

1230
### Added

hivemind/agents/a2a/__init__.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
A2A (Agent-to-Agent) integration: client, server, tool adapter.
3+
"""
4+
5+
from hivemind.agents.a2a.types import (
6+
AgentCard,
7+
AgentSkill,
8+
A2ATaskRequest,
9+
A2ATaskResponse,
10+
)
11+
from hivemind.agents.a2a.client import A2AClient
12+
from hivemind.agents.a2a.server import create_a2a_app, run_a2a_server
13+
from hivemind.agents.a2a.tool_adapter import A2AAgentTool
14+
15+
# Alias for task spec: "A2AServer" = run_a2a_server / create_a2a_app
16+
A2AServer = create_a2a_app
17+
18+
__all__ = [
19+
"A2AClient",
20+
"A2AServer",
21+
"AgentCard",
22+
"A2ATaskRequest",
23+
"A2ATaskResponse",
24+
"A2AAgentTool",
25+
"create_a2a_app",
26+
"run_a2a_server",
27+
]

hivemind/agents/a2a/client.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
"""
2+
A2A client: call external A2A-compliant agents (get AgentCard, send_task, stream_task).
3+
"""
4+
5+
import uuid
6+
from typing import AsyncIterator
7+
8+
from hivemind.agents.a2a.types import (
9+
AgentCard,
10+
AgentSkill,
11+
A2ATaskRequest,
12+
A2ATaskResponse,
13+
)
14+
15+
16+
def _parse_agent_card(data: dict) -> AgentCard:
17+
"""Build AgentCard from JSON response."""
18+
skills_data = data.get("skills") or []
19+
skills = [
20+
AgentSkill(
21+
id=s.get("id", ""),
22+
name=s.get("name", ""),
23+
description=s.get("description", ""),
24+
input_modes=list(s.get("inputModes") or []),
25+
output_modes=list(s.get("outputModes") or []),
26+
)
27+
for s in skills_data
28+
if isinstance(s, dict)
29+
]
30+
return AgentCard(
31+
name=data.get("name", ""),
32+
description=data.get("description", ""),
33+
url=data.get("url", ""),
34+
version=data.get("version", ""),
35+
capabilities=list(data.get("capabilities") or []),
36+
skills=skills,
37+
authentication=data.get("authentication"),
38+
)
39+
40+
41+
def _parse_task_response(data: dict) -> A2ATaskResponse:
42+
"""Build A2ATaskResponse from JSON."""
43+
status = data.get("status", "failed")
44+
if status not in ("submitted", "working", "completed", "failed", "canceled"):
45+
status = "failed"
46+
return A2ATaskResponse(
47+
id=data.get("id", ""),
48+
status=status,
49+
result=data.get("result"),
50+
artifacts=data.get("artifacts") or [],
51+
)
52+
53+
54+
class A2AClient:
55+
"""Calls external A2A-compliant agents as if they were local tools."""
56+
57+
def __init__(self, timeout_seconds: float = 60.0) -> None:
58+
self.timeout_seconds = timeout_seconds
59+
60+
def _client(self):
61+
import httpx
62+
return httpx.AsyncClient(timeout=self.timeout_seconds)
63+
64+
async def get_agent_card(self, base_url: str) -> AgentCard:
65+
"""GET {base_url}/.well-known/agent.json"""
66+
url = base_url.rstrip("/") + "/.well-known/agent.json"
67+
async with self._client() as client:
68+
r = await client.get(url)
69+
r.raise_for_status()
70+
data = r.json()
71+
return _parse_agent_card(data)
72+
73+
async def send_task(
74+
self,
75+
base_url: str,
76+
request: A2ATaskRequest,
77+
poll_interval: float = 0.5,
78+
) -> A2ATaskResponse:
79+
"""POST {base_url}/tasks/send, then poll GET /tasks/{id} until completed/failed."""
80+
import asyncio
81+
url = base_url.rstrip("/") + "/tasks/send"
82+
body = {
83+
"id": request.id,
84+
"message": request.message,
85+
"sessionId": request.session_id,
86+
}
87+
async with self._client() as client:
88+
r = await client.post(url, json=body)
89+
r.raise_for_status()
90+
data = r.json()
91+
resp = _parse_task_response(data)
92+
if resp.status in ("completed", "failed", "canceled"):
93+
return resp
94+
task_url = base_url.rstrip("/") + f"/tasks/{resp.id}"
95+
while resp.status in ("submitted", "working"):
96+
await asyncio.sleep(poll_interval)
97+
async with self._client() as client:
98+
r = await client.get(task_url)
99+
r.raise_for_status()
100+
data = r.json()
101+
resp = _parse_task_response(data)
102+
if resp.status in ("completed", "failed", "canceled"):
103+
return resp
104+
return resp
105+
106+
async def stream_task(
107+
self,
108+
base_url: str,
109+
request: A2ATaskRequest,
110+
) -> AsyncIterator[str]:
111+
"""POST {base_url}/tasks/sendSubscribe, consume SSE stream, yield text chunks."""
112+
import httpx
113+
url = base_url.rstrip("/") + "/tasks/sendSubscribe"
114+
body = {
115+
"id": request.id,
116+
"message": request.message,
117+
"sessionId": request.session_id,
118+
}
119+
async with httpx.AsyncClient(timeout=self.timeout_seconds) as client:
120+
async with client.stream("POST", url, json=body) as response:
121+
response.raise_for_status()
122+
async for line in response.aiter_lines():
123+
if line.startswith("data:"):
124+
data = line[5:].strip()
125+
if data and data != "[DONE]":
126+
yield data

hivemind/agents/a2a/discovery.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""
2+
A2A discovery: fetch AgentCard from URL, register skills as A2AAgentTool.
3+
"""
4+
5+
import asyncio
6+
7+
from hivemind.config.schema import A2AAgentConfig
8+
from hivemind.agents.a2a.client import A2AClient
9+
from hivemind.agents.a2a.tool_adapter import A2AAgentTool
10+
11+
12+
async def discover_a2a_tools_async(agent_config: A2AAgentConfig) -> int:
13+
"""Fetch AgentCard from agent URL, create A2AAgentTool per skill, register. Return count."""
14+
client = A2AClient()
15+
card = await client.get_agent_card(agent_config.url)
16+
name = agent_config.name or card.name or "a2a-agent"
17+
for skill in card.skills:
18+
A2AAgentTool(agent_name=name, skill=skill, base_url=agent_config.url, client=client)
19+
return len(card.skills)
20+
21+
22+
def register_a2a_agent(agent_config: A2AAgentConfig) -> int:
23+
"""Sync: discover and register A2A agent tools; return count."""
24+
return asyncio.run(discover_a2a_tools_async(agent_config))

hivemind/agents/a2a/server.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""
2+
A2A server: expose hivemind agents as A2A-compliant endpoints (FastAPI).
3+
"""
4+
5+
from typing import Any
6+
7+
from hivemind.agents.a2a.types import AgentCard, AgentSkill, A2ATaskRequest, A2ATaskResponse
8+
9+
10+
def _build_agent_card(host: str, port: int, swarm_name: str) -> dict:
11+
"""Build AgentCard JSON for GET /.well-known/agent.json."""
12+
from hivemind.intelligence.strategy_selector import ExecutionStrategy
13+
url = f"http://{host}:{port}"
14+
name = swarm_name or "hivemind"
15+
skills = [
16+
{
17+
"id": s.value,
18+
"name": s.value.replace("_", " ").title(),
19+
"description": f"Execute task using {s.value} strategy.",
20+
"inputModes": ["text"],
21+
"outputModes": ["text"],
22+
}
23+
for s in ExecutionStrategy
24+
if s != ExecutionStrategy.GENERAL
25+
]
26+
return {
27+
"name": name,
28+
"description": "Hivemind swarm orchestration as A2A agent.",
29+
"url": url,
30+
"version": "1.10.5",
31+
"capabilities": ["streaming"],
32+
"skills": skills,
33+
}
34+
35+
36+
def create_a2a_app(host: str = "localhost", port: int = 8080, swarm_name: str = "") -> Any:
37+
"""Create FastAPI app with A2A routes. Requires fastapi, uvicorn, sse-starlette."""
38+
from fastapi import FastAPI
39+
from fastapi.responses import StreamingResponse
40+
import asyncio
41+
import uuid
42+
43+
app = FastAPI(title="Hivemind A2A Server")
44+
45+
@app.get("/.well-known/agent.json")
46+
def agent_card() -> dict:
47+
return _build_agent_card(host, port, swarm_name)
48+
49+
@app.post("/tasks/send")
50+
async def tasks_send(body: dict) -> dict:
51+
"""Accept A2A task, run via Swarm, return result."""
52+
task_id = body.get("id") or str(uuid.uuid4())
53+
message = body.get("message") or {}
54+
text = message.get("text", "") if isinstance(message, dict) else ""
55+
if not text:
56+
return {
57+
"id": task_id,
58+
"status": "failed",
59+
"result": "Missing message.text",
60+
"artifacts": [],
61+
}
62+
try:
63+
from hivemind.config import get_config
64+
from hivemind.swarm.swarm import Swarm
65+
cfg = get_config()
66+
swarm = Swarm(config=cfg)
67+
result = swarm.run(text)
68+
out = "\n".join(f"{k}: {v[:2000]}" for k, v in result.items()) if result else ""
69+
return {
70+
"id": task_id,
71+
"status": "completed",
72+
"result": out,
73+
"artifacts": [],
74+
}
75+
except Exception as e:
76+
return {
77+
"id": task_id,
78+
"status": "failed",
79+
"result": str(e),
80+
"artifacts": [],
81+
}
82+
83+
@app.post("/tasks/sendSubscribe")
84+
async def tasks_send_subscribe(body: dict):
85+
"""SSE streaming task execution."""
86+
task_id = body.get("id") or str(uuid.uuid4())
87+
message = body.get("message") or {}
88+
text = message.get("text", "") if isinstance(message, dict) else ""
89+
90+
async def stream():
91+
if not text:
92+
yield f"data: {__import__('json').dumps({'error': 'Missing message.text'})}\n\n"
93+
return
94+
try:
95+
from hivemind.config import get_config
96+
from hivemind.swarm.swarm import Swarm
97+
cfg = get_config()
98+
swarm = Swarm(config=cfg)
99+
result = swarm.run(text)
100+
out = "\n".join(f"{k}: {v[:2000]}" for k, v in result.items()) if result else ""
101+
yield f"data: {__import__('json').dumps({'id': task_id, 'status': 'completed', 'result': out})}\n\n"
102+
except Exception as e:
103+
yield f"data: {__import__('json').dumps({'id': task_id, 'status': 'failed', 'result': str(e)})}\n\n"
104+
105+
return StreamingResponse(
106+
stream(),
107+
media_type="text/event-stream",
108+
headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
109+
)
110+
111+
@app.get("/tasks/{task_id}")
112+
def task_status(task_id: str) -> dict:
113+
"""Task status (stub: we don't persist task state for now)."""
114+
return {"id": task_id, "status": "completed", "result": None, "artifacts": []}
115+
116+
@app.post("/tasks/{task_id}/cancel")
117+
def task_cancel(task_id: str) -> dict:
118+
"""Cancel running task (stub)."""
119+
return {"id": task_id, "status": "canceled", "result": None, "artifacts": []}
120+
121+
return app
122+
123+
124+
def run_a2a_server(host: str = "localhost", port: int = 8080, swarm_name: str = "") -> None:
125+
"""Run A2A server with uvicorn."""
126+
import uvicorn
127+
app = create_a2a_app(host=host, port=port, swarm_name=swarm_name)
128+
uvicorn.run(app, host=host, port=port)

0 commit comments

Comments
 (0)