Skip to content

Commit 282b878

Browse files
Debug Agentclaude
andcommitted
review: close pending ACP tool cards on retry / abort
ACP servers mint fresh ``tool_call_id``s on a retried prompt(), so live-emitted pending events from the failed attempt would otherwise be orphaned on state.events — consumers that dedupe by tool_call_id and take the last-seen status as authoritative would keep those cards spinning forever. Introduces ``_cancel_inflight_tool_calls(on_event)`` which walks the accumulator and emits a terminal ``ACPToolCallEvent(status="failed", is_error=True)`` for every entry that hasn't reached a terminal status. Called before ``_reset_client_for_turn`` in both retry branches, and also before the error MessageEvent in the TimeoutError and outer-exception paths so aborted turns don't leave ghost cards behind either. Also documents the concurrency model on the bridge: on_event / on_token / on_activity all fire synchronously from the portal thread while the caller thread is blocked in portal.call(), so they do not race with the final MessageEvent / FinishAction emitted by the caller thread. Review feedback from VascoSch92 on PR #2868. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fb60def commit 282b878

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

openhands-sdk/openhands/sdk/agent/acp_agent.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,12 @@
109109
# well below the ~20 min runtime-api kill threshold.
110110
_ACTIVITY_SIGNAL_INTERVAL: float = 30.0
111111

112+
# ACP tool-call statuses that represent a terminal outcome. Non-terminal
113+
# statuses (``pending``, ``in_progress``) mean the call is still in flight
114+
# and, if the turn aborts before it reaches a terminal state, the live-
115+
# emitted event on state.events will otherwise be orphaned forever.
116+
_TERMINAL_TOOL_CALL_STATUSES: frozenset[str] = frozenset({"completed", "failed"})
117+
112118

113119
def _make_dummy_llm() -> LLM:
114120
"""Create a dummy LLM that should never be called directly."""
@@ -286,6 +292,16 @@ class _OpenHandsACPBridge:
286292
"""Bridge between OpenHands and ACP that accumulates session updates.
287293
288294
Implements the ``Client`` protocol from ``agent_client_protocol``.
295+
296+
Concurrency model — ``on_event`` / ``on_token`` / ``on_activity`` are
297+
fired synchronously from ``session_update``, which runs on the
298+
``AsyncExecutor`` portal thread. The caller thread driving
299+
``ACPAgent.step()`` is blocked inside ``portal.call()`` for the entire
300+
``prompt()`` round-trip, so these callbacks do not race with the final
301+
``MessageEvent`` / ``FinishAction`` emitted by the caller thread after
302+
``prompt()`` returns. Consumers that keep cross-callback state (e.g.
303+
hook processors reading-then-writing, visualizers) can therefore treat
304+
each callback as sequential within a single turn.
289305
"""
290306

291307
def __init__(self) -> None:
@@ -939,6 +955,45 @@ def _reset_client_for_turn(
939955
self._client.on_event = on_event
940956
self._client.on_activity = self._on_activity
941957

958+
def _cancel_inflight_tool_calls(self, on_event: ConversationCallbackType) -> None:
959+
"""Emit a terminal ``failed`` ACPToolCallEvent for every tool call
960+
in the accumulator that has not reached a terminal status yet.
961+
962+
ACP servers mint fresh ``tool_call_id``s on a retried turn, so any
963+
``pending`` / ``in_progress`` events already streamed during the
964+
failed attempt would otherwise be orphaned on ``state.events`` —
965+
no later notification reuses their id, and consumers that dedupe
966+
by ``tool_call_id`` + "last-seen status wins" would keep them
967+
spinning forever. This method closes those cards before we wipe
968+
the in-memory accumulator on retry / turn abort.
969+
970+
Called with ``on_event`` passed in explicitly because the bridge's
971+
``on_event`` attribute is about to be cleared by ``reset()``.
972+
"""
973+
for tc in self._client.accumulated_tool_calls:
974+
status = tc.get("status")
975+
if status in _TERMINAL_TOOL_CALL_STATUSES:
976+
continue
977+
try:
978+
on_event(
979+
ACPToolCallEvent(
980+
tool_call_id=tc["tool_call_id"],
981+
title=tc["title"],
982+
status="failed",
983+
tool_kind=tc.get("tool_kind"),
984+
raw_input=tc.get("raw_input"),
985+
raw_output=tc.get("raw_output"),
986+
content=tc.get("content"),
987+
is_error=True,
988+
)
989+
)
990+
except Exception:
991+
logger.debug(
992+
"Failed to emit supersede event for %s",
993+
tc.get("tool_call_id"),
994+
exc_info=True,
995+
)
996+
942997
@observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"])
943998
def step(
944999
self,
@@ -1024,6 +1079,7 @@ async def _prompt() -> PromptResponse:
10241079
e,
10251080
)
10261081
time.sleep(delay)
1082+
self._cancel_inflight_tool_calls(on_event)
10271083
self._reset_client_for_turn(on_token, on_event)
10281084
else:
10291085
raise
@@ -1048,6 +1104,7 @@ async def _prompt() -> PromptResponse:
10481104
e,
10491105
)
10501106
time.sleep(delay)
1107+
self._cancel_inflight_tool_calls(on_event)
10511108
self._reset_client_for_turn(on_token, on_event)
10521109
else:
10531110
raise
@@ -1144,12 +1201,17 @@ async def _prompt() -> PromptResponse:
11441201
)
11451202
],
11461203
)
1204+
# Close any tool cards left in flight from the timed-out attempt.
1205+
self._cancel_inflight_tool_calls(on_event)
11471206
on_event(MessageEvent(source="agent", llm_message=error_message))
11481207
state.execution_status = ConversationExecutionStatus.ERROR
11491208
except Exception as e:
11501209
logger.error("ACP prompt failed: %s", e, exc_info=True)
11511210
error_str = str(e)
11521211

