Skip to content

Commit 8e5f361

Browse files
google-genai-botcopybara-github
authored andcommitted
fix: Update remote_a2a_agent to better handle streaming events and avoid duplicate responses
Currently, the A2A Task -> ADK event conversion is producing the same events on the last two update events (the last is a status update marking the task complete) The change here based on A2AClientEvent(task, update): - if the update == None: handle the non-streaming task case and also streaming case for the initial task creation event - if the update = TaskStatusUpdateEvent AND a message is set: emit an event with that message - if a task status update AND no message is set: don't emit event (for example, the final status update) - if the update is ArtifactUpdateEvent and it's final artifact: emit the event PiperOrigin-RevId: 812878869
1 parent b1ee013 commit 8e5f361

File tree

2 files changed

+266
-11
lines changed

2 files changed

+266
-11
lines changed

src/google/adk/agents/remote_a2a_agent.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
from a2a.types import Message as A2AMessage
3737
from a2a.types import Part as A2APart
3838
from a2a.types import Role
39+
from a2a.types import TaskArtifactUpdateEvent as A2ATaskArtifactUpdateEvent
40+
from a2a.types import TaskStatusUpdateEvent as A2ATaskStatusUpdateEvent
3941
from a2a.types import TransportProtocol as A2ATransport
4042
except ImportError as e:
4143
import sys
@@ -393,30 +395,53 @@ def _construct_message_parts_from_session(
393395

394396
async def _handle_a2a_response(
395397
self, a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
396-
) -> Event:
398+
) -> Optional[Event]:
397399
"""Handle A2A response and convert to Event.
398400
399401
Args:
400402
a2a_response: The A2A response object
401403
ctx: The invocation context
402404
403405
Returns:
404-
Event object representing the response
406+
Event object representing the response, or None if no event should be
407+
emitted.
405408
"""
406409
try:
407410
if isinstance(a2a_response, tuple):
408-
# ClientEvent is a tuple of the absolute Task state and the last update.
409-
# We only need the Task state.
410-
task = a2a_response[0]
411-
event = convert_a2a_task_to_event(task, self.name, ctx)
411+
task, update = a2a_response
412+
if update is None:
413+
# This is the initial response for a streaming task or the complete
414+
# response for a non-streaming task, which is the full task state.
415+
# We process this to get the initial message.
416+
event = convert_a2a_task_to_event(task, self.name, ctx)
417+
elif isinstance(update, A2ATaskStatusUpdateEvent) and update.message:
418+
# This is a streaming task status update with a message.
419+
event = convert_a2a_message_to_event(update.message, self.name, ctx)
420+
elif isinstance(update, A2ATaskArtifactUpdateEvent) and (
421+
not update.append or update.last_chunk
422+
):
423+
# This is a streaming task artifact update.
424+
# We only handle full artifact updates and ignore partial updates.
425+
# Note: Depends on the server implementation, there is no clear
426+
# definition of what a partial update is currently. We use the two
427+
# signals:
428+
# 1. append: True for partial updates, False for full updates.
429+
# 2. last_chunk: True for full updates, False for partial updates.
430+
event = convert_a2a_task_to_event(task, self.name, ctx)
431+
else:
432+
# This is a streaming update without a message (e.g. status change)
433+
# or an partial artifact update. We don't emit an event for these
434+
# for now.
435+
return None
436+
412437
event.custom_metadata = event.custom_metadata or {}
413438
event.custom_metadata[A2A_METADATA_PREFIX + "task_id"] = task.id
414439
if task.context_id:
415440
event.custom_metadata[A2A_METADATA_PREFIX + "context_id"] = (
416441
task.context_id
417442
)
418443

