Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 137 additions & 32 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@
# well below the ~20 min runtime-api kill threshold.
_ACTIVITY_SIGNAL_INTERVAL: float = 30.0

# ACP tool-call statuses that represent a terminal outcome. Non-terminal
# statuses (``pending``, ``in_progress``) mean the call is still in flight
# and, if the turn aborts before it reaches a terminal state, the live-
# emitted event on state.events will otherwise be orphaned forever.
_TERMINAL_TOOL_CALL_STATUSES: frozenset[str] = frozenset({"completed", "failed"})


def _make_dummy_llm() -> LLM:
"""Create a dummy LLM that should never be called directly."""
Expand Down Expand Up @@ -286,13 +292,28 @@ class _OpenHandsACPBridge:
"""Bridge between OpenHands and ACP that accumulates session updates.

Implements the ``Client`` protocol from ``agent_client_protocol``.

Concurrency model — ``on_event`` / ``on_token`` / ``on_activity`` are
fired synchronously from ``session_update``, which runs on the
``AsyncExecutor`` portal thread. The caller thread driving
``ACPAgent.step()`` is blocked inside ``portal.call()`` for the entire
``prompt()`` round-trip, so these callbacks do not race with the final
``MessageEvent`` / ``FinishAction`` emitted by the caller thread after
``prompt()`` returns. Consumers that keep cross-callback state (e.g.
hook processors reading-then-writing, visualizers) can therefore treat
each callback as sequential within a single turn.
Comment thread
simonrosenberg marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Important: Unresolved Thread Safety Concern

VascoSch92's concern about concurrent writes to state.events remains unaddressed. The docstring claims safety because "the caller thread... is blocked inside portal.call() for the entire prompt() round-trip", but this needs verification:

  1. Does on_event directly append to state.events, or does it queue through a synchronized mechanism?
  2. If the callback writes directly to a list without synchronization, there's a race between:
    • Portal thread: fires on_event with live ACPToolCallEvents during session_update
    • Caller thread: appends final MessageEvent/FinishAction after prompt() returns

The Python GIL provides some protection for individual list operations, but the ordering guarantee you're claiming ("consumers can treat each callback as sequential within a single turn") may not hold if both threads are appending to the same list.

Action needed: Verify the callback mechanism provides proper serialization, or add explicit synchronization if needed.

"""

def __init__(self) -> None:
self.accumulated_text: list[str] = []
self.accumulated_thoughts: list[str] = []
self.accumulated_tool_calls: list[dict[str, Any]] = []
self.on_token: Any = None # ConversationTokenCallbackType | None
# Live event sink — fired from session_update as ACP tool-call
Comment thread
simonrosenberg marked this conversation as resolved.
Comment thread
simonrosenberg marked this conversation as resolved.
# updates arrive, so the event stream reflects real subprocess
# progress instead of a single end-of-turn burst. Set by
# ACPAgent.step() for the duration of one prompt() round-trip.
self.on_event: ConversationCallbackType | None = None
# Activity heartbeat — called (throttled) during session_update to
# signal that the ACP subprocess is still actively working. Set by
# ACPAgent.step() to keep the agent-server's idle timer alive.
Expand All @@ -317,6 +338,7 @@ def reset(self) -> None:
self.accumulated_thoughts.clear()
self.accumulated_tool_calls.clear()
self.on_token = None
self.on_event = None
self.on_activity = None
self._turn_usage_updates.clear()
self._usage_received.clear()
Expand Down Expand Up @@ -378,21 +400,22 @@ async def session_update(
if event is not None:
event.set()
elif isinstance(update, ToolCallStart):
self.accumulated_tool_calls.append(
{
"tool_call_id": update.tool_call_id,
"title": update.title,
"tool_kind": update.kind,
"status": update.status,
"raw_input": update.raw_input,
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
)
entry = {
"tool_call_id": update.tool_call_id,
"title": update.title,
"tool_kind": update.kind,
"status": update.status,
"raw_input": update.raw_input,
"raw_output": update.raw_output,
"content": _serialize_tool_content(update.content),
}
self.accumulated_tool_calls.append(entry)
logger.debug("ACP tool call start: %s", update.tool_call_id)
self._emit_tool_call_event(entry)
self._maybe_signal_activity()
elif isinstance(update, ToolCallProgress):
# Find the existing tool call entry and merge updates
target: dict[str, Any] | None = None
for tc in self.accumulated_tool_calls:
if tc["tool_call_id"] == update.tool_call_id:
if update.title is not None:
Expand All @@ -407,12 +430,41 @@ async def session_update(
tc["raw_output"] = update.raw_output
if update.content is not None:
tc["content"] = _serialize_tool_content(update.content)
target = tc
break
logger.debug("ACP tool call progress: %s", update.tool_call_id)
if target is not None:
self._emit_tool_call_event(target)
self._maybe_signal_activity()
else:
logger.debug("ACP session update: %s", type(update).__name__)

def _emit_tool_call_event(self, tc: dict[str, Any]) -> None:
Comment thread
simonrosenberg marked this conversation as resolved.
"""Emit an ACPToolCallEvent reflecting the current state of ``tc``.

Called from ``session_update`` on each ``ToolCallStart`` /
``ToolCallProgress`` so downstream consumers see tool cards appear
and update as the subprocess runs. The same ``tool_call_id`` is
reused on every emission — consumers should dedupe by id and treat
the last-seen event as authoritative.
"""
if self.on_event is None:
return
try:
event = ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status=tc.get("status"),
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=tc.get("status") == "failed",
)
self.on_event(event)
Comment thread
simonrosenberg marked this conversation as resolved.
except Exception:
Comment thread
simonrosenberg marked this conversation as resolved.
logger.debug("on_event callback failed", exc_info=True)

def _maybe_signal_activity(self) -> None:
"""Signal activity to the agent-server's idle tracker (throttled).

Expand Down Expand Up @@ -883,6 +935,65 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
) = result
self._working_dir = working_dir

def _reset_client_for_turn(
Comment thread
simonrosenberg marked this conversation as resolved.
self,
on_token: ConversationTokenCallbackType | None,
on_event: ConversationCallbackType,
) -> None:
"""Reset per-turn client state and (re)wire live callbacks.
Comment thread
simonrosenberg marked this conversation as resolved.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 Good: The _reset_client_for_turn extraction addresses the code duplication from previous reviews. Clean refactoring.


Called at the start of ``step()`` and again on each retry inside the
prompt loop so that the three callbacks (``on_token``, ``on_event``,
``on_activity``) stay in sync with the fresh turn after ``reset()``
clears them. ``on_event`` is fired from inside
``_OpenHandsACPBridge.session_update`` as tool-call notifications
arrive, so consumers see ACPToolCallEvents streamed live instead of
a single end-of-turn burst.
"""
self._client.reset()
self._client.on_token = on_token
self._client.on_event = on_event
self._client.on_activity = self._on_activity

def _cancel_inflight_tool_calls(self, on_event: ConversationCallbackType) -> None:
Comment thread
simonrosenberg marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Suggestion: Docstring doesn't match usage

The docstring says on_event is "passed as a parameter because the bridge's on_event attribute is about to be cleared by reset()".

But all call sites (lines 1082, 1107, 1204, 1214) call this method BEFORE reset(), not after, so self._client.on_event should still be valid. The parameter seems unnecessary.

Either:

  1. Use self._client.on_event directly inside the method, or
  2. Update the docstring to explain the real reason for the parameter (maybe for testability?)

"""Emit a terminal ``failed`` ACPToolCallEvent for every tool call
in the accumulator that has not reached a terminal status yet.

ACP servers mint fresh ``tool_call_id``s on a retried turn, so any
``pending`` / ``in_progress`` events already streamed during the
failed attempt would otherwise be orphaned on ``state.events`` —
no later notification reuses their id, and consumers that dedupe
by ``tool_call_id`` + "last-seen status wins" would keep them
spinning forever. This method closes those cards before we wipe
the in-memory accumulator on retry / turn abort.
Comment thread
simonrosenberg marked this conversation as resolved.

Called with ``on_event`` passed in explicitly because the bridge's
``on_event`` attribute is about to be cleared by ``reset()``.
"""
for tc in self._client.accumulated_tool_calls:
status = tc.get("status")
if status in _TERMINAL_TOOL_CALL_STATUSES:
continue
try:
on_event(
ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status="failed",
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=True,
)
)
except Exception:
logger.debug(
"Failed to emit supersede event for %s",
tc.get("tool_call_id"),
exc_info=True,
)

@observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"])
def step(
self,
Expand Down Expand Up @@ -910,10 +1021,7 @@ def step(
state.execution_status = ConversationExecutionStatus.FINISHED
return

# Reset client accumulators
self._client.reset()
self._client.on_token = on_token
self._client.on_activity = self._on_activity
self._reset_client_for_turn(on_token, on_event)

t0 = time.monotonic()
try:
Expand Down Expand Up @@ -971,8 +1079,8 @@ async def _prompt() -> PromptResponse:
e,
)
time.sleep(delay)
self._client.reset()
self._client.on_token = on_token
self._cancel_inflight_tool_calls(on_event)
self._reset_client_for_turn(on_token, on_event)
else:
raise
except ACPRequestError as e:
Expand All @@ -996,8 +1104,8 @@ async def _prompt() -> PromptResponse:
e,
)
time.sleep(delay)
self._client.reset()
self._client.on_token = on_token
self._cancel_inflight_tool_calls(on_event)
self._reset_client_for_turn(on_token, on_event)
else:
raise

Expand All @@ -1013,19 +1121,11 @@ async def _prompt() -> PromptResponse:
usage_update=usage_update,
)

# Emit ACPToolCallEvents for each accumulated tool call
for tc in self._client.accumulated_tool_calls:
tc_event = ACPToolCallEvent(
tool_call_id=tc["tool_call_id"],
title=tc["title"],
status=tc.get("status"),
tool_kind=tc.get("tool_kind"),
raw_input=tc.get("raw_input"),
raw_output=tc.get("raw_output"),
content=tc.get("content"),
is_error=tc.get("status") == "failed",
)
on_event(tc_event)
# ACPToolCallEvents were already emitted live from
# _OpenHandsACPBridge.session_update as each ToolCallStart /
# ToolCallProgress notification arrived — no end-of-turn fan-out
# here. The final MessageEvent + FinishAction still close out
# the turn below.
Comment thread
simonrosenberg marked this conversation as resolved.

# Build response message
response_text = "".join(self._client.accumulated_text)
Expand Down Expand Up @@ -1101,12 +1201,17 @@ async def _prompt() -> PromptResponse:
)
],
)
# Close any tool cards left in flight from the timed-out attempt.
self._cancel_inflight_tool_calls(on_event)
on_event(MessageEvent(source="agent", llm_message=error_message))
state.execution_status = ConversationExecutionStatus.ERROR
except Exception as e:
logger.error("ACP prompt failed: %s", e, exc_info=True)
error_str = str(e)

# Close any tool cards left in flight before surfacing the error.
self._cancel_inflight_tool_calls(on_event)

# Emit error as an agent message (existing behavior, preserved for
# consumers that inspect MessageEvents)
error_message = Message(
Expand Down
Loading
Loading