@@ -459,8 +459,18 @@ def test_step_wires_on_activity(self, tmp_path):
459459
460460 # Mock the internals so step() doesn't actually call the ACP server
461461 agent ._client = _OpenHandsACPBridge ()
462+
463+ # Capture on_activity while prompt() is still "running" — step()
464+ # unwires the bridge callbacks in its finally block once the turn
465+ # completes, so the post-return value is None by design.
466+ wired_during_prompt : list = []
467+
468+ def _capture_run_async (_coro , ** _kwargs ):
469+ wired_during_prompt .append (agent ._client .on_activity )
470+ return MagicMock (usage = None )
471+
462472 agent ._executor = MagicMock ()
463- agent ._executor .run_async = MagicMock ( return_value = MagicMock ( usage = None ))
473+ agent ._executor .run_async = _capture_run_async
464474 agent ._session_id = "sess-1"
465475 agent ._initialized = True
466476
@@ -470,8 +480,11 @@ def test_step_wires_on_activity(self, tmp_path):
470480
471481 agent .step (conversation , on_event = events .append )
472482
473- # Verify on_activity was wired to the bridge
474- assert agent ._client .on_activity is activity_fn
483+ # Verify on_activity was wired to the bridge during the turn.
484+ assert wired_during_prompt == [activity_fn ]
485+ # And that it was cleared afterward so a late session_update
486+ # cannot fire the per-turn heartbeat callback out-of-band.
487+ assert agent ._client .on_activity is None
475488
476489
477490# ---------------------------------------------------------------------------
@@ -643,7 +656,12 @@ def test_step_passes_on_token(self, tmp_path):
643656 agent ._conn = MagicMock ()
644657 agent ._session_id = "test-session"
645658
659+ # Capture on_token while prompt() is still running — step() clears
660+ # the per-turn callbacks in its finally block once the turn ends.
661+ wired_during_prompt : list = []
662+
646663 def _fake_run_async (_coro , ** _kwargs ):
664+ wired_during_prompt .append (mock_client .on_token )
647665 mock_client .accumulated_text .append ("ok" )
648666
649667 mock_executor = MagicMock ()
@@ -654,8 +672,10 @@ def _fake_run_async(_coro, **_kwargs):
654672
655673 agent .step (conversation , on_event = lambda _ : None , on_token = on_token )
656674
657- # Verify on_token was passed to the client
658- assert mock_client .on_token == on_token
675+ # Verify on_token was wired during the turn.
676+ assert wired_during_prompt == [on_token ]
677+ # And unwired afterward so a late token chunk is a no-op.
678+ assert mock_client .on_token is None
659679
660680
661681# ---------------------------------------------------------------------------
@@ -1669,6 +1689,77 @@ def _fake_run_async(_coro, **_kwargs):
16691689 assert events [1 ].tool_call_id == "tc-2"
16701690 assert events [1 ].is_error is True
16711691
1692+ def test_step_clears_live_callbacks_on_return (self , tmp_path ):
1693+ """After step() returns, bridge callbacks are unwired.
1694+
1695+ A trailing ``session_update`` that lands between turns (the ACP
1696+ subprocess sending a late ``ToolCallProgress`` after its prompt
1697+ response) would otherwise fire the previous step's ``on_event``
1698+ on the portal thread with no FIFOLock held by anyone, racing
1699+ other threads appending to ``state.events``.
1700+ """
1701+ from acp .schema import ToolCallStart
1702+
1703+ agent = _make_agent ()
1704+ conversation = self ._make_conversation_with_message (tmp_path )
1705+ events : list = []
1706+
1707+ mock_client = _OpenHandsACPBridge ()
1708+ agent ._client = mock_client
1709+ agent ._conn = MagicMock ()
1710+ agent ._session_id = "test-session"
1711+
1712+ def _fake_run_async (_coro , ** _kwargs ):
1713+ mock_client .accumulated_text .append ("done" )
1714+
1715+ mock_executor = MagicMock ()
1716+ mock_executor .run_async = _fake_run_async
1717+ agent ._executor = mock_executor
1718+
1719+ agent .step (conversation , on_event = events .append , on_token = lambda _ : None )
1720+
1721+ # Callbacks unwired — a late session_update is a safe no-op emit.
1722+ assert mock_client .on_event is None
1723+ assert mock_client .on_token is None
1724+ assert mock_client .on_activity is None
1725+
1726+ pre_count = len (events )
1727+ trailing = MagicMock (spec = ToolCallStart )
1728+ trailing .tool_call_id = "tc-late"
1729+ trailing .title = "Late arrival"
1730+ trailing .kind = "read"
1731+ trailing .status = "completed"
1732+ trailing .raw_input = None
1733+ trailing .raw_output = None
1734+ trailing .content = None
1735+ asyncio .run (mock_client .session_update ("sess" , trailing ))
1736+ assert len (events ) == pre_count # nothing reached the stale callback
1737+
1738+ def test_step_clears_live_callbacks_on_error (self , tmp_path ):
1739+ """Callback unwire also runs when step() raises (finally block)."""
1740+ agent = _make_agent ()
1741+ conversation = self ._make_conversation_with_message (tmp_path )
1742+ events : list = []
1743+
1744+ mock_client = _OpenHandsACPBridge ()
1745+ agent ._client = mock_client
1746+ agent ._conn = MagicMock ()
1747+ agent ._session_id = "test-session"
1748+
1749+ def _fake_run_async (_coro , ** _kwargs ):
1750+ raise RuntimeError ("boom" )
1751+
1752+ mock_executor = MagicMock ()
1753+ mock_executor .run_async = _fake_run_async
1754+ agent ._executor = mock_executor
1755+
1756+ with pytest .raises (RuntimeError ):
1757+ agent .step (conversation , on_event = events .append )
1758+
1759+ assert mock_client .on_event is None
1760+ assert mock_client .on_token is None
1761+ assert mock_client .on_activity is None
1762+
16721763 def test_step_emits_no_tool_call_events_when_none (self , tmp_path ):
16731764 """step() emits only MessageEvent when no tool calls accumulated."""
16741765 agent = _make_agent ()
0 commit comments