109109# well below the ~20 min runtime-api kill threshold.
110110_ACTIVITY_SIGNAL_INTERVAL : float = 30.0
111111
112+ # ACP tool-call statuses that represent a terminal outcome. Non-terminal
113+ # statuses (``pending``, ``in_progress``) mean the call is still in flight
114+ # and, if the turn aborts before it reaches a terminal state, the live-
115+ # emitted event on state.events will otherwise be orphaned forever.
116+ _TERMINAL_TOOL_CALL_STATUSES : frozenset [str ] = frozenset ({"completed" , "failed" })
117+
112118
113119def _make_dummy_llm () -> LLM :
114120 """Create a dummy LLM that should never be called directly."""
@@ -286,13 +292,28 @@ class _OpenHandsACPBridge:
286292 """Bridge between OpenHands and ACP that accumulates session updates.
287293
288294 Implements the ``Client`` protocol from ``agent_client_protocol``.
295+
296+ Concurrency model — ``on_event`` / ``on_token`` / ``on_activity`` are
297+ fired synchronously from ``session_update``, which runs on the
298+ ``AsyncExecutor`` portal thread. The caller thread driving
299+ ``ACPAgent.step()`` is blocked inside ``portal.call()`` for the entire
300+ ``prompt()`` round-trip, so these callbacks do not race with the final
301+ ``MessageEvent`` / ``FinishAction`` emitted by the caller thread after
302+ ``prompt()`` returns. Consumers that keep cross-callback state (e.g.
303+ hook processors reading-then-writing, visualizers) can therefore treat
304+ each callback as sequential within a single turn.
289305 """
290306
291307 def __init__ (self ) -> None :
292308 self .accumulated_text : list [str ] = []
293309 self .accumulated_thoughts : list [str ] = []
294310 self .accumulated_tool_calls : list [dict [str , Any ]] = []
295311 self .on_token : Any = None # ConversationTokenCallbackType | None
312+ # Live event sink — fired from session_update as ACP tool-call
313+ # updates arrive, so the event stream reflects real subprocess
314+ # progress instead of a single end-of-turn burst. Set by
315+ # ACPAgent.step() for the duration of one prompt() round-trip.
316+ self .on_event : ConversationCallbackType | None = None
296317 # Activity heartbeat — called (throttled) during session_update to
297318 # signal that the ACP subprocess is still actively working. Set by
298319 # ACPAgent.step() to keep the agent-server's idle timer alive.
@@ -317,6 +338,7 @@ def reset(self) -> None:
317338 self .accumulated_thoughts .clear ()
318339 self .accumulated_tool_calls .clear ()
319340 self .on_token = None
341+ self .on_event = None
320342 self .on_activity = None
321343 self ._turn_usage_updates .clear ()
322344 self ._usage_received .clear ()
@@ -378,21 +400,22 @@ async def session_update(
378400 if event is not None :
379401 event .set ()
380402 elif isinstance (update , ToolCallStart ):
381- self .accumulated_tool_calls .append (
382- {
383- "tool_call_id" : update .tool_call_id ,
384- "title" : update .title ,
385- "tool_kind" : update .kind ,
386- "status" : update .status ,
387- "raw_input" : update .raw_input ,
388- "raw_output" : update .raw_output ,
389- "content" : _serialize_tool_content (update .content ),
390- }
391- )
403+ entry = {
404+ "tool_call_id" : update .tool_call_id ,
405+ "title" : update .title ,
406+ "tool_kind" : update .kind ,
407+ "status" : update .status ,
408+ "raw_input" : update .raw_input ,
409+ "raw_output" : update .raw_output ,
410+ "content" : _serialize_tool_content (update .content ),
411+ }
412+ self .accumulated_tool_calls .append (entry )
392413 logger .debug ("ACP tool call start: %s" , update .tool_call_id )
414+ self ._emit_tool_call_event (entry )
393415 self ._maybe_signal_activity ()
394416 elif isinstance (update , ToolCallProgress ):
395417 # Find the existing tool call entry and merge updates
418+ target : dict [str , Any ] | None = None
396419 for tc in self .accumulated_tool_calls :
397420 if tc ["tool_call_id" ] == update .tool_call_id :
398421 if update .title is not None :
@@ -407,12 +430,41 @@ async def session_update(
407430 tc ["raw_output" ] = update .raw_output
408431 if update .content is not None :
409432 tc ["content" ] = _serialize_tool_content (update .content )
433+ target = tc
410434 break
411435 logger .debug ("ACP tool call progress: %s" , update .tool_call_id )
436+ if target is not None :
437+ self ._emit_tool_call_event (target )
412438 self ._maybe_signal_activity ()
413439 else :
414440 logger .debug ("ACP session update: %s" , type (update ).__name__ )
415441
442+ def _emit_tool_call_event (self , tc : dict [str , Any ]) -> None :
443+ """Emit an ACPToolCallEvent reflecting the current state of ``tc``.
444+
445+ Called from ``session_update`` on each ``ToolCallStart`` /
446+ ``ToolCallProgress`` so downstream consumers see tool cards appear
447+ and update as the subprocess runs. The same ``tool_call_id`` is
448+ reused on every emission — consumers should dedupe by id and treat
449+ the last-seen event as authoritative.
450+ """
451+ if self .on_event is None :
452+ return
453+ try :
454+ event = ACPToolCallEvent (
455+ tool_call_id = tc ["tool_call_id" ],
456+ title = tc ["title" ],
457+ status = tc .get ("status" ),
458+ tool_kind = tc .get ("tool_kind" ),
459+ raw_input = tc .get ("raw_input" ),
460+ raw_output = tc .get ("raw_output" ),
461+ content = tc .get ("content" ),
462+ is_error = tc .get ("status" ) == "failed" ,
463+ )
464+ self .on_event (event )
465+ except Exception :
466+ logger .debug ("on_event callback failed" , exc_info = True )
467+
416468 def _maybe_signal_activity (self ) -> None :
417469 """Signal activity to the agent-server's idle tracker (throttled).
418470
@@ -943,6 +995,65 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]:
943995 ) = result
944996 self ._working_dir = working_dir
945997
998+ def _reset_client_for_turn (
999+ self ,
1000+ on_token : ConversationTokenCallbackType | None ,
1001+ on_event : ConversationCallbackType ,
1002+ ) -> None :
1003+ """Reset per-turn client state and (re)wire live callbacks.
1004+
1005+ Called at the start of ``step()`` and again on each retry inside the
1006+ prompt loop so that the three callbacks (``on_token``, ``on_event``,
1007+ ``on_activity``) stay in sync with the fresh turn after ``reset()``
1008+ clears them. ``on_event`` is fired from inside
1009+ ``_OpenHandsACPBridge.session_update`` as tool-call notifications
1010+ arrive, so consumers see ACPToolCallEvents streamed live instead of
1011+ a single end-of-turn burst.
1012+ """
1013+ self ._client .reset ()
1014+ self ._client .on_token = on_token
1015+ self ._client .on_event = on_event
1016+ self ._client .on_activity = self ._on_activity
1017+
1018+ def _cancel_inflight_tool_calls (self , on_event : ConversationCallbackType ) -> None :
1019+ """Emit a terminal ``failed`` ACPToolCallEvent for every tool call
1020+ in the accumulator that has not reached a terminal status yet.
1021+
1022+ ACP servers mint fresh ``tool_call_id``s on a retried turn, so any
1023+ ``pending`` / ``in_progress`` events already streamed during the
1024+ failed attempt would otherwise be orphaned on ``state.events`` —
1025+ no later notification reuses their id, and consumers that dedupe
1026+ by ``tool_call_id`` + "last-seen status wins" would keep them
1027+ spinning forever. This method closes those cards before we wipe
1028+ the in-memory accumulator on retry / turn abort.
1029+
1030+ Called with ``on_event`` passed in explicitly because the bridge's
1031+ ``on_event`` attribute is about to be cleared by ``reset()``.
1032+ """
1033+ for tc in self ._client .accumulated_tool_calls :
1034+ status = tc .get ("status" )
1035+ if status in _TERMINAL_TOOL_CALL_STATUSES :
1036+ continue
1037+ try :
1038+ on_event (
1039+ ACPToolCallEvent (
1040+ tool_call_id = tc ["tool_call_id" ],
1041+ title = tc ["title" ],
1042+ status = "failed" ,
1043+ tool_kind = tc .get ("tool_kind" ),
1044+ raw_input = tc .get ("raw_input" ),
1045+ raw_output = tc .get ("raw_output" ),
1046+ content = tc .get ("content" ),
1047+ is_error = True ,
1048+ )
1049+ )
1050+ except Exception :
1051+ logger .debug (
1052+ "Failed to emit supersede event for %s" ,
1053+ tc .get ("tool_call_id" ),
1054+ exc_info = True ,
1055+ )
1056+
9461057 @observe (name = "acp_agent.step" , ignore_inputs = ["conversation" , "on_event" ])
9471058 def step (
9481059 self ,
@@ -970,10 +1081,7 @@ def step(
9701081 state .execution_status = ConversationExecutionStatus .FINISHED
9711082 return
9721083
973- # Reset client accumulators
974- self ._client .reset ()
975- self ._client .on_token = on_token
976- self ._client .on_activity = self ._on_activity
1084+ self ._reset_client_for_turn (on_token , on_event )
9771085
9781086 t0 = time .monotonic ()
9791087 try :
@@ -1031,8 +1139,8 @@ async def _prompt() -> PromptResponse:
10311139 e ,
10321140 )
10331141 time .sleep (delay )
1034- self ._client . reset ( )
1035- self ._client . on_token = on_token
1142+ self ._cancel_inflight_tool_calls ( on_event )
1143+ self ._reset_client_for_turn ( on_token , on_event )
10361144 else :
10371145 raise
10381146 except ACPRequestError as e :
@@ -1056,8 +1164,8 @@ async def _prompt() -> PromptResponse:
10561164 e ,
10571165 )
10581166 time .sleep (delay )
1059- self ._client . reset ( )
1060- self ._client . on_token = on_token
1167+ self ._cancel_inflight_tool_calls ( on_event )
1168+ self ._reset_client_for_turn ( on_token , on_event )
10611169 else :
10621170 raise
10631171
@@ -1073,19 +1181,11 @@ async def _prompt() -> PromptResponse:
10731181 usage_update = usage_update ,
10741182 )
10751183
1076- # Emit ACPToolCallEvents for each accumulated tool call
1077- for tc in self ._client .accumulated_tool_calls :
1078- tc_event = ACPToolCallEvent (
1079- tool_call_id = tc ["tool_call_id" ],
1080- title = tc ["title" ],
1081- status = tc .get ("status" ),
1082- tool_kind = tc .get ("tool_kind" ),
1083- raw_input = tc .get ("raw_input" ),
1084- raw_output = tc .get ("raw_output" ),
1085- content = tc .get ("content" ),
1086- is_error = tc .get ("status" ) == "failed" ,
1087- )
1088- on_event (tc_event )
1184+ # ACPToolCallEvents were already emitted live from
1185+ # _OpenHandsACPBridge.session_update as each ToolCallStart /
1186+ # ToolCallProgress notification arrived — no end-of-turn fan-out
1187+ # here. The final MessageEvent + FinishAction still close out
1188+ # the turn below.
10891189
10901190 # Build response message
10911191 response_text = "" .join (self ._client .accumulated_text )
@@ -1161,12 +1261,17 @@ async def _prompt() -> PromptResponse:
11611261 )
11621262 ],
11631263 )
1264+ # Close any tool cards left in flight from the timed-out attempt.
1265+ self ._cancel_inflight_tool_calls (on_event )
11641266 on_event (MessageEvent (source = "agent" , llm_message = error_message ))
11651267 state .execution_status = ConversationExecutionStatus .ERROR
11661268 except Exception as e :
11671269 logger .error ("ACP prompt failed: %s" , e , exc_info = True )
11681270 error_str = str (e )
11691271
1272+ # Close any tool cards left in flight before surfacing the error.
1273+ self ._cancel_inflight_tool_calls (on_event )
1274+
11701275 # Emit error as an agent message (existing behavior, preserved for
11711276 # consumers that inspect MessageEvents)
11721277 error_message = Message (
0 commit comments