Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 61 additions & 25 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ def __init__(self) -> None:
self.accumulated_thoughts: list[str] = []
self.accumulated_tool_calls: list[dict[str, Any]] = []
self.on_token: Any = None # ConversationTokenCallbackType | None
# Live event sink — fired from session_update as ACP tool-call
Comment thread
simonrosenberg marked this conversation as resolved.
Comment thread
simonrosenberg marked this conversation as resolved.
# updates arrive, so the event stream reflects real subprocess
# progress instead of a single end-of-turn burst. Set by
# ACPAgent.step() for the duration of one prompt() round-trip.
self.on_event: Any = None # ConversationCallbackType | None
# Activity heartbeat — called (throttled) during session_update to
# signal that the ACP subprocess is still actively working. Set by
# ACPAgent.step() to keep the agent-server's idle timer alive.
Expand All @@ -317,6 +322,7 @@ def reset(self) -> None:
self.accumulated_thoughts.clear()
self.accumulated_tool_calls.clear()
self.on_token = None
self.on_event = None
self.on_activity = None
self._turn_usage_updates.clear()
self._usage_received.clear()
Expand Down Expand Up @@ -378,21 +384,22 @@ async def session_update(
if event is not None:
event.set()
elif isinstance(update, ToolCallStart):
self.accumulated_tool_calls.append(
{
"tool_call_id": update.tool_call_id,
"title": update.title,
"tool_kind": update.kind,
"status": update.status,
"raw_input": update.raw_input,
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
)
entry = {
"tool_call_id": update.tool_call_id,
"title": update.title,
"tool_kind": update.kind,
"status": update.status,
"raw_input": update.raw_input,
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
self.accumulated_tool_calls.append(entry)
logger.debug("ACP tool call start: %s", update.tool_call_id)
self._emit_tool_call_event(entry)
self._maybe_signal_activity()
elif isinstance(update, ToolCallProgress):
# Find the existing tool call entry and merge updates
target: dict[str, Any] | None = None
for tc in self.accumulated_tool_calls:
if tc["tool_call_id"] == update.tool_call_id:
if update.title is not None:
Expand All @@ -407,12 +414,41 @@ async def session_update(
tc["raw_output"] = update.raw_output
if update.content is not None:
tc["content"] = _serialize_tool_content(update.content)
target = tc
break
logger.debug("ACP tool call progress: %s", update.tool_call_id)
if target is not None:
self._emit_tool_call_event(target)
self._maybe_signal_activity()
else:
logger.debug("ACP session update: %s", type(update).__name__)

def _emit_tool_call_event(self, tc: dict[str, Any]) -> None:
Comment thread
simonrosenberg marked this conversation as resolved.
"""Emit an ACPToolCallEvent reflecting the current state of ``tc``.

Called from ``session_update`` on each ``ToolCallStart`` /
``ToolCallProgress`` so downstream consumers see tool cards appear
and update as the subprocess runs. The same ``tool_call_id`` is
reused on every emission — consumers should dedupe by id and treat
the last-seen event as authoritative.
"""
if self.on_event is None:
return
try:
event = ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status=tc.get("status"),
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=tc.get("status") == "failed",
)
self.on_event(event)
Comment thread
simonrosenberg marked this conversation as resolved.
except Exception:
Comment thread
simonrosenberg marked this conversation as resolved.
logger.debug("on_event callback failed", exc_info=True)

def _maybe_signal_activity(self) -> None:
"""Signal activity to the agent-server's idle tracker (throttled).

Expand Down Expand Up @@ -910,9 +946,13 @@ def step(
state.execution_status = ConversationExecutionStatus.FINISHED
return

# Reset client accumulators
# Reset client accumulators and wire live callbacks. ``on_event``
# is fired from inside ``_OpenHandsACPBridge.session_update`` as
# tool-call notifications arrive, so consumers see ACPToolCallEvents
Comment thread
simonrosenberg marked this conversation as resolved.
Outdated
# streamed live instead of a single end-of-turn burst.
self._client.reset()
self._client.on_token = on_token
self._client.on_event = on_event
self._client.on_activity = self._on_activity

t0 = time.monotonic()
Expand Down Expand Up @@ -973,6 +1013,8 @@ async def _prompt() -> PromptResponse:
time.sleep(delay)
self._client.reset()
self._client.on_token = on_token
self._client.on_event = on_event
self._client.on_activity = self._on_activity
else:
raise
except ACPRequestError as e:
Expand All @@ -998,6 +1040,8 @@ async def _prompt() -> PromptResponse:
time.sleep(delay)
self._client.reset()
self._client.on_token = on_token
self._client.on_event = on_event
self._client.on_activity = self._on_activity
else:
raise

Expand All @@ -1013,19 +1057,11 @@ async def _prompt() -> PromptResponse:
usage_update=usage_update,
)

# Emit ACPToolCallEvents for each accumulated tool call
for tc in self._client.accumulated_tool_calls:
tc_event = ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status=tc.get("status"),
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=tc.get("status") == "failed",
)
on_event(tc_event)
# ACPToolCallEvents were already emitted live from
# _OpenHandsACPBridge.session_update as each ToolCallStart /
# ToolCallProgress notification arrived — no end-of-turn fan-out
# here. The final MessageEvent + FinishAction still close out
# the turn below.
Comment thread
simonrosenberg marked this conversation as resolved.

# Build response message
response_text = "".join(self._client.accumulated_text)
Expand Down
Loading
Loading