Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions lib/crewai/src/crewai/a2a/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,20 @@ def _delegate_to_a2a(
agent_config = next(filter(lambda x: x.endpoint == agent_id, a2a_agents))
task_config = task.config or {}
context_id = task_config.get("context_id")
task_id_config = task_config.get("task_id")
metadata = task_config.get("metadata")
extensions = task_config.get("extensions")

# Use endpoint-scoped task IDs to prevent reusing task IDs across different A2A agents
# This fixes the issue where delegating to a second A2A agent fails because the task_id
# from the first agent is in "completed" state
# Make a defensive copy to avoid in-place mutation of task.config
# Handle case where value is explicitly None (e.g., from JSON/YAML config)
existing_task_ids = task_config.get("a2a_task_ids_by_endpoint")
a2a_task_ids_by_endpoint: dict[str, str] = (
dict(existing_task_ids) if existing_task_ids else {}
)
task_id_config = a2a_task_ids_by_endpoint.get(agent_id)

reference_task_ids = task_config.get("reference_task_ids", [])

if original_task_description is None:
Expand Down Expand Up @@ -575,6 +585,8 @@ def _delegate_to_a2a(
if conversation_history:
latest_message = conversation_history[-1]
if latest_message.task_id is not None:
# Update task_id_config for the current loop iteration only
# Don't persist to a2a_task_ids_by_endpoint yet - wait until we know the status
task_id_config = latest_message.task_id
if latest_message.context_id is not None:
context_id = latest_message.context_id
Expand All @@ -584,13 +596,16 @@ def _delegate_to_a2a(
a2a_result["status"] == "completed"
and agent_config.trust_remote_completion_status
):
# Don't persist completed task IDs - they can't be reused
# (A2A protocol rejects task IDs in terminal state)
# Only add to reference_task_ids for tracking purposes
if task.config is None:
task.config = {}
if (
task_id_config is not None
and task_id_config not in reference_task_ids
):
reference_task_ids.append(task_id_config)
if task.config is None:
task.config = {}
task.config["reference_task_ids"] = reference_task_ids

result_text = a2a_result.get("result", "")
Expand Down
305 changes: 305 additions & 0 deletions lib/crewai/tests/agents/test_a2a_multiple_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
"""Test A2A delegation to multiple endpoints sequentially.
This test file covers the bug fix for issue #4166 where delegating to a second
A2A agent fails because the task_id from the first agent is in "completed" state.
"""

from unittest.mock import MagicMock, patch

import pytest

from crewai.a2a.config import A2AConfig

try:
from a2a.types import Message, Part, Role, TextPart

A2A_SDK_INSTALLED = True
except ImportError:
A2A_SDK_INSTALLED = False


@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_sequential_delegation_to_multiple_endpoints_uses_separate_task_ids():
"""When delegating to multiple A2A endpoints sequentially, each should get a unique task_id.
This test verifies the fix for issue #4166 where the second A2A delegation
fails with 'Task is in terminal state: completed' because the task_id from
the first delegation was being reused.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task

# Configure agent with two A2A endpoints
a2a_configs = [
A2AConfig(
endpoint="http://endpoint-a.com",
trust_remote_completion_status=True,
),
A2AConfig(
endpoint="http://endpoint-b.com",
trust_remote_completion_status=True,
),
]

agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_configs,
)

task = Task(description="test", expected_output="test", agent=agent)

# First delegation to endpoint A
class MockResponseA:
is_a2a = True
message = "Please help with task A"
a2a_ids = ["http://endpoint-a.com/"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

URL trailing slash mismatch causes tests to fail

All three tests have a URL mismatch that would cause them to fail with ValueError before testing the intended behavior. The mock responses use a2a_ids with trailing slashes (e.g., "http://endpoint-a.com/") while the A2AConfig endpoints are configured without trailing slashes (e.g., "http://endpoint-a.com"). The validation at wrapper.py lines 526-529 checks if agent_id is in the configured agent_ids, and these mismatched URLs would fail the check, raising a ValueError immediately. The tests would never reach the code they're designed to verify.

Additional Locations (2)

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a false positive. The tests pass because they mock _fetch_agent_cards_concurrently and pass agent_cards directly to _delegate_to_a2a. The validation at lines 526-529 checks if agent_id is in the agent_cards dictionary keys, not in the A2AConfig endpoints. Since the tests provide agent_cards with trailing slashes that match the a2a_ids, the validation passes correctly.

The tests are passing in CI (all 3 tests pass across Python 3.10, 3.11, 3.12, and 3.13), confirming this is not an issue.


# Second delegation to endpoint B
class MockResponseB:
is_a2a = True
message = "Please help with task B"
a2a_ids = ["http://endpoint-b.com/"]

task_ids_used = []

def mock_execute_a2a_delegation(**kwargs):
"""Track the task_id used for each delegation."""
task_ids_used.append(kwargs.get("task_id"))
endpoint = kwargs.get("endpoint")

# Create a mock message with a task_id
mock_message = MagicMock()
mock_message.task_id = f"task-id-for-{endpoint}"
mock_message.context_id = None

return {
"status": "completed",
"result": f"Done by {endpoint}",
"history": [mock_message],
}

with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
) as mock_execute,
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card_a = MagicMock()
mock_card_a.name = "Agent A"
mock_card_b = MagicMock()
mock_card_b.name = "Agent B"
mock_fetch.return_value = (
{
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
{},
)

# First delegation to endpoint A
result_a = _delegate_to_a2a(
self=agent,
agent_response=MockResponseA(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
original_task_description="test",
)

assert result_a == "Done by http://endpoint-a.com/"

# Second delegation to endpoint B
result_b = _delegate_to_a2a(
self=agent,
agent_response=MockResponseB(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={
"http://endpoint-a.com/": mock_card_a,
"http://endpoint-b.com/": mock_card_b,
},
original_task_description="test",
)

assert result_b == "Done by http://endpoint-b.com/"

# Verify that the second delegation used a different (None) task_id
# The first call should have task_id=None (no prior task_id for endpoint A)
# The second call should also have task_id=None (no prior task_id for endpoint B)
assert len(task_ids_used) == 2
assert task_ids_used[0] is None # First delegation to endpoint A
assert task_ids_used[1] is None # Second delegation to endpoint B (not reusing A's task_id)


@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_completed_task_ids_are_not_persisted_for_reuse():
"""Completed task IDs should NOT be persisted for reuse.
The A2A protocol rejects task IDs that are in terminal state (completed/failed).
This test verifies that completed task IDs are not stored in task.config
for future delegations, so each new delegation gets a fresh task_id.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task

a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)

agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)

task = Task(description="test", expected_output="test", agent=agent)

class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]

task_ids_used = []

def mock_execute_a2a_delegation(**kwargs):
"""Track the task_id used for each call."""
task_ids_used.append(kwargs.get("task_id"))

# Create a mock message with a task_id
mock_message = MagicMock()
mock_message.task_id = "completed-task-id"
mock_message.context_id = None

return {
"status": "completed",
"result": "Done",
"history": [mock_message],
}

with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
),
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})

# First delegation
_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)

# Verify that completed task IDs are NOT stored in a2a_task_ids_by_endpoint
# because they can't be reused (A2A protocol rejects terminal state task IDs)
if task.config is not None:
a2a_task_ids = task.config.get("a2a_task_ids_by_endpoint", {})
# The endpoint should NOT have a stored task_id since it completed
assert "http://test-endpoint.com/" not in a2a_task_ids

# Second delegation to the SAME endpoint should also get a fresh task_id
_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)

# Verify that BOTH calls used None as task_id (fresh task for each)
# because completed task IDs are not persisted
assert len(task_ids_used) == 2
assert task_ids_used[0] is None # First call - new conversation
assert task_ids_used[1] is None # Second call - also new (completed IDs not reused)


@pytest.mark.skipif(not A2A_SDK_INSTALLED, reason="Requires a2a-sdk to be installed")
def test_reference_task_ids_are_tracked_for_completed_tasks():
"""Completed task IDs should be added to reference_task_ids for tracking.
While completed task IDs can't be reused for new delegations, they should
still be tracked in reference_task_ids for context/history purposes.
"""
from crewai.a2a.wrapper import _delegate_to_a2a
from crewai import Agent, Task

a2a_config = A2AConfig(
endpoint="http://test-endpoint.com",
trust_remote_completion_status=True,
)

agent = Agent(
role="test manager",
goal="coordinate",
backstory="test",
a2a=a2a_config,
)

task = Task(description="test", expected_output="test", agent=agent)

class MockResponse:
is_a2a = True
message = "Please help"
a2a_ids = ["http://test-endpoint.com/"]

def mock_execute_a2a_delegation(**kwargs):
mock_message = MagicMock()
mock_message.task_id = "unique-task-id-123"
mock_message.context_id = None

return {
"status": "completed",
"result": "Done",
"history": [mock_message],
}

with (
patch(
"crewai.a2a.wrapper.execute_a2a_delegation",
side_effect=mock_execute_a2a_delegation,
),
patch("crewai.a2a.wrapper._fetch_agent_cards_concurrently") as mock_fetch,
):
mock_card = MagicMock()
mock_card.name = "Test"
mock_fetch.return_value = ({"http://test-endpoint.com/": mock_card}, {})

_delegate_to_a2a(
self=agent,
agent_response=MockResponse(),
task=task,
original_fn=lambda *args, **kwargs: "fallback",
context=None,
tools=None,
agent_cards={"http://test-endpoint.com/": mock_card},
original_task_description="test",
)

# Verify the completed task_id is tracked in reference_task_ids
assert task.config is not None
assert "reference_task_ids" in task.config
assert "unique-task-id-123" in task.config["reference_task_ids"]
Loading