Skip to content

Commit a17f3b2

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Allow custom request and event converters in A2aAgentExecutor
This change introduces type aliases for request and event conversion functions: - `A2ARequestToADKRunArgsConverter`: For converting A2A `RequestContext` to an `ADKRunArgs` Pydantic model. - `AdkEventToA2AEventsConverter`: For converting ADK `Event` to a list of A2A `A2AEvent` objects. The `convert_a2a_request_to_adk_run_args` function now returns a structured `ADKRunArgs` model instead of a generic dictionary, improving type safety. These converter types can now be provided via the `A2aAgentExecutorConfig` to customize the conversion logic used by the `A2aAgentExecutor`. The executor defaults to the existing `convert_a2a_request_to_adk_run_args` and `convert_event_to_a2a_events` functions if no custom converters are specified. This allows users to inject their own logic for handling request and event conversions, for example, to add custom metadata or transform data types, without modifying the core executor. PiperOrigin-RevId: 819934960
1 parent 6ab1498 commit a17f3b2

File tree

5 files changed

+728
-671
lines changed

5 files changed

+728
-671
lines changed

src/google/adk/a2a/converters/event_converter.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
from collections.abc import Callable
1718
from datetime import datetime
1819
from datetime import timezone
1920
import logging
@@ -57,6 +58,34 @@
5758
logger = logging.getLogger("google_adk." + __name__)
5859

5960

