Skip to content

Commit aebbbb8

Browse files
author
Debug Agent
committed
Merge PR #2868: stream ACPToolCallEvents live from session_update
2 parents 98db65c + fb60def commit aebbbb8

File tree

2 files changed

+299
-54
lines changed

2 files changed

+299
-54
lines changed

openhands-sdk/openhands/sdk/agent/acp_agent.py

Lines changed: 75 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,11 @@ def __init__(self) -> None:
293293
self.accumulated_thoughts: list[str] = []
294294
self.accumulated_tool_calls: list[dict[str, Any]] = []
295295
self.on_token: Any = None # ConversationTokenCallbackType | None
296+
# Live event sink — fired from session_update as ACP tool-call
297+
# updates arrive, so the event stream reflects real subprocess
298+
# progress instead of a single end-of-turn burst. Set by
299+
# ACPAgent.step() for the duration of one prompt() round-trip.
300+
self.on_event: ConversationCallbackType | None = None
296301
# Activity heartbeat — called (throttled) during session_update to
297302
# signal that the ACP subprocess is still actively working. Set by
298303
# ACPAgent.step() to keep the agent-server's idle timer alive.
@@ -317,6 +322,7 @@ def reset(self) -> None:
317322
self.accumulated_thoughts.clear()
318323
self.accumulated_tool_calls.clear()
319324
self.on_token = None
325+
self.on_event = None
320326
self.on_activity = None
321327
self._turn_usage_updates.clear()
322328
self._usage_received.clear()
@@ -378,21 +384,22 @@ async def session_update(
378384
if event is not None:
379385
event.set()
380386
elif isinstance(update, ToolCallStart):
381-
self.accumulated_tool_calls.append(
382-
{
383-
"tool_call_id": update.tool_call_id,
384-
"title": update.title,
385-
"tool_kind": update.kind,
386-
"status": update.status,
387-
"raw_input": update.raw_input,
388-
"raw_output": update.raw_output,
389-
"content": _serialize_tool_content(update.content),
390-
}
391-
)
387+
entry = {
388+
"tool_call_id": update.tool_call_id,
389+
"title": update.title,
390+
"tool_kind": update.kind,
391+
"status": update.status,
392+
"raw_input": update.raw_input,
393+
"raw_output": update.raw_output,
394+
"content": _serialize_tool_content(update.content),
395+
}
396+
self.accumulated_tool_calls.append(entry)
392397
logger.debug("ACP tool call start: %s", update.tool_call_id)
398+
self._emit_tool_call_event(entry)
393399
self._maybe_signal_activity()
394400
elif isinstance(update, ToolCallProgress):
395401
# Find the existing tool call entry and merge updates
402+
target: dict[str, Any] | None = None
396403
for tc in self.accumulated_tool_calls:
397404
if tc["tool_call_id"] == update.tool_call_id:
398405
if update.title is not None:
@@ -407,12 +414,41 @@ async def session_update(
407414
tc["raw_output"] = update.raw_output
408415
if update.content is not None:
409416
tc["content"] = _serialize_tool_content(update.content)
417+
target = tc
410418
break
411419
logger.debug("ACP tool call progress: %s", update.tool_call_id)
420+
if target is not None:
421+
self._emit_tool_call_event(target)
412422
self._maybe_signal_activity()
413423
else:
414424
logger.debug("ACP session update: %s", type(update).__name__)
415425

426+
def _emit_tool_call_event(self, tc: dict[str, Any]) -> None:
427+
"""Emit an ACPToolCallEvent reflecting the current state of ``tc``.
428+
429+
Called from ``session_update`` on each ``ToolCallStart`` /
430+
``ToolCallProgress`` so downstream consumers see tool cards appear
431+
and update as the subprocess runs. The same ``tool_call_id`` is
432+
reused on every emission — consumers should dedupe by id and treat
433+
the last-seen event as authoritative.
434+
"""
435+
if self.on_event is None:
436+
return
437+
try:
438+
event = ACPToolCallEvent(
439+
tool_call_id=tc["tool_call_id"],
440+
title=tc["title"],
441+
status=tc.get("status"),
442+
tool_kind=tc.get("tool_kind"),
443+
raw_input=tc.get("raw_input"),
444+
raw_output=tc.get("raw_output"),
445+
content=tc.get("content"),
446+
is_error=tc.get("status") == "failed",
447+
)
448+
self.on_event(event)
449+
except Exception:
450+
logger.debug("on_event callback failed", exc_info=True)
451+
416452
def _maybe_signal_activity(self) -> None:
417453
"""Signal activity to the agent-server's idle tracker (throttled).
418454
@@ -883,6 +919,26 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
883919
) = result
884920
self._working_dir = working_dir
885921

922+
def _reset_client_for_turn(
923+
self,
924+
on_token: ConversationTokenCallbackType | None,
925+
on_event: ConversationCallbackType,
926+
) -> None:
927+
"""Reset per-turn client state and (re)wire live callbacks.
928+
929+
Called at the start of ``step()`` and again on each retry inside the
930+
prompt loop so that the three callbacks (``on_token``, ``on_event``,
931+
``on_activity``) stay in sync with the fresh turn after ``reset()``
932+
clears them. ``on_event`` is fired from inside
933+
``_OpenHandsACPBridge.session_update`` as tool-call notifications
934+
arrive, so consumers see ACPToolCallEvents streamed live instead of
935+
a single end-of-turn burst.
936+
"""
937+
self._client.reset()
938+
self._client.on_token = on_token
939+
self._client.on_event = on_event
940+
self._client.on_activity = self._on_activity
941+
886942
@observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"])
887943
def step(
888944
self,
@@ -910,10 +966,7 @@ def step(
910966
state.execution_status = ConversationExecutionStatus.FINISHED
911967
return
912968

913-
# Reset client accumulators
914-
self._client.reset()
915-
self._client.on_token = on_token
916-
self._client.on_activity = self._on_activity
969+
self._reset_client_for_turn(on_token, on_event)
917970

918971
t0 = time.monotonic()
919972
try:
@@ -971,8 +1024,7 @@ async def _prompt() -> PromptResponse:
9711024
e,
9721025
)
9731026
time.sleep(delay)
974-
self._client.reset()
975-
self._client.on_token = on_token
1027+
self._reset_client_for_turn(on_token, on_event)
9761028
else:
9771029
raise
9781030
except ACPRequestError as e:
@@ -996,8 +1048,7 @@ async def _prompt() -> PromptResponse:
9961048
e,
9971049
)
9981050
time.sleep(delay)
999-
self._client.reset()
1000-
self._client.on_token = on_token
1051+
self._reset_client_for_turn(on_token, on_event)
10011052
else:
10021053
raise
10031054

@@ -1013,19 +1064,11 @@ async def _prompt() -> PromptResponse:
10131064
usage_update=usage_update,
10141065
)
10151066

1016-
# Emit ACPToolCallEvents for each accumulated tool call
1017-
for tc in self._client.accumulated_tool_calls:
1018-
tc_event = ACPToolCallEvent(
1019-
tool_call_id=tc["tool_call_id"],
1020-
title=tc["title"],
1021-
status=tc.get("status"),
1022-
tool_kind=tc.get("tool_kind"),
1023-
raw_input=tc.get("raw_input"),
1024-
raw_output=tc.get("raw_output"),
1025-
content=tc.get("content"),
1026-
is_error=tc.get("status") == "failed",
1027-
)
1028-
on_event(tc_event)
1067+
# ACPToolCallEvents were already emitted live from
1068+
# _OpenHandsACPBridge.session_update as each ToolCallStart /
1069+
# ToolCallProgress notification arrived — no end-of-turn fan-out
1070+
# here. The final MessageEvent + FinishAction still close out
1071+
# the turn below.
10291072

10301073
# Build response message
10311074
response_text = "".join(self._client.accumulated_text)

0 commit comments

Comments
 (0)