Skip to content

Commit ea808bd

Browse files
Debug Agentclaude
andcommitted
feat(acp): stream ACPToolCallEvents live from session_update
Emit ACPToolCallEvents from _OpenHandsACPBridge.session_update as each ToolCallStart / ToolCallProgress notification arrives instead of batching them into a single end-of-turn fan-out. Consumers dedupe by tool_call_id and treat the last-seen event as authoritative. ACPAgent.step() now just wires on_event onto the bridge and handles bookkeeping (usage totals, FinishAction, execution-status transition) once prompt() returns. The final MessageEvent / FinishAction shape is unchanged. Closes #2866 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a234670 commit ea808bd

File tree

2 files changed

+280
-47
lines changed

2 files changed

+280
-47
lines changed

openhands-sdk/openhands/sdk/agent/acp_agent.py

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ def __init__(self) -> None:
293293
self.accumulated_thoughts: list[str] = []
294294
self.accumulated_tool_calls: list[dict[str, Any]] = []
295295
self.on_token: Any = None # ConversationTokenCallbackType | None
296+
# Live event sink — fired from session_update as ACP tool-call
297+
# updates arrive, so the event stream reflects real subprocess
298+
# progress instead of a single end-of-turn burst. Set by
299+
# ACPAgent.step() for the duration of one prompt() round-trip.
300+
self.on_event: Any = None # ConversationCallbackType | None
296301
# Activity heartbeat — called (throttled) during session_update to
297302
# signal that the ACP subprocess is still actively working. Set by
298303
# ACPAgent.step() to keep the agent-server's idle timer alive.
@@ -317,6 +322,7 @@ def reset(self) -> None:
317322
self.accumulated_thoughts.clear()
318323
self.accumulated_tool_calls.clear()
319324
self.on_token = None
325+
self.on_event = None
320326
self.on_activity = None
321327
self._turn_usage_updates.clear()
322328
self._usage_received.clear()
@@ -378,21 +384,22 @@ async def session_update(
378384
if event is not None:
379385
event.set()
380386
elif isinstance(update, ToolCallStart):
381-
self.accumulated_tool_calls.append(
382-
{
383-
"tool_call_id": update.tool_call_id,
384-
"title": update.title,
385-
"tool_kind": update.kind,
386-
"status": update.status,
387-
"raw_input": update.raw_input,
388-
"raw_output": update.raw_output,
389-
"content": _serialize_tool_content(update.content),
390-
}
391-
)
387+
entry = {
388+
"tool_call_id": update.tool_call_id,
389+
"title": update.title,
390+
"tool_kind": update.kind,
391+
"status": update.status,
392+
"raw_input": update.raw_input,
393+
"raw_output": update.raw_output,
394+
"content": _serialize_tool_content(update.content),
395+
}
396+
self.accumulated_tool_calls.append(entry)
392397
logger.debug("ACP tool call start: %s", update.tool_call_id)
398+
self._emit_tool_call_event(entry)
393399
self._maybe_signal_activity()
394400
elif isinstance(update, ToolCallProgress):
395401
# Find the existing tool call entry and merge updates
402+
target: dict[str, Any] | None = None
396403
for tc in self.accumulated_tool_calls:
397404
if tc["tool_call_id"] == update.tool_call_id:
398405
if update.title is not None:
@@ -407,12 +414,41 @@ async def session_update(
407414
tc["raw_output"] = update.raw_output
408415
if update.content is not None:
409416
tc["content"] = _serialize_tool_content(update.content)
417+
target = tc
410418
break
411419
logger.debug("ACP tool call progress: %s", update.tool_call_id)
420+
if target is not None:
421+
self._emit_tool_call_event(target)
412422
self._maybe_signal_activity()
413423
else:
414424
logger.debug("ACP session update: %s", type(update).__name__)
415425

426+
def _emit_tool_call_event(self, tc: dict[str, Any]) -> None:
427+
"""Emit an ACPToolCallEvent reflecting the current state of ``tc``.
428+
429+
Called from ``session_update`` on each ``ToolCallStart`` /
430+
``ToolCallProgress`` so downstream consumers see tool cards appear
431+
and update as the subprocess runs. The same ``tool_call_id`` is
432+
reused on every emission — consumers should dedupe by id and treat
433+
the last-seen event as authoritative.
434+
"""
435+
if self.on_event is None:
436+
return
437+
try:
438+
event = ACPToolCallEvent(
439+
tool_call_id=tc["tool_call_id"],
440+
title=tc["title"],
441+
status=tc.get("status"),
442+
tool_kind=tc.get("tool_kind"),
443+
raw_input=tc.get("raw_input"),
444+
raw_output=tc.get("raw_output"),
445+
content=tc.get("content"),
446+
is_error=tc.get("status") == "failed",
447+
)
448+
self.on_event(event)
449+
except Exception:
450+
logger.debug("on_event callback failed", exc_info=True)
451+
416452
def _maybe_signal_activity(self) -> None:
417453
"""Signal activity to the agent-server's idle tracker (throttled).
418454
@@ -910,9 +946,13 @@ def step(
910946
state.execution_status = ConversationExecutionStatus.FINISHED
911947
return
912948

913-
# Reset client accumulators
949+
# Reset client accumulators and wire live callbacks. ``on_event``
950+
# is fired from inside ``_OpenHandsACPBridge.session_update`` as
951+
# tool-call notifications arrive, so consumers see ACPToolCallEvents
952+
# streamed live instead of a single end-of-turn burst.
914953
self._client.reset()
915954
self._client.on_token = on_token
955+
self._client.on_event = on_event
916956
self._client.on_activity = self._on_activity
917957

918958
t0 = time.monotonic()
@@ -973,6 +1013,8 @@ async def _prompt() -> PromptResponse:
9731013
time.sleep(delay)
9741014
self._client.reset()
9751015
self._client.on_token = on_token
1016+
self._client.on_event = on_event
1017+
self._client.on_activity = self._on_activity
9761018
else:
9771019
raise
9781020
except ACPRequestError as e:
@@ -998,6 +1040,8 @@ async def _prompt() -> PromptResponse:
9981040
time.sleep(delay)
9991041
self._client.reset()
10001042
self._client.on_token = on_token
1043+
self._client.on_event = on_event
1044+
self._client.on_activity = self._on_activity
10011045
else:
10021046
raise
10031047

@@ -1013,19 +1057,11 @@ async def _prompt() -> PromptResponse:
10131057
usage_update=usage_update,
10141058
)
10151059

1016-
# Emit ACPToolCallEvents for each accumulated tool call
1017-
for tc in self._client.accumulated_tool_calls:
1018-
tc_event = ACPToolCallEvent(
1019-
tool_call_id=tc["tool_call_id"],
1020-
title=tc["title"],
1021-
status=tc.get("status"),
1022-
tool_kind=tc.get("tool_kind"),
1023-
raw_input=tc.get("raw_input"),
1024-
raw_output=tc.get("raw_output"),
1025-
content=tc.get("content"),
1026-
is_error=tc.get("status") == "failed",
1027-
)
1028-
on_event(tc_event)
1060+
# ACPToolCallEvents were already emitted live from
1061+
# _OpenHandsACPBridge.session_update as each ToolCallStart /
1062+
# ToolCallProgress notification arrived — no end-of-turn fan-out
1063+
# here. The final MessageEvent + FinishAction still close out
1064+
# the turn below.
10291065

10301066
# Build response message
10311067
response_text = "".join(self._client.accumulated_text)

0 commit comments

Comments
 (0)