Skip to content

Commit 6bf0ea5

Browse files
author
Davidson Gomes
committed
feat(a2a): add file support and multimodal content processing for A2A protocol
1 parent 958eeec commit 6bf0ea5

File tree

8 files changed

+869
-43
lines changed

8 files changed

+869
-43
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
- Add Task Agent for structured single-task execution
1313
- Improve context management in agent execution
14+
- Add file support for A2A protocol (Agent-to-Agent) endpoints
15+
- Implement multimodal content processing in A2A messages
1416

1517
## [0.0.9] - 2025-05-13
1618

src/api/a2a_routes.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
Routes for the A2A (Agent-to-Agent) protocol.
3232
3333
This module implements the standard A2A routes according to the specification.
34+
Supports both text messages and file uploads through the message parts mechanism.
3435
"""
3536

3637
import uuid
@@ -92,18 +93,100 @@ async def process_a2a_request(
9293
db: Session = Depends(get_db),
9394
a2a_service: A2AService = Depends(get_a2a_service),
9495
):
95-
"""Processes an A2A request."""
96+
"""
97+
Processes an A2A request.
98+
99+
Supports both text messages and file uploads. For file uploads,
100+
include file parts in the message following the A2A protocol format:
101+
102+
{
103+
"jsonrpc": "2.0",
104+
"id": "request-id",
105+
"method": "tasks/send",
106+
"params": {
107+
"id": "task-id",
108+
"sessionId": "session-id",
109+
"message": {
110+
"role": "user",
111+
"parts": [
112+
{
113+
"type": "text",
114+
"text": "Analyze this image"
115+
},
116+
{
117+
"type": "file",
118+
"file": {
119+
"name": "example.jpg",
120+
"mimeType": "image/jpeg",
121+
"bytes": "base64-encoded-content"
122+
}
123+
}
124+
]
125+
}
126+
}
127+
}
128+
"""
96129
# Verify the API key
97130
if not verify_api_key(db, x_api_key):
98131
raise HTTPException(status_code=401, detail="Invalid API key")
99132

100133
# Process the request
101134
try:
102135
request_body = await request.json()
136+
137+
debug_request_body = {}
138+
if "method" in request_body:
139+
debug_request_body["method"] = request_body["method"]
140+
if "id" in request_body:
141+
debug_request_body["id"] = request_body["id"]
142+
143+
logger.info(f"A2A request received: {debug_request_body}")
144+
145+
# Log if request contains file parts for debugging
146+
if isinstance(request_body, dict) and "params" in request_body:
147+
params = request_body.get("params", {})
148+
message = params.get("message", {})
149+
parts = message.get("parts", [])
150+
151+
logger.info(f"A2A message contains {len(parts)} parts")
152+
for i, part in enumerate(parts):
153+
if not isinstance(part, dict):
154+
logger.warning(f"Part {i+1} is not a dictionary: {type(part)}")
155+
continue
156+
157+
part_type = part.get("type")
158+
logger.info(f"Part {i+1} type: {part_type}")
159+
160+
if part_type == "file":
161+
file_info = part.get("file", {})
162+
logger.info(
163+
f"File part found: {file_info.get('name')} ({file_info.get('mimeType')})"
164+
)
165+
if "bytes" in file_info:
166+
bytes_data = file_info.get("bytes", "")
167+
bytes_size = len(bytes_data) * 0.75
168+
logger.info(f"File size: ~{bytes_size/1024:.2f} KB")
169+
if bytes_data:
170+
sample = (
171+
bytes_data[:10] + "..."
172+
if len(bytes_data) > 10
173+
else bytes_data
174+
)
175+
logger.info(f"Sample of base64 data: {sample}")
176+
elif part_type == "text":
177+
text_content = part.get("text", "")
178+
preview = (
179+
text_content[:30] + "..."
180+
if len(text_content) > 30
181+
else text_content
182+
)
183+
logger.info(f"Text part found: '{preview}'")
184+
103185
result = await a2a_service.process_request(agent_id, request_body)
104186

105187
# If the response is a streaming response, return as EventSourceResponse
106188
if hasattr(result, "__aiter__"):
189+
logger.info("Returning streaming response")
107190

108191
async def event_generator():
109192
async for item in result:
@@ -115,11 +198,15 @@ async def event_generator():
115198
return EventSourceResponse(event_generator())
116199

117200
# Otherwise, return as JSONResponse
201+
logger.info("Returning standard JSON response")
118202
if hasattr(result, "model_dump"):
119203
return JSONResponse(result.model_dump(exclude_none=True))
120204
return JSONResponse(result)
121205
except Exception as e:
122206
logger.error(f"Error processing A2A request: {e}")
207+
import traceback
208+
209+
logger.error(f"Full traceback: {traceback.format_exc()}")
123210
return JSONResponse(
124211
status_code=500,
125212
content={

src/api/chat_routes.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
"""
2929

3030
import uuid
31+
import base64
3132
from fastapi import (
3233
APIRouter,
3334
Depends,
@@ -47,7 +48,7 @@
4748
from src.services import (
4849
agent_service,
4950
)
50-
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse
51+
from src.schemas.chat import ChatRequest, ChatResponse, ErrorResponse, FileData
5152
from src.services.agent_runner import run_agent, run_agent_stream
5253
from src.core.exceptions import AgentNotFoundError
5354
from src.services.service_providers import (
@@ -59,7 +60,7 @@
5960
from datetime import datetime
6061
import logging
6162
import json
62-
from typing import Optional, Dict
63+
from typing import Optional, Dict, List, Any
6364

6465
logger = logging.getLogger(__name__)
6566

@@ -195,6 +196,29 @@ async def websocket_chat(
195196
if not message:
196197
continue
197198

199+
files = None
200+
if data.get("files") and isinstance(data.get("files"), list):
201+
try:
202+
files = []
203+
for file_data in data.get("files"):
204+
if (
205+
isinstance(file_data, dict)
206+
and file_data.get("filename")
207+
and file_data.get("content_type")
208+
and file_data.get("data")
209+
):
210+
files.append(
211+
FileData(
212+
filename=file_data.get("filename"),
213+
content_type=file_data.get("content_type"),
214+
data=file_data.get("data"),
215+
)
216+
)
217+
logger.info(f"Processed {len(files)} files via WebSocket")
218+
except Exception as e:
219+
logger.error(f"Error processing files: {str(e)}")
220+
files = None
221+
198222
async for chunk in run_agent_stream(
199223
agent_id=agent_id,
200224
external_id=external_id,
@@ -203,6 +227,7 @@ async def websocket_chat(
203227
artifacts_service=artifacts_service,
204228
memory_service=memory_service,
205229
db=db,
230+
files=files,
206231
):
207232
await websocket.send_json(
208233
{"message": json.loads(chunk), "turn_complete": False}
@@ -259,6 +284,7 @@ async def chat(
259284
artifacts_service,
260285
memory_service,
261286
db,
287+
files=request.files,
262288
)
263289

264290
return {

0 commit comments

Comments
 (0)