61+
AdkEventToA2AEventsConverter = Callable[
62+
[
63+
Event,
64+
InvocationContext,
65+
Optional[str],
66+
Optional[str],
67+
GenAIPartToA2APartConverter,
68+
],
69+
List[A2AEvent],
70+
]
71+
"""A callable that converts an ADK Event into a list of A2A events.
72+
73+
This interface allows for custom logic to map ADK's event structure to the
74+
event structure expected by the A2A server.
75+
76+
Args:
77+
event: The source ADK Event to convert.
78+
invocation_context: The context of the ADK agent invocation.
79+
task_id: The ID of the A2A task being processed.
80+
context_id: The context ID from the A2A request.
81+
part_converter: A function to convert GenAI content parts to A2A
82+
parts.
83+
84+
Returns:
85+
A list of A2A events.
86+
"""
87+
88+
6089
def _serialize_metadata_value(value: Any) -> str:
6190
"""Safely serializes metadata values to string format.
6291

src/google/adk/a2a/converters/request_converter.py

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,12 @@
1414

1515
from __future__ import annotations
1616

17+
from collections.abc import Callable
1718
import sys
1819
from typing import Any
20+
from typing import Optional
21+
22+
from pydantic import BaseModel
1923

2024
try:
2125
from a2a.server.agent_execution import RequestContext
@@ -35,6 +39,39 @@
3539
from .part_converter import convert_a2a_part_to_genai_part
3640

3741

42+
@a2a_experimental
43+
class AgentRunRequest(BaseModel):
44+
"""Data model for arguments passed to the ADK runner."""
45+
46+
user_id: Optional[str] = None
47+
session_id: Optional[str] = None
48+
invocation_id: Optional[str] = None
49+
new_message: Optional[genai_types.Content] = None
50+
state_delta: Optional[dict[str, Any]] = None
51+
run_config: Optional[RunConfig] = None
52+
53+
54+
A2ARequestToAgentRunRequestConverter = Callable[
55+
[
56+
RequestContext,
57+
A2APartToGenAIPartConverter,
58+
],
59+
AgentRunRequest,
60+
]
61+
"""A callable that converts an A2A RequestContext to RunnerRequest for ADK runner.
62+
63+
This interface allows for custom logic to map an incoming A2A RequestContext to the
64+
structured arguments expected by the ADK runner's `run_async` method.
65+
66+
Args:
67+
request: The incoming request context from the A2A server.
68+
part_converter: A function to convert A2A content parts to GenAI parts.
69+
70+
Returns:
71+
An RunnerRequest object containing the keyword arguments for ADK runner's run_async method.
72+
"""
73+
74+
3875
def _get_user_id(request: RequestContext) -> str:
3976
# Get user from call context if available (auth is enabled on a2a server)
4077
if (
@@ -49,20 +86,32 @@ def _get_user_id(request: RequestContext) -> str:
4986

5087

5188
@a2a_experimental
52-
def convert_a2a_request_to_adk_run_args(
89+
def convert_a2a_request_to_agent_run_request(
5390
request: RequestContext,
5491
part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part,
55-
) -> dict[str, Any]:
92+
) -> AgentRunRequest:
93+
"""Converts an A2A RequestContext to an AgentRunRequest model.
94+
95+
Args:
96+
request: The incoming request context from the A2A server.
97+
part_converter: A function to convert A2A content parts to GenAI parts.
98+
99+
Returns:
100+
A AgentRunRequest object ready to be used as arguments for the ADK runner.
101+
102+
Raises:
103+
ValueError: If the request message is None.
104+
"""
56105

57106
if not request.message:
58107
raise ValueError('Request message cannot be None')
59108

60-
return {
61-
'user_id': _get_user_id(request),
62-
'session_id': request.context_id,
63-
'new_message': genai_types.Content(
109+
return AgentRunRequest(
110+
user_id=_get_user_id(request),
111+
session_id=request.context_id,
112+
new_message=genai_types.Content(
64113
role='user',
65114
parts=[part_converter(part) for part in request.message.parts],
66115
),
67-
'run_config': RunConfig(),
68-
}
116+
run_config=RunConfig(),
117+
)

src/google/adk/a2a/executor/a2a_agent_executor.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from datetime import timezone
1919
import inspect
2020
import logging
21-
from typing import Any
2221
from typing import Awaitable
2322
from typing import Callable
2423
from typing import Optional
@@ -52,12 +51,15 @@
5251
from pydantic import BaseModel
5352
from typing_extensions import override
5453

54+
from ..converters.event_converter import AdkEventToA2AEventsConverter
5555
from ..converters.event_converter import convert_event_to_a2a_events
5656
from ..converters.part_converter import A2APartToGenAIPartConverter
5757
from ..converters.part_converter import convert_a2a_part_to_genai_part
5858
from ..converters.part_converter import convert_genai_part_to_a2a_part
5959
from ..converters.part_converter import GenAIPartToA2APartConverter
60-
from ..converters.request_converter import convert_a2a_request_to_adk_run_args
60+
from ..converters.request_converter import A2ARequestToAgentRunRequestConverter
61+
from ..converters.request_converter import AgentRunRequest
62+
from ..converters.request_converter import convert_a2a_request_to_agent_run_request
6163
from ..converters.utils import _get_adk_metadata_key
6264
from ..experimental import a2a_experimental
6365
from .task_result_aggregator import TaskResultAggregator
@@ -75,6 +77,10 @@ class A2aAgentExecutorConfig(BaseModel):
7577
gen_ai_part_converter: GenAIPartToA2APartConverter = (
7678
convert_genai_part_to_a2a_part
7779
)
80+
request_converter: A2ARequestToAgentRunRequestConverter = (
81+
convert_a2a_request_to_agent_run_request
82+
)
83+
event_converter: AdkEventToA2AEventsConverter = convert_event_to_a2a_events
7884

7985

8086
@a2a_experimental
@@ -192,19 +198,20 @@ async def _handle_request(
192198
# Resolve the runner instance
193199
runner = await self._resolve_runner()
194200

195-
# Convert the a2a request to ADK run args
196-
run_args = convert_a2a_request_to_adk_run_args(
197-
context, self._config.a2a_part_converter
201+
# Convert the a2a request to AgentRunRequest
202+
run_request = self._config.request_converter(
203+
context,
204+
self._config.a2a_part_converter,
198205
)
199206

200207
# ensure the session exists
201-
session = await self._prepare_session(context, run_args, runner)
208+
session = await self._prepare_session(context, run_request, runner)
202209

203210
# create invocation context
204211
invocation_context = runner._new_invocation_context(
205212
session=session,
206-
new_message=run_args['new_message'],
207-
run_config=run_args['run_config'],
213+
new_message=run_request.new_message,
214+
run_config=run_request.run_config,
208215
)
209216

210217
# publish the task working event
@@ -219,16 +226,16 @@ async def _handle_request(
219226
final=False,
220227
metadata={
221228
_get_adk_metadata_key('app_name'): runner.app_name,
222-
_get_adk_metadata_key('user_id'): run_args['user_id'],
223-
_get_adk_metadata_key('session_id'): run_args['session_id'],
229+
_get_adk_metadata_key('user_id'): run_request.user_id,
230+
_get_adk_metadata_key('session_id'): run_request.session_id,
224231
},
225232
)
226233
)
227234

228235
task_result_aggregator = TaskResultAggregator()
229-
async with Aclosing(runner.run_async(**run_args)) as agen:
236+
async with Aclosing(runner.run_async(**vars(run_request))) as agen:
230237
async for adk_event in agen:
231-
for a2a_event in convert_event_to_a2a_events(
238+
for a2a_event in self._config.event_converter(
232239
adk_event,
233240
invocation_context,
234241
context.task_id,
@@ -284,12 +291,15 @@ async def _handle_request(
284291
)
285292

286293
async def _prepare_session(
287-
self, context: RequestContext, run_args: dict[str, Any], runner: Runner
294+
self,
295+
context: RequestContext,
296+
run_request: AgentRunRequest,
297+
runner: Runner,
288298
):
289299

290-
session_id = run_args['session_id']
300+
session_id = run_request.session_id
291301
# create a new session if not exists
292-
user_id = run_args['user_id']
302+
user_id = run_request.user_id
293303
session = await runner.session_service.get_session(
294304
app_name=runner.app_name,
295305
user_id=user_id,
@@ -302,7 +312,7 @@ async def _prepare_session(
302312
state={},
303313
session_id=session_id,
304314
)
305-
# Update run_args with the new session_id
306-
run_args['session_id'] = session.id
315+
# Update run_request with the new session_id
316+
run_request.session_id = session.id
307317

308318
return session

0 commit comments

Comments
 (0)