From ba56c73ac1a5b4032f17cbb6530e13f6a42157b6 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 1 Jan 2026 07:44:52 +0000 Subject: [PATCH 1/3] fix: use endpoint-scoped task IDs for A2A delegations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes issue #4166 where delegating to a second A2A agent fails because the task_id from the first agent is in 'completed' state. The fix introduces endpoint-scoped task ID storage in task.config using a2a_task_ids_by_endpoint dictionary. This ensures that: - Each A2A endpoint gets its own task_id - Multi-turn conversations with the same endpoint reuse the task_id - Sequential delegations to different endpoints use separate task_ids Added tests to verify: - Sequential delegation to multiple endpoints uses separate task IDs - Multi-turn conversations with same endpoint reuse task IDs - Endpoint-scoped task IDs are properly persisted to task.config Co-Authored-By: João --- lib/crewai/src/crewai/a2a/wrapper.py | 17 +- .../agents/test_a2a_multiple_endpoints.py | 314 ++++++++++++++++++ 2 files changed, 328 insertions(+), 3 deletions(-) create mode 100644 lib/crewai/tests/agents/test_a2a_multiple_endpoints.py diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 4c98e6f30d..8e26e7e383 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -531,10 +531,17 @@ def _delegate_to_a2a( agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents)) task_config = task.config or {} context_id = task_config.get("context_id") - task_id_config = task_config.get("task_id") metadata = task_config.get("metadata") extensions = task_config.get("extensions") + # Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents + # This fixes the issue where delegating to a second A2A agent fails because the task_id + # from the first agent is in "completed" state + a2a_task_ids_by_endpoint: dict[str, str] = task_config.get( + "a2a_task_ids_by_endpoint", {} + ) + task_id_config = a2a_task_ids_by_endpoint.get(agent_id) + reference_task_ids = task_config.get("reference_task_ids", []) if original_task_description is None: @@ -576,6 +583,8 @@ def _delegate_to_a2a( latest_message = conversation_history[-1] if latest_message.task_id is not None: task_id_config = latest_message.task_id + # Store the task_id scoped to this endpoint for multi-turn conversations + a2a_task_ids_by_endpoint[agent_id] = task_id_config if latest_message.context_id is not None: context_id = latest_message.context_id @@ -584,13 +593,15 @@ def _delegate_to_a2a( a2a_result["status"] == "completed" and agent_config.trust_remote_completion_status ): + if task.config is None: + task.config = {} + # Persist endpoint-scoped task IDs for future delegations + task.config["a2a_task_ids_by_endpoint"] = a2a_task_ids_by_endpoint if ( task_id_config is not None and task_id_config not in reference_task_ids ): reference_task_ids.append(task_id_config) - if task.config is None: - task.config = {} task.config["reference_task_ids"] = reference_task_ids result_text = a2a_result.get("result", "") diff --git a/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py new file mode 100644 index 0000000000..332af8cc94 --- /dev/null +++ b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py @@ -0,0 +1,314 @@ +"""Test A2A delegation to multiple endpoints sequentially. + +This test file covers the bug fix for issue #4166 where delegating to a second +A2A agent fails because the task_id from the first agent is in "completed" state. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from crewai.a2a.config import A2AConfig + +try: + from a2a.types import Message, Part, Role, TextPart + + A2A_SDK_INSTALLED = True +except ImportError: + A2A_SDK_INSTALLED = False + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids(): + """When delegating to multiple A2A endpoints sequentially, each should get a unique task_id. + + This test verifies the fix for issue #4166 where the second A2A delegation + fails with 'Task is in terminal state: completed' because the task_id from + the first delegation was being reused. + """ + from crewai.a2a.wrapper import _delegate_to_a2a + from crewai import Agent, Task + + # Configure agent with two A2A endpoints + a2a_configs = [ + A2AConfig( + endpoint="http://endpoint-a.com", + trust_remote_completion_status=True, + ), + A2AConfig( + endpoint="http://endpoint-b.com", + trust_remote_completion_status=True, + ), + ] + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_configs, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + # First delegation to endpoint A + class MockResponseA: + is_a2a = True + message = "Please help with task A" + a2a_ids = ["http://endpoint-a.com/"] + + # Second delegation to endpoint B + class MockResponseB: + is_a2a = True + message = "Please help with task B" + a2a_ids = ["http://endpoint-b.com/"] + + task_ids_used = [] + + def mock_execute_a2a_delegation(**kwargs): + """Track the task_id used for each delegation.""" + task_ids_used.append(kwargs.get("task_id")) + endpoint = kwargs.get("endpoint") + + # Create a mock message with a task_id + mock_message = MagicMock() + mock_message.task_id = f"task-id-for-{endpoint}" + mock_message.context_id = None + + return { + "status": "completed", + "result": f"Done by {endpoint}", + "history": [mock_message], + } + + with ( + patch( + "crewai.a2a.wrapper.execute_a2a_delegation", + side_effect=mock_execute_a2a_delegation, + ) as mock_execute, + patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card_a = MagicMock() + mock_card_a.name = "Agent A" + mock_card_b = MagicMock() + mock_card_b.name = "Agent B" + mock_fetch.return_value = ( + { + "http://endpoint-a.com/": mock_card_a, + "http://endpoint-b.com/": mock_card_b, + }, + {}, + ) + + # First delegation to endpoint A + result_a = _delegate_to_a2a( + self=agent, + agent_response=MockResponseA(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={ + "http://endpoint-a.com/": mock_card_a, + "http://endpoint-b.com/": mock_card_b, + }, + original_task_description="test", + ) + + assert result_a == "Done by http://endpoint-a.com/" + + # Verify the endpoint-scoped task IDs are stored in task.config + assert task.config is not None + assert "a2a_task_ids_by_endpoint" in task.config + assert ( + task.config["a2a_task_ids_by_endpoint"]["http://endpoint-a.com/"] + == "task-id-for-http://endpoint-a.com/" + ) + + # Second delegation to endpoint B + result_b = _delegate_to_a2a( + self=agent, + agent_response=MockResponseB(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={ + "http://endpoint-a.com/": mock_card_a, + "http://endpoint-b.com/": mock_card_b, + }, + original_task_description="test", + ) + + assert result_b == "Done by http://endpoint-b.com/" + + # Verify that the second delegation used a different (None) task_id + # The first call should have task_id=None (no prior task_id for endpoint A) + # The second call should also have task_id=None (no prior task_id for endpoint B) + assert len(task_ids_used) == 2 + assert task_ids_used[0] is None # First delegation to endpoint A + assert task_ids_used[1] is None # Second delegation to endpoint B (not reusing A's task_id) + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): + """Multi-turn conversations with the same endpoint should reuse the task_id. + + This test ensures that the fix for issue #4166 doesn't break multi-turn + conversations with the same endpoint. When trust_remote_completion_status=True, + the task_id should be stored and reused for subsequent calls to the same endpoint. + """ + from crewai.a2a.wrapper import _delegate_to_a2a + from crewai import Agent, Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + trust_remote_completion_status=True, + ) + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_config, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + class MockResponse: + is_a2a = True + message = "Please help" + a2a_ids = ["http://test-endpoint.com/"] + + task_ids_used = [] + + def mock_execute_a2a_delegation(**kwargs): + """Track the task_id used for each call.""" + task_ids_used.append(kwargs.get("task_id")) + + # Create a mock message with a task_id + mock_message = MagicMock() + mock_message.task_id = "persistent-task-id" + mock_message.context_id = None + + return { + "status": "completed", + "result": "Done", + "history": [mock_message], + } + + with ( + patch( + "crewai.a2a.wrapper.execute_a2a_delegation", + side_effect=mock_execute_a2a_delegation, + ), + patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + # First delegation + _delegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + # Verify the task_id was stored in the endpoint-scoped dictionary + assert task.config is not None + assert "a2a_task_ids_by_endpoint" in task.config + stored_task_id = task.config["a2a_task_ids_by_endpoint"].get( + "http://test-endpoint.com/" + ) + assert stored_task_id == "persistent-task-id" + + # Second delegation to the SAME endpoint should use the stored task_id + _delegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + # Verify that the second call used the stored task_id + assert len(task_ids_used) == 2 + # First call should have no task_id (new conversation) + assert task_ids_used[0] is None + # Second call should reuse the task_id from the first call + assert task_ids_used[1] == "persistent-task-id" + + +@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") +def test_endpoint_scoped_task_ids_are_persisted_to_task_config(): + """Verify that endpoint-scoped task IDs are properly persisted to task.config.""" + from crewai.a2a.wrapper import _delegate_to_a2a + from crewai import Agent, Task + + a2a_config = A2AConfig( + endpoint="http://test-endpoint.com", + trust_remote_completion_status=True, + ) + + agent = Agent( + role="test manager", + goal="coordinate", + backstory="test", + a2a=a2a_config, + ) + + task = Task(description="test", expected_output="test", agent=agent) + + class MockResponse: + is_a2a = True + message = "Please help" + a2a_ids = ["http://test-endpoint.com/"] + + def mock_execute_a2a_delegation(**kwargs): + mock_message = MagicMock() + mock_message.task_id = "unique-task-id-123" + mock_message.context_id = None + + return { + "status": "completed", + "result": "Done", + "history": [mock_message], + } + + with ( + patch( + "crewai.a2a.wrapper.execute_a2a_delegation", + side_effect=mock_execute_a2a_delegation, + ), + patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch, + ): + mock_card = MagicMock() + mock_card.name = "Test" + mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {}) + + _delegate_to_a2a( + self=agent, + agent_response=MockResponse(), + task=task, + original_fn=lambda *args, **kwargs: "fallback", + context=None, + tools=None, + agent_cards={"http://test-endpoint.com/": mock_card}, + original_task_description="test", + ) + + # Verify the endpoint-scoped task IDs are stored + assert task.config is not None + assert "a2a_task_ids_by_endpoint" in task.config + assert ( + task.config["a2a_task_ids_by_endpoint"]["http://test-endpoint.com/"] + == "unique-task-id-123" + ) From f171805092c124541dd3ef5a5817e3ef74c82131 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 1 Jan 2026 08:03:00 +0000 Subject: [PATCH 2/3] fix: address Bugbot feedback - prevent in-place mutation and don't persist completed task IDs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Make defensive copy of a2a_task_ids_by_endpoint dict to avoid in-place mutation - Don't persist completed task IDs since A2A protocol rejects terminal state task IDs - Update task_id_config locally for current loop only, not in shared dict - Update tests to verify correct behavior: - Completed task IDs are NOT persisted for reuse - Each new delegation gets a fresh task_id (None) - Completed task IDs are still tracked in reference_task_ids Co-Authored-By: João --- lib/crewai/src/crewai/a2a/wrapper.py | 14 +++-- .../agents/test_a2a_multiple_endpoints.py | 61 ++++++++----------- 2 files changed, 34 insertions(+), 41 deletions(-) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index 8e26e7e383..c365cdb90a 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -537,8 +537,9 @@ def _delegate_to_a2a( # Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents # This fixes the issue where delegating to a second A2A agent fails because the task_id # from the first agent is in "completed" state - a2a_task_ids_by_endpoint: dict[str, str] = task_config.get( - "a2a_task_ids_by_endpoint", {} + # Make a defensive copy to avoid in-place mutation of task.config + a2a_task_ids_by_endpoint: dict[str, str] = dict( + task_config.get("a2a_task_ids_by_endpoint", {}) ) task_id_config = a2a_task_ids_by_endpoint.get(agent_id) @@ -582,9 +583,9 @@ def _delegate_to_a2a( if conversation_history: latest_message = conversation_history[-1] if latest_message.task_id is not None: + # Update task_id_config for the current loop iteration only + # Don't persist to a2a_task_ids_by_endpoint yet - wait until we know the status task_id_config = latest_message.task_id - # Store the task_id scoped to this endpoint for multi-turn conversations - a2a_task_ids_by_endpoint[agent_id] = task_id_config if latest_message.context_id is not None: context_id = latest_message.context_id @@ -593,10 +594,11 @@ def _delegate_to_a2a( a2a_result["status"] == "completed" and agent_config.trust_remote_completion_status ): + # Don't persist completed task IDs - they can't be reused + # (A2A protocol rejects task IDs in terminal state) + # Only add to reference_task_ids for tracking purposes if task.config is None: task.config = {} - # Persist endpoint-scoped task IDs for future delegations - task.config["a2a_task_ids_by_endpoint"] = a2a_task_ids_by_endpoint if ( task_id_config is not None and task_id_config not in reference_task_ids diff --git a/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py index 332af8cc94..d0496fc4a8 100644 --- a/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py +++ b/lib/crewai/tests/agents/test_a2a_multiple_endpoints.py @@ -116,14 +116,6 @@ def mock_execute_a2a_delegation(**kwargs): assert result_a == "Done by http://endpoint-a.com/" - # Verify the endpoint-scoped task IDs are stored in task.config - assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - assert ( - task.config["a2a_task_ids_by_endpoint"]["http://endpoint-a.com/"] - == "task-id-for-http://endpoint-a.com/" - ) - # Second delegation to endpoint B result_b = _delegate_to_a2a( self=agent, @@ -150,12 +142,12 @@ def mock_execute_a2a_delegation(**kwargs): @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_multi_turn_conversation_with_same_endpoint_reuses_task_id(): - """Multi-turn conversations with the same endpoint should reuse the task_id. +def test_completed_task_ids_are_not_persisted_for_reuse(): + """Completed task IDs should NOT be persisted for reuse. - This test ensures that the fix for issue #4166 doesn't break multi-turn - conversations with the same endpoint. When trust_remote_completion_status=True, - the task_id should be stored and reused for subsequent calls to the same endpoint. + The A2A protocol rejects task IDs that are in terminal state (completed/failed). + This test verifies that completed task IDs are not stored in task.config + for future delegations, so each new delegation gets a fresh task_id. """ from crewai.a2a.wrapper import _delegate_to_a2a from crewai import Agent, Task @@ -187,7 +179,7 @@ def mock_execute_a2a_delegation(**kwargs): # Create a mock message with a task_id mock_message = MagicMock() - mock_message.task_id = "persistent-task-id" + mock_message.task_id = "completed-task-id" mock_message.context_id = None return { @@ -219,15 +211,14 @@ def mock_execute_a2a_delegation(**kwargs): original_task_description="test", ) - # Verify the task_id was stored in the endpoint-scoped dictionary - assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - stored_task_id = task.config["a2a_task_ids_by_endpoint"].get( - "http://test-endpoint.com/" - ) - assert stored_task_id == "persistent-task-id" + # Verify that completed task IDs are NOT stored in a2a_task_ids_by_endpoint + # because they can't be reused (A2A protocol rejects terminal state task IDs) + if task.config is not None: + a2a_task_ids = task.config.get("a2a_task_ids_by_endpoint", {}) + # The endpoint should NOT have a stored task_id since it completed + assert "http://test-endpoint.com/" not in a2a_task_ids - # Second delegation to the SAME endpoint should use the stored task_id + # Second delegation to the SAME endpoint should also get a fresh task_id _delegate_to_a2a( self=agent, agent_response=MockResponse(), @@ -239,17 +230,20 @@ def mock_execute_a2a_delegation(**kwargs): original_task_description="test", ) - # Verify that the second call used the stored task_id + # Verify that BOTH calls used None as task_id (fresh task for each) + # because completed task IDs are not persisted assert len(task_ids_used) == 2 - # First call should have no task_id (new conversation) - assert task_ids_used[0] is None - # Second call should reuse the task_id from the first call - assert task_ids_used[1] == "persistent-task-id" + assert task_ids_used[0] is None # First call - new conversation + assert task_ids_used[1] is None # Second call - also new (completed IDs not reused) @pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed") -def test_endpoint_scoped_task_ids_are_persisted_to_task_config(): - """Verify that endpoint-scoped task IDs are properly persisted to task.config.""" +def test_reference_task_ids_are_tracked_for_completed_tasks(): + """Completed task IDs should be added to reference_task_ids for tracking. + + While completed task IDs can't be reused for new delegations, they should + still be tracked in reference_task_ids for context/history purposes. + """ from crewai.a2a.wrapper import _delegate_to_a2a from crewai import Agent, Task @@ -305,10 +299,7 @@ def mock_execute_a2a_delegation(**kwargs): original_task_description="test", ) - # Verify the endpoint-scoped task IDs are stored + # Verify the completed task_id is tracked in reference_task_ids assert task.config is not None - assert "a2a_task_ids_by_endpoint" in task.config - assert ( - task.config["a2a_task_ids_by_endpoint"]["http://test-endpoint.com/"] - == "unique-task-id-123" - ) + assert "reference_task_ids" in task.config + assert "unique-task-id-123" in task.config["reference_task_ids"] From 4f1315348321d1a612652004ae7799a96d606e67 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 1 Jan 2026 08:17:38 +0000 Subject: [PATCH 3/3] fix: handle None value for a2a_task_ids_by_endpoint in config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address Bugbot feedback - handle case where a2a_task_ids_by_endpoint is explicitly set to None (e.g., from JSON/YAML config) to avoid TypeError when calling dict() on None. Co-Authored-By: João --- lib/crewai/src/crewai/a2a/wrapper.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/crewai/src/crewai/a2a/wrapper.py b/lib/crewai/src/crewai/a2a/wrapper.py index c365cdb90a..ad831c7764 100644 --- a/lib/crewai/src/crewai/a2a/wrapper.py +++ b/lib/crewai/src/crewai/a2a/wrapper.py @@ -538,8 +538,10 @@ def _delegate_to_a2a( # This fixes the issue where delegating to a second A2A agent fails because the task_id # from the first agent is in "completed" state # Make a defensive copy to avoid in-place mutation of task.config - a2a_task_ids_by_endpoint: dict[str, str] = dict( - task_config.get("a2a_task_ids_by_endpoint", {}) + # Handle case where value is explicitly None (e.g., from JSON/YAML config) + existing_task_ids = task_config.get("a2a_task_ids_by_endpoint") + a2a_task_ids_by_endpoint: dict[str, str] = ( + dict(existing_task_ids) if existing_task_ids else {} ) task_id_config = a2a_task_ids_by_endpoint.get(agent_id)