1212+
# Close any tool cards left in flight before surfacing the error.
1213+
self._cancel_inflight_tool_calls(on_event)
1214+
11531215
# Emit error as an agent message (existing behavior, preserved for
11541216
# consumers that inspect MessageEvents)
11551217
error_message = Message(

tests/sdk/agent/test_acp_agent.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,6 +1401,172 @@ def test_reset_clears_on_event(self):
14011401
assert client.on_event is None
14021402

14031403

1404+
class TestACPCancelInflightToolCalls:
1405+
"""Tests for _cancel_inflight_tool_calls — ensures ghost tool cards are
1406+
closed on retry / abort so the live-emission stream cannot leave an
1407+
orphaned pending event on ``state.events``.
1408+
1409+
Raised in PR review on #2866: ACP servers mint fresh ``tool_call_id``s
1410+
when the prompt is retried, so any pending event already fired for the
1411+
failed attempt would otherwise spin forever under dedup-by-id consumers.
1412+
"""
1413+
1414+
@staticmethod
1415+
def _push_entry(
1416+
client: _OpenHandsACPBridge, tool_call_id: str, status: str
1417+
) -> None:
1418+
client.accumulated_tool_calls.append(
1419+
{
1420+
"tool_call_id": tool_call_id,
1421+
"title": f"Tool {tool_call_id}",
1422+
"tool_kind": "read",
1423+
"status": status,
1424+
"raw_input": {"k": "v"},
1425+
"raw_output": None,
1426+
"content": None,
1427+
}
1428+
)
1429+
1430+
def test_emits_failed_event_for_pending_entries(self, tmp_path):
1431+
"""Pending / in_progress entries get a terminal failed ACPToolCallEvent."""
1432+
agent = _make_agent()
1433+
agent._client = _OpenHandsACPBridge()
1434+
self._push_entry(agent._client, "tc-1", "pending")
1435+
self._push_entry(agent._client, "tc-2", "in_progress")
1436+
1437+
emitted: list = []
1438+
agent._cancel_inflight_tool_calls(emitted.append)
1439+
1440+
assert len(emitted) == 2
1441+
assert all(isinstance(e, ACPToolCallEvent) for e in emitted)
1442+
assert [e.tool_call_id for e in emitted] == ["tc-1", "tc-2"]
1443+
assert all(e.status == "failed" and e.is_error for e in emitted)
1444+
1445+
def test_skips_already_terminal_entries(self, tmp_path):
1446+
"""completed / failed entries are left alone — they already closed."""
1447+
agent = _make_agent()
1448+
agent._client = _OpenHandsACPBridge()
1449+
self._push_entry(agent._client, "tc-done", "completed")
1450+
self._push_entry(agent._client, "tc-bad", "failed")
1451+
self._push_entry(agent._client, "tc-live", "pending")
1452+
1453+
emitted: list = []
1454+
agent._cancel_inflight_tool_calls(emitted.append)
1455+
1456+
# Only the pending one gets a synthetic terminal event.
1457+
assert [e.tool_call_id for e in emitted] == ["tc-live"]
1458+
1459+
def test_callback_errors_are_swallowed(self):
1460+
"""A raising on_event during cancellation must not break the retry path."""
1461+
agent = _make_agent()
1462+
agent._client = _OpenHandsACPBridge()
1463+
self._push_entry(agent._client, "tc-1", "pending")
1464+
self._push_entry(agent._client, "tc-2", "pending")
1465+
1466+
seen: list = []
1467+
1468+
def flaky(event) -> None:
1469+
seen.append(event)
1470+
raise RuntimeError("boom")
1471+
1472+
agent._cancel_inflight_tool_calls(flaky) # must not raise
1473+
# Both entries still attempted even though the first raised.
1474+
assert len(seen) == 2
1475+
1476+
def test_retry_cancels_pending_events_before_reset(self, tmp_path):
1477+
"""Full step() retry path closes pending cards before the new attempt."""
1478+
from acp.schema import ToolCallStart
1479+
1480+
agent = _make_agent()
1481+
state = _make_state(tmp_path)
1482+
state.events.append(
1483+
SystemPromptEvent(
1484+
source="agent",
1485+
system_prompt=TextContent(text="sys"),
1486+
tools=[],
1487+
)
1488+
)
1489+
state.events.append(
1490+
MessageEvent(
1491+
source="user",
1492+
llm_message=Message(role="user", content=[TextContent(text="go")]),
1493+
)
1494+
)
1495+
conversation = MagicMock()
1496+
conversation.state = state
1497+
1498+
mock_client = _OpenHandsACPBridge()
1499+
agent._client = mock_client
1500+
agent._conn = MagicMock()
1501+
agent._session_id = "test-session"
1502+
1503+
events: list = []
1504+
call_count = 0
1505+
1506+
def _fake_run_async(_coro, **_kwargs):
1507+
nonlocal call_count
1508+
call_count += 1
1509+
if call_count == 1:
1510+
# First attempt: stream a pending tool call, then fail
1511+
start = MagicMock(spec=ToolCallStart)
1512+
start.tool_call_id = "toolu_AAA"
1513+
start.title = "Read file"
1514+
start.kind = "read"
1515+
start.status = "pending"
1516+
start.raw_input = {"path": "/tmp/x"}
1517+
start.raw_output = None
1518+
start.content = None
1519+
asyncio.run(mock_client.session_update("sess", start))
1520+
raise ConnectionError("reset by peer")
1521+
# Retry: fresh tool call id reaches terminal state
1522+
start = MagicMock(spec=ToolCallStart)
1523+
start.tool_call_id = "toolu_BBB"
1524+
start.title = "Read file"
1525+
start.kind = "read"
1526+
start.status = "completed"
1527+
start.raw_input = {"path": "/tmp/x"}
1528+
start.raw_output = "ok"
1529+
start.content = None
1530+
asyncio.run(mock_client.session_update("sess", start))
1531+
mock_client.accumulated_text.append("done")
1532+
return MagicMock(usage=None)
1533+
1534+
mock_executor = MagicMock()
1535+
mock_executor.run_async = _fake_run_async
1536+
agent._executor = mock_executor
1537+
1538+
with patch("openhands.sdk.agent.acp_agent.time.sleep"):
1539+
agent.step(conversation, on_event=events.append)
1540+
1541+
assert call_count == 2
1542+
tool_events = [e for e in events if isinstance(e, ACPToolCallEvent)]
1543+
# Expected sequence:
1544+
# toolu_AAA(pending) — live-emitted during attempt 1
1545+
# toolu_AAA(failed) — synthetic cancellation before retry reset
1546+
# toolu_BBB(completed) — attempt 2
1547+
by_id: dict[str, list[ACPToolCallEvent]] = {}
1548+
for e in tool_events:
1549+
by_id.setdefault(e.tool_call_id, []).append(e)
1550+
1551+
assert "toolu_AAA" in by_id
1552+
aaa_events = by_id["toolu_AAA"]
1553+
# Must end in a terminal status so consumer dedupe-by-id closes the card.
1554+
assert aaa_events[-1].status == "failed"
1555+
assert aaa_events[-1].is_error is True
1556+
1557+
assert "toolu_BBB" in by_id
1558+
assert by_id["toolu_BBB"][-1].status == "completed"
1559+
1560+
# The toolu_AAA cancellation comes before any toolu_BBB event.
1561+
aaa_idx = max(
1562+
i for i, e in enumerate(tool_events) if e.tool_call_id == "toolu_AAA"
1563+
)
1564+
bbb_idx = min(
1565+
i for i, e in enumerate(tool_events) if e.tool_call_id == "toolu_BBB"
1566+
)
1567+
assert aaa_idx < bbb_idx
1568+
1569+
14041570
class TestACPToolCallEmission:
14051571
"""Tests for ACPToolCallEvent emission in step()."""
14061572

0 commit comments

Comments
 (0)