419-
# Otherwise, it's a regular A2AMessage.
444+
# Otherwise, it's a regular A2AMessage for non-streaming responses.
420445
elif isinstance(a2a_response, A2AMessage):
421446
event = convert_a2a_message_to_event(a2a_response, self.name, ctx)
422447
event.custom_metadata = event.custom_metadata or {}
@@ -492,6 +517,8 @@ async def _run_async_impl(
492517
logger.debug(build_a2a_response_log(a2a_response))
493518

494519
event = await self._handle_a2a_response(a2a_response, ctx)
520+
if not event:
521+
continue
495522

496523
# Add metadata about the request and response
497524
event.custom_metadata = event.custom_metadata or {}

tests/unittests/agents/test_remote_a2a_agent.py

Lines changed: 232 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,12 @@
3939
from a2a.types import AgentCapabilities
4040
from a2a.types import AgentCard
4141
from a2a.types import AgentSkill
42+
from a2a.types import Artifact
4243
from a2a.types import Message as A2AMessage
4344
from a2a.types import SendMessageSuccessResponse
4445
from a2a.types import Task as A2ATask
46+
from a2a.types import TaskArtifactUpdateEvent
47+
from a2a.types import TaskStatusUpdateEvent
4548
from google.adk.agents.invocation_context import InvocationContext
4649
from google.adk.agents.remote_a2a_agent import A2A_METADATA_PREFIX
4750
from google.adk.agents.remote_a2a_agent import AgentCardResolutionError
@@ -60,6 +63,9 @@ class DummyTypes:
6063
A2AMessage = DummyTypes()
6164
SendMessageSuccessResponse = DummyTypes()
6265
A2ATask = DummyTypes()
66+
TaskStatusUpdateEvent = DummyTypes()
67+
Artifact = DummyTypes()
68+
TaskArtifactUpdateEvent = DummyTypes()
6369
InvocationContext = DummyTypes()
6470
RemoteA2aAgent = DummyTypes()
6571
AgentCardResolutionError = Exception
@@ -685,8 +691,8 @@ async def test_handle_a2a_response_success_with_message(self):
685691
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
686692

687693
@pytest.mark.asyncio
688-
async def test_handle_a2a_response_success_with_task(self):
689-
"""Test successful A2A response handling with task."""
694+
async def test_handle_a2a_response_with_task_and_no_update(self):
695+
"""Test successful A2A response handling with task and no update."""
690696
mock_a2a_task = Mock(spec=A2ATask)
691697
mock_a2a_task.id = "task-123"
692698
mock_a2a_task.context_id = "context-123"
@@ -718,6 +724,116 @@ async def test_handle_a2a_response_success_with_task(self):
718724
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
719725
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
720726

727+
@pytest.mark.asyncio
728+
async def test_handle_a2a_response_with_task_status_update_with_message(self):
729+
"""Test handling of a task status update with a message."""
730+
mock_a2a_task = Mock(spec=A2ATask)
731+
mock_a2a_task.id = "task-123"
732+
mock_a2a_task.context_id = "context-123"
733+
734+
mock_a2a_message = Mock(spec=A2AMessage)
735+
mock_update = Mock(spec=TaskStatusUpdateEvent)
736+
mock_update.message = mock_a2a_message
737+
mock_update.status = "COMPLETED"
738+
739+
# Create a proper Event mock that can handle custom_metadata
740+
mock_event = Event(
741+
author=self.agent.name,
742+
invocation_id=self.mock_context.invocation_id,
743+
branch=self.mock_context.branch,
744+
)
745+
746+
with patch(
747+
"google.adk.agents.remote_a2a_agent.convert_a2a_message_to_event"
748+
) as mock_convert:
749+
mock_convert.return_value = mock_event
750+
751+
result = await self.agent._handle_a2a_response(
752+
(mock_a2a_task, mock_update), self.mock_context
753+
)
754+
755+
assert result == mock_event
756+
mock_convert.assert_called_once_with(
757+
mock_a2a_message,
758+
self.agent.name,
759+
self.mock_context,
760+
)
761+
# Check that metadata was added
762+
assert result.custom_metadata is not None
763+
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
764+
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
765+
766+
@pytest.mark.asyncio
767+
async def test_handle_a2a_response_with_task_status_update_no_message(self):
768+
"""Test handling of a task status update with no message."""
769+
mock_a2a_task = Mock(spec=A2ATask)
770+
mock_a2a_task.id = "task-123"
771+
772+
mock_update = Mock(spec=TaskStatusUpdateEvent)
773+
mock_update.message = None
774+
mock_update.status = "COMPLETED"
775+
776+
result = await self.agent._handle_a2a_response(
777+
(mock_a2a_task, mock_update), self.mock_context
778+
)
779+
780+
assert result is None
781+
782+
@pytest.mark.asyncio
783+
async def test_handle_a2a_response_with_artifact_update(self):
784+
"""Test successful A2A response handling with artifact update."""
785+
mock_a2a_task = Mock(spec=A2ATask)
786+
mock_a2a_task.id = "task-123"
787+
mock_a2a_task.context_id = "context-123"
788+
789+
mock_artifact = Mock(spec=Artifact)
790+
mock_update = Mock(spec=TaskArtifactUpdateEvent)
791+
mock_update.artifact = mock_artifact
792+
mock_update.append = False
793+
mock_update.last_chunk = True
794+
795+
# Create a proper Event mock that can handle custom_metadata
796+
mock_event = Event(
797+
author=self.agent.name,
798+
invocation_id=self.mock_context.invocation_id,
799+
branch=self.mock_context.branch,
800+
)
801+
802+
with patch(
803+
"google.adk.agents.remote_a2a_agent.convert_a2a_task_to_event"
804+
) as mock_convert:
805+
mock_convert.return_value = mock_event
806+
807+
result = await self.agent._handle_a2a_response(
808+
(mock_a2a_task, mock_update), self.mock_context
809+
)
810+
811+
assert result == mock_event
812+
mock_convert.assert_called_once_with(
813+
mock_a2a_task, self.agent.name, self.mock_context
814+
)
815+
# Check that metadata was added
816+
assert result.custom_metadata is not None
817+
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
818+
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
819+
820+
@pytest.mark.asyncio
821+
async def test_handle_a2a_response_with_partial_artifact_update(self):
822+
"""Test that partial artifact updates are ignored."""
823+
mock_a2a_task = Mock(spec=A2ATask)
824+
mock_a2a_task.id = "task-123"
825+
826+
mock_update = Mock(spec=TaskArtifactUpdateEvent)
827+
mock_update.artifact = Mock(spec=Artifact)
828+
mock_update.append = True
829+
mock_update.last_chunk = False
830+
831+
result = await self.agent._handle_a2a_response(
832+
(mock_a2a_task, mock_update), self.mock_context
833+
)
834+
835+
assert result is None
836+
721837

722838
class TestRemoteA2aAgentMessageHandlingFromFactory:
723839
"""Test message handling functionality."""
@@ -865,8 +981,8 @@ async def test_handle_a2a_response_success_with_message(self):
865981
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
866982

867983
@pytest.mark.asyncio
868-
async def test_handle_a2a_response_success_with_task(self):
869-
"""Test successful A2A response handling with task."""
984+
async def test_handle_a2a_response_with_task_and_no_update(self):
985+
"""Test successful A2A response handling with task and no update."""
870986
mock_a2a_task = Mock(spec=A2ATask)
871987
mock_a2a_task.id = "task-123"
872988
mock_a2a_task.context_id = "context-123"
@@ -896,6 +1012,116 @@ async def test_handle_a2a_response_success_with_task(self):
8961012
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
8971013
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
8981014

1015+
@pytest.mark.asyncio
1016+
async def test_handle_a2a_response_with_task_status_update_with_message(self):
1017+
"""Test handling of a task status update with a message."""
1018+
mock_a2a_task = Mock(spec=A2ATask)
1019+
mock_a2a_task.id = "task-123"
1020+
mock_a2a_task.context_id = "context-123"
1021+
1022+
mock_a2a_message = Mock(spec=A2AMessage)
1023+
mock_update = Mock(spec=TaskStatusUpdateEvent)
1024+
mock_update.message = mock_a2a_message
1025+
mock_update.status = "COMPLETED"
1026+
1027+
# Create a proper Event mock that can handle custom_metadata
1028+
mock_event = Event(
1029+
author=self.agent.name,
1030+
invocation_id=self.mock_context.invocation_id,
1031+
branch=self.mock_context.branch,
1032+
)
1033+
1034+
with patch(
1035+
"google.adk.agents.remote_a2a_agent.convert_a2a_message_to_event"
1036+
) as mock_convert:
1037+
mock_convert.return_value = mock_event
1038+
1039+
result = await self.agent._handle_a2a_response(
1040+
(mock_a2a_task, mock_update), self.mock_context
1041+
)
1042+
1043+
assert result == mock_event
1044+
mock_convert.assert_called_once_with(
1045+
mock_a2a_message,
1046+
self.agent.name,
1047+
self.mock_context,
1048+
)
1049+
# Check that metadata was added
1050+
assert result.custom_metadata is not None
1051+
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
1052+
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
1053+
1054+
@pytest.mark.asyncio
1055+
async def test_handle_a2a_response_with_task_status_update_no_message(self):
1056+
"""Test handling of a task status update with no message."""
1057+
mock_a2a_task = Mock(spec=A2ATask)
1058+
mock_a2a_task.id = "task-123"
1059+
1060+
mock_update = Mock(spec=TaskStatusUpdateEvent)
1061+
mock_update.message = None
1062+
mock_update.status = "COMPLETED"
1063+
1064+
result = await self.agent._handle_a2a_response(
1065+
(mock_a2a_task, mock_update), self.mock_context
1066+
)
1067+
1068+
assert result is None
1069+
1070+
@pytest.mark.asyncio
1071+
async def test_handle_a2a_response_with_artifact_update(self):
1072+
"""Test successful A2A response handling with artifact update."""
1073+
mock_a2a_task = Mock(spec=A2ATask)
1074+
mock_a2a_task.id = "task-123"
1075+
mock_a2a_task.context_id = "context-123"
1076+
1077+
mock_artifact = Mock(spec=Artifact)
1078+
mock_update = Mock(spec=TaskArtifactUpdateEvent)
1079+
mock_update.artifact = mock_artifact
1080+
mock_update.append = False
1081+
mock_update.last_chunk = True
1082+
1083+
# Create a proper Event mock that can handle custom_metadata
1084+
mock_event = Event(
1085+
author=self.agent.name,
1086+
invocation_id=self.mock_context.invocation_id,
1087+
branch=self.mock_context.branch,
1088+
)
1089+
1090+
with patch(
1091+
"google.adk.agents.remote_a2a_agent.convert_a2a_task_to_event"
1092+
) as mock_convert:
1093+
mock_convert.return_value = mock_event
1094+
1095+
result = await self.agent._handle_a2a_response(
1096+
(mock_a2a_task, mock_update), self.mock_context
1097+
)
1098+
1099+
assert result == mock_event
1100+
mock_convert.assert_called_once_with(
1101+
mock_a2a_task, self.agent.name, self.mock_context
1102+
)
1103+
# Check that metadata was added
1104+
assert result.custom_metadata is not None
1105+
assert A2A_METADATA_PREFIX + "task_id" in result.custom_metadata
1106+
assert A2A_METADATA_PREFIX + "context_id" in result.custom_metadata
1107+
1108+
@pytest.mark.asyncio
1109+
async def test_handle_a2a_response_with_partial_artifact_update(self):
1110+
"""Test that partial artifact updates are ignored."""
1111+
mock_a2a_task = Mock(spec=A2ATask)
1112+
mock_a2a_task.id = "task-123"
1113+
1114+
mock_update = Mock(spec=TaskArtifactUpdateEvent)
1115+
mock_update.artifact = Mock(spec=Artifact)
1116+
mock_update.append = True
1117+
mock_update.last_chunk = False
1118+
1119+
result = await self.agent._handle_a2a_response(
1120+
(mock_a2a_task, mock_update), self.mock_context
1121+
)
1122+
1123+
assert result is None
1124+
8991125

9001126
class TestRemoteA2aAgentExecution:
9011127
"""Test agent execution functionality."""
@@ -1019,6 +1245,7 @@ async def test_run_async_impl_successful_request(self):
10191245
# Add model_dump to mock_response for metadata
10201246
mock_response.model_dump.return_value = {"test": "response"}
10211247

1248+
# Execute
10221249
events = []
10231250
async for event in self.agent._run_async_impl(
10241251
self.mock_context
@@ -1211,6 +1438,7 @@ async def test_run_async_impl_successful_request(self):
12111438
"test": "response"
12121439
}
12131440

1441+
# Execute
12141442
events = []
12151443
async for event in self.agent._run_async_impl(
12161444
self.mock_context

0 commit comments

Comments
 (0)