-
Notifications
You must be signed in to change notification settings - Fork 233
feat(acp): stream ACPToolCallEvents live from session_update #2868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+682
−59
Merged
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
ea808bd
feat(acp): stream ACPToolCallEvents live from session_update
fb60def
review: extract reset helper, tighten on_event type, apply ruff format
282b878
review: close pending ACP tool cards on retry / abort
94c5e0e
review: tighten cancel helper, cite state-lock in concurrency docs
9740e56
review: unwire per-turn callbacks after step() to close late-update race
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -293,6 +293,11 @@ def __init__(self) -> None: | |
| self.accumulated_thoughts: list[str] = [] | ||
| self.accumulated_tool_calls: list[dict[str, Any]] = [] | ||
| self.on_token: Any = None # ConversationTokenCallbackType | None | ||
| # Live event sink — fired from session_update as ACP tool-call | ||
|
simonrosenberg marked this conversation as resolved.
|
||
| # updates arrive, so the event stream reflects real subprocess | ||
| # progress instead of a single end-of-turn burst. Set by | ||
| # ACPAgent.step() for the duration of one prompt() round-trip. | ||
| self.on_event: ConversationCallbackType | None = None | ||
| # Activity heartbeat — called (throttled) during session_update to | ||
| # signal that the ACP subprocess is still actively working. Set by | ||
| # ACPAgent.step() to keep the agent-server's idle timer alive. | ||
|
|
@@ -317,6 +322,7 @@ def reset(self) -> None: | |
| self.accumulated_thoughts.clear() | ||
| self.accumulated_tool_calls.clear() | ||
| self.on_token = None | ||
| self.on_event = None | ||
| self.on_activity = None | ||
| self._turn_usage_updates.clear() | ||
| self._usage_received.clear() | ||
|
|
@@ -378,21 +384,22 @@ async def session_update( | |
| if event is not None: | ||
| event.set() | ||
| elif isinstance(update, ToolCallStart): | ||
| self.accumulated_tool_calls.append( | ||
| { | ||
| "tool_call_id": update.tool_call_id, | ||
| "title": update.title, | ||
| "tool_kind": update.kind, | ||
| "status": update.status, | ||
| "raw_input": update.raw_input, | ||
| "raw_output": update.raw_output, | ||
| "content": _serialize_tool_content(update.content), | ||
| } | ||
| ) | ||
| entry = { | ||
| "tool_call_id": update.tool_call_id, | ||
| "title": update.title, | ||
| "tool_kind": update.kind, | ||
| "status": update.status, | ||
| "raw_input": update.raw_input, | ||
| "raw_output": update.raw_output, | ||
| "content": _serialize_tool_content(update.content), | ||
| } | ||
| self.accumulated_tool_calls.append(entry) | ||
| logger.debug("ACP tool call start: %s", update.tool_call_id) | ||
| self._emit_tool_call_event(entry) | ||
| self._maybe_signal_activity() | ||
| elif isinstance(update, ToolCallProgress): | ||
| # Find the existing tool call entry and merge updates | ||
| target: dict[str, Any] | None = None | ||
| for tc in self.accumulated_tool_calls: | ||
| if tc["tool_call_id"] == update.tool_call_id: | ||
| if update.title is not None: | ||
|
|
@@ -407,12 +414,41 @@ async def session_update( | |
| tc["raw_output"] = update.raw_output | ||
| if update.content is not None: | ||
| tc["content"] = _serialize_tool_content(update.content) | ||
| target = tc | ||
| break | ||
| logger.debug("ACP tool call progress: %s", update.tool_call_id) | ||
| if target is not None: | ||
| self._emit_tool_call_event(target) | ||
| self._maybe_signal_activity() | ||
| else: | ||
| logger.debug("ACP session update: %s", type(update).__name__) | ||
|
|
||
| def _emit_tool_call_event(self, tc: dict[str, Any]) -> None: | ||
|
simonrosenberg marked this conversation as resolved.
|
||
| """Emit an ACPToolCallEvent reflecting the current state of ``tc``. | ||
|
|
||
| Called from ``session_update`` on each ``ToolCallStart`` / | ||
| ``ToolCallProgress`` so downstream consumers see tool cards appear | ||
| and update as the subprocess runs. The same ``tool_call_id`` is | ||
| reused on every emission — consumers should dedupe by id and treat | ||
| the last-seen event as authoritative. | ||
| """ | ||
| if self.on_event is None: | ||
| return | ||
| try: | ||
| event = ACPToolCallEvent( | ||
| tool_call_id=tc["tool_call_id"], | ||
| title=tc["title"], | ||
| status=tc.get("status"), | ||
| tool_kind=tc.get("tool_kind"), | ||
| raw_input=tc.get("raw_input"), | ||
| raw_output=tc.get("raw_output"), | ||
| content=tc.get("content"), | ||
| is_error=tc.get("status") == "failed", | ||
| ) | ||
| self.on_event(event) | ||
|
simonrosenberg marked this conversation as resolved.
|
||
| except Exception: | ||
|
simonrosenberg marked this conversation as resolved.
|
||
| logger.debug("on_event callback failed", exc_info=True) | ||
|
|
||
| def _maybe_signal_activity(self) -> None: | ||
| """Signal activity to the agent-server's idle tracker (throttled). | ||
|
|
||
|
|
@@ -883,6 +919,26 @@ async def _init() -> tuple[Any, Any, Any, str, str, str]: | |
| ) = result | ||
| self._working_dir = working_dir | ||
|
|
||
| def _reset_client_for_turn( | ||
|
simonrosenberg marked this conversation as resolved.
|
||
| self, | ||
| on_token: ConversationTokenCallbackType | None, | ||
| on_event: ConversationCallbackType, | ||
| ) -> None: | ||
| """Reset per-turn client state and (re)wire live callbacks. | ||
|
simonrosenberg marked this conversation as resolved.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟢 Good: The |
||
|
|
||
| Called at the start of ``step()`` and again on each retry inside the | ||
| prompt loop so that the three callbacks (``on_token``, ``on_event``, | ||
| ``on_activity``) stay in sync with the fresh turn after ``reset()`` | ||
| clears them. ``on_event`` is fired from inside | ||
| ``_OpenHandsACPBridge.session_update`` as tool-call notifications | ||
| arrive, so consumers see ACPToolCallEvents streamed live instead of | ||
| a single end-of-turn burst. | ||
| """ | ||
| self._client.reset() | ||
| self._client.on_token = on_token | ||
| self._client.on_event = on_event | ||
| self._client.on_activity = self._on_activity | ||
|
|
||
| @observe(name="acp_agent.step", ignore_inputs=["conversation", "on_event"]) | ||
| def step( | ||
| self, | ||
|
|
@@ -910,10 +966,7 @@ def step( | |
| state.execution_status = ConversationExecutionStatus.FINISHED | ||
| return | ||
|
|
||
| # Reset client accumulators | ||
| self._client.reset() | ||
| self._client.on_token = on_token | ||
| self._client.on_activity = self._on_activity | ||
| self._reset_client_for_turn(on_token, on_event) | ||
|
|
||
| t0 = time.monotonic() | ||
| try: | ||
|
|
@@ -971,8 +1024,7 @@ async def _prompt() -> PromptResponse: | |
| e, | ||
| ) | ||
| time.sleep(delay) | ||
| self._client.reset() | ||
| self._client.on_token = on_token | ||
| self._reset_client_for_turn(on_token, on_event) | ||
| else: | ||
| raise | ||
| except ACPRequestError as e: | ||
|
|
@@ -996,8 +1048,7 @@ async def _prompt() -> PromptResponse: | |
| e, | ||
| ) | ||
| time.sleep(delay) | ||
| self._client.reset() | ||
| self._client.on_token = on_token | ||
| self._reset_client_for_turn(on_token, on_event) | ||
| else: | ||
| raise | ||
|
|
||
|
|
@@ -1013,19 +1064,11 @@ async def _prompt() -> PromptResponse: | |
| usage_update=usage_update, | ||
| ) | ||
|
|
||
| # Emit ACPToolCallEvents for each accumulated tool call | ||
| for tc in self._client.accumulated_tool_calls: | ||
| tc_event = ACPToolCallEvent( | ||
| tool_call_id=tc["tool_call_id"], | ||
| title=tc["title"], | ||
| status=tc.get("status"), | ||
| tool_kind=tc.get("tool_kind"), | ||
| raw_input=tc.get("raw_input"), | ||
| raw_output=tc.get("raw_output"), | ||
| content=tc.get("content"), | ||
| is_error=tc.get("status") == "failed", | ||
| ) | ||
| on_event(tc_event) | ||
| # ACPToolCallEvents were already emitted live from | ||
| # _OpenHandsACPBridge.session_update as each ToolCallStart / | ||
| # ToolCallProgress notification arrived — no end-of-turn fan-out | ||
| # here. The final MessageEvent + FinishAction still close out | ||
| # the turn below. | ||
|
simonrosenberg marked this conversation as resolved.
|
||
|
|
||
| # Build response message | ||
| response_text = "".join(self._client.accumulated_text) | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.