Skip to content

Commit 796920a

Browse files
Adding A2A platform (#142)
A2A platform. 1. Updated Agent class to include A2A as a platform. 2. Implemented A2A as a platform that depedends on BAF's functionality. 3. Created server, agent-card, message router, agent and method registration. 4. Added a test script in examples to test A2A protocol.
1 parent 9ec469a commit 796920a

28 files changed

+1667
-3
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,5 +69,6 @@ Note that if you want to set your agent's language to **Luxembourgish**, you wil
6969
- [telegram_agent](https://github.com/BESSER-PEARL/BESSER-Agentic-Framework/blob/main/besser/agent/test/examples/telegram_agent.py): Introducing the [TelegramPlatform](https://besser-agentic-framework.readthedocs.io/latest/wiki/platforms/telegram_platform.html)
7070
- [github_agent](https://github.com/BESSER-PEARL/BESSER-Agentic-Framework/blob/main/besser/agent/test/examples/github_agent.py): Introducing [GitHubPlatform](https://besser-agentic-framework.readthedocs.io/latest/wiki/platforms/github_platform.html)
7171
- [gitlab_agent](https://github.com/BESSER-PEARL/BESSER-Agentic-Framework/blob/main/besser/agent/test/examples/gitlab_agent.py): Introducing the [GitLabPlatform](https://besser-agentic-framework.readthedocs.io/latest/wiki/platforms/gitlab_platform.html)
72+
- [a2a_multiagent](https://github.com/BESSER-PEARL/BESSER-Agentic-Framework/blob/main/besser/agent/test/examples/a2a_multiagent.py): Introducing the [A2APlatform](https://besser-agentic-framework.readthedocs.io/latest/wiki/platforms/a2a_platform.html)
7273

7374
For more example agents, check out the [BAF-agent-examples](https://github.com/BESSER-PEARL/BAF-agent-examples) repository!

besser/agent/core/agent.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from besser.agent.platforms.websocket.websocket_platform import WebSocketPlatform
3131
from besser.agent.platforms.github.github_platform import GitHubPlatform
3232
from besser.agent.platforms.gitlab.gitlab_platform import GitLabPlatform
33+
from besser.agent.platforms.a2a.a2a_platform import A2APlatform
3334

3435

3536
class Agent:
@@ -528,6 +529,16 @@ def use_gitlab_platform(self) -> GitLabPlatform:
528529
gitlab_platform = GitLabPlatform(self)
529530
self._platforms.append(gitlab_platform)
530531
return gitlab_platform
532+
533+
def use_a2a_platform(self) -> A2APlatform:
534+
"""Use the :class: `~besser.agent.platforms.a2a.a2a_platform.A2APlatform` on this agent.
535+
536+
Returns:
537+
A2APlatform: the A2A platform
538+
"""
539+
a2a_platform = A2APlatform(self)
540+
self._platforms.append(a2a_platform)
541+
return a2a_platform
531542

532543
def _monitoring_db_insert_session(self, session: Session) -> None:
533544
"""Insert a session record into the monitoring database.

besser/agent/nlp/nlp_engine.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,11 +232,12 @@ def text2speech(self, session: Session, text: str):
232232
233233
Returns:
234234
dict: the speech synthesis as a dictionary containing 2 keys:
235+
235236
- audio (np.ndarray): the generated audio waveform as a numpy array with dimensions (nb_channels,
236-
audio_length), where nb_channels is the number of audio channels (usually 1 for mono) and audio_length is the number
237-
of samples in the audio
237+
audio_length), where nb_channels is the number of audio channels (usually 1 for mono) and audio_length is the number
238+
of samples in the audio
238239
- sampling_rate (int): an integer value containing the sampling rate, eg. how many samples correspond to
239-
one second of audio
240+
one second of audio
240241
"""
241242

242243
user_language = session.get("user_language", "en")
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
"""Definition of the agent properties within the ``A2A_platform`` section:"""
2+
3+
from besser.agent.core.property import Property
4+
5+
SECTION_A2A = 'a2a_platform'
6+
7+
A2A_WEBSOCKET_PORT = Property(SECTION_A2A, 'a2a.port', int, 8000)
8+
"""
9+
The server local port. This port should be exposed or proxied to make it visible by other Agents
10+
11+
name: ``a2a.port``
12+
13+
type: ``int``
14+
15+
default value: ``8000``
16+
"""

besser/agent/platforms/a2a/a2a_platform.py

Lines changed: 452 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from pydantic import BaseModel
2+
3+
class AgentCard(BaseModel):
4+
"""
5+
Represents the agent card that has the metadata and description about the agent.
6+
"""
7+
# Define the fields and their type for the agent card
8+
name: str
9+
id: str
10+
endpoints: list[str]
11+
version: str
12+
capabilities: list[str]
13+
descriptions: list[str]
14+
provider: str
15+
skills: list[str]
16+
examples: list[dict] | list[str] = []
17+
methods: list[dict] = []
18+
19+
def to_json(self):
20+
return self.model_dump_json(indent=4)
21+
22+
@classmethod
23+
def from_json(cls, json_str):
24+
return cls.model_validate_json(json_str)
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# from typing import TYPE_CHECKING
2+
# if TYPE_CHECKING:
3+
# from besser.agent.platforms.a2a.a2a_platform import A2APlatform
4+
from aiohttp import web
5+
6+
from besser.agent.exceptions.logger import logger
7+
from besser.agent.platforms.a2a.error_handler import AgentNotFound
8+
9+
class AgentRegistry:
10+
'''
11+
Keeps track of registered A2A agents by ID.
12+
Attributes:
13+
_agents: dictionary of registered agents
14+
'''
15+
16+
def __init__(self):
17+
self._agents: dict[str, 'A2APlatform'] = {}
18+
19+
def register(self, agent_id: str, platform: 'A2APlatform') -> None:
20+
"""
21+
Register the provided agent (through agent_id) in the given platform
22+
"""
23+
if agent_id in self._agents:
24+
logger.error(f'Agent ID "{agent_id}" already registered')
25+
raise ValueError(f'Agent ID "{agent_id}" already registered')
26+
logger.info(f'Registering agent {agent_id}')
27+
self._agents[agent_id] = platform
28+
29+
# Auto-register the methods that are common to all agents.
30+
if hasattr(platform, "router") and platform.router:
31+
platform.router.register_task_methods(platform)
32+
33+
def get(self, agent_id: str) -> 'A2APlatform':
34+
"""
35+
Get the registered agent
36+
"""
37+
if agent_id not in self._agents:
38+
raise ValueError(f'Agent ID "{agent_id}" not found')
39+
return self._agents[agent_id]
40+
41+
def list(self) -> list:
42+
'''
43+
Return summary info for all registered agents.
44+
'''
45+
return [
46+
{
47+
"id": agent_id,
48+
"name": platform.agent_card.name,
49+
"description": platform.agent_card.descriptions,
50+
"capabilities": platform.agent_card.capabilities,
51+
"card_url": f"/agents/{agent_id}/agent-card"
52+
}
53+
for agent_id, platform in self._agents.items()
54+
]
55+
56+
def count(self) -> int:
57+
"""
58+
provide total number of agents that are registered
59+
"""
60+
return len(self._agents)
61+
62+
# Used for synchronous agent orchestration calls
63+
async def call_agent_method(self, target_agent_id: str, method: str, params: dict) -> web.json_response:
64+
target_platform = self.get(target_agent_id)
65+
if not target_platform:
66+
raise AgentNotFound(f'Agent ID "{target_agent_id}" not found')
67+
return await target_platform.router.handle(method, params)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import requests
2+
import time
3+
import httpx
4+
5+
class A2AClient:
6+
def __init__(self, base_url: str, timeout: int = 20):
7+
self.base = base_url.rstrip("/")
8+
self.timeout = timeout
9+
10+
def call(self, method: str, params: dict | None = None, id: int | None = None):
11+
payload = {"jsonrpc":"2.0","method":method,"params":params or {}, "id": id or int(time.time()*1000)}
12+
r = requests.post(f"{self.base}/run", json=payload, timeout=self.timeout)
13+
r.raise_for_status()
14+
data = r.json()
15+
if "error" in data:
16+
raise RuntimeError(data["error"])
17+
return data["result"]
18+
19+
def card(self):
20+
r = requests.get(f"{self.base}/agent-card", timeout=self.timeout)
21+
r.raise_for_status()
22+
return r.json()
23+
24+
async def stream(self, method: str, params: dict | None = None):
25+
async with httpx.AsyncClient(timeout=None) as client:
26+
async with client.stream("POST", f"{self.base}", json={"jsonrpc":"2.0","method":method,"params":params or {}, "id":int(time.time()*1000)}) as r:
27+
async for line in r.aiter_lines():
28+
if line.strip():
29+
yield line
30+
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from aiohttp import web
2+
3+
# JSON-RPC standard errors
4+
PARSE_ERROR = {"code": -32700, "message": "Parse error"}
5+
INVALID_REQUEST = {"code": -32600, "message": "Invalid Request"}
6+
METHOD_NOT_FOUND = {"code": -32601, "message": "Method not found"}
7+
INVALID_PARAMS = {"code": -32602, "message": "Invalid params"}
8+
INTERNAL_ERROR = {"code": -32603, "message": "Internal error"}
9+
10+
# Custom task-related errors for A2A
11+
TASK_PENDING = {"code": -32000, "message": "Task is still pending"}
12+
TASK_FAILED = {"code": -32001, "message": "Task execution failed"}
13+
TASK_NOT_FOUND = {"code": -32002, "message": "Task not found"}
14+
15+
class JSONRPCError(Exception):
16+
def __init__(self, code=-32000, message="Server error", data=None):
17+
super().__init__(message)
18+
self.code = code
19+
self.message = message
20+
self.data = data
21+
22+
class AgentNotFound(JSONRPCError):
23+
def __init__(self, message="Agent not found"):
24+
super().__init__(-32003, message)
25+
26+
class MethodNotFound(JSONRPCError):
27+
def __init__(self, message="Method not found"):
28+
super().__init__(-32601, message)
29+
30+
class InvalidParams(JSONRPCError):
31+
def __init__(self, message="Invalid params"):
32+
super().__init__(-32602, message)
33+
34+
class TaskError(Exception):
35+
def __init__(self, code: str, message: str = ""):
36+
super().__init__(message)
37+
self.code = code
38+
self.message = message
39+
40+
def error_response(exc: Exception, request_id=None):
41+
if isinstance(exc, JSONRPCError):
42+
return {
43+
"jsonrpc": "2.0",
44+
"error": {"code": exc.code, "message": exc.message, "data": exc.data},
45+
"id": request_id,
46+
}
47+
else:
48+
return {
49+
"jsonrpc": "2.0",
50+
"error": {"code": -32603, "message": str(exc)},
51+
"id": request_id,
52+
}
53+
54+
@web.middleware
55+
async def error_middleware(request, handler):
56+
try:
57+
return await handler(request)
58+
except JSONRPCError as e:
59+
return web.json_response(error_response(e))
60+
except Exception as e:
61+
return web.json_response(error_response(e))
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# from typing import TYPE_CHECKING
2+
3+
import inspect
4+
5+
from aiohttp import web
6+
from aiohttp.web_request import Request
7+
8+
from besser.agent.exceptions.logger import logger
9+
from besser.agent.platforms.a2a.error_handler import JSONRPCError, MethodNotFound, InvalidParams, TaskError
10+
from besser.agent.platforms.a2a.error_handler import INTERNAL_ERROR, PARSE_ERROR, INVALID_REQUEST, TASK_PENDING, TASK_FAILED, TASK_NOT_FOUND
11+
from besser.agent.platforms.a2a.agent_registry import AgentRegistry
12+
# if TYPE_CHECKING:
13+
# from besser.agent.platforms.a2a.a2a_platform import A2APlatform
14+
15+
class A2ARouter:
16+
def __init__(self) -> None:
17+
self.methods = {}
18+
19+
def register(self, method_name, func) -> None:
20+
'''
21+
Register a method (coupled to its name, also called as key) that can be called via RPC.
22+
'''
23+
self.methods[method_name] = func
24+
25+
async def handle(self, method_name: str, params: dict) -> web.json_response:
26+
"""
27+
Execute the method given its name and parameters
28+
"""
29+
30+
if method_name not in self.methods:
31+
logger.error(f"Method '{method_name}' not found")
32+
raise MethodNotFound(message=f"Method '{method_name}' not found")
33+
34+
if not isinstance(params, dict):
35+
logger.error(f"Params must be a dictionary")
36+
raise InvalidParams()
37+
38+
method = self.methods[method_name]
39+
40+
# for handling async tasks, else it is sync
41+
if inspect.iscoroutinefunction(method):
42+
return await method(**params)
43+
else:
44+
return method(**params)
45+
46+
async def aiohttp_handler(self, request: Request) -> web.json_response:
47+
"""
48+
Handle HTTP requests from the server
49+
"""
50+
request_id = None
51+
try:
52+
body = await request.json()
53+
request_id = body.get("id")
54+
except Exception:
55+
logger.error(PARSE_ERROR)
56+
return web.json_response({
57+
"jsonrpc": "2.0",
58+
"error": PARSE_ERROR,
59+
"id": request_id
60+
})
61+
62+
if "method" not in body or not isinstance(body["method"], str):
63+
logger.error(INVALID_REQUEST)
64+
return web.json_response({
65+
"jsonrpc": "2.0",
66+
"error": INVALID_REQUEST,
67+
"id": body.get("id")
68+
})
69+
70+
method = body['method']
71+
params = body.get('params', {})
72+
73+
try:
74+
result = await self.handle(method, params)
75+
return web.json_response({
76+
"jsonrpc": "2.0",
77+
"result": result,
78+
"id": request_id
79+
})
80+
except JSONRPCError as e:
81+
return web.json_response({
82+
"jsonrpc": "2.0",
83+
"error": {"code": e.code, "message": e.message},
84+
"id": request_id
85+
})
86+
87+
except TaskError as e:
88+
error_map = {
89+
"TASK_PENDING": TASK_PENDING,
90+
"TASK_FAILED": TASK_FAILED,
91+
"TASK_NOT_FOUND": TASK_NOT_FOUND
92+
}
93+
logger.error(error_map.get(e.code, INTERNAL_ERROR))
94+
return web.json_response({
95+
"jsonrpc": "2.0",
96+
"error": error_map.get(e.code, INTERNAL_ERROR),
97+
"id": request_id
98+
})
99+
except Exception as e:
100+
# print(f"Error: \n{e}")
101+
logger.error(f"Internal error: {str(e)}")
102+
return web.json_response({
103+
"jsonrpc": "2.0",
104+
"error": {**INTERNAL_ERROR,
105+
"message": str(e)},
106+
"id": request_id
107+
})
108+
109+
def register_task_methods(self, platform: 'A2APlatform') -> None:
110+
"""
111+
Auto-register internal methods for creating, executing and getting task status.
112+
"""
113+
self.register("create_task_and_run", platform.rpc_create_task)
114+
self.register("task_create", platform.create_task)
115+
self.register("task_status", platform.get_status)
116+
117+
#
118+
def register_orchestration_methods(self, platform: 'A2APlatform', registry: AgentRegistry) -> None:
119+
"""
120+
Register methods used for orchestration in its router. Enables one agent to call another agent.
121+
"""
122+
async def call_agent_rpc(target_agent_id: str, method: str, params: dict):
123+
return await platform.rpc_call_agent(target_agent_id, method, params, registry)
124+
125+
self.register("call_agent", call_agent_rpc)

0 commit comments

Comments
 (0)