From ff5b902b29383489f3c039004df82e53f81f8424 Mon Sep 17 00:00:00 2001 From: Dragoslav Mitrinovic Date: Thu, 26 Feb 2026 14:44:38 -0700 Subject: [PATCH 1/4] Fix background tool events lost when process() cancelled When process() was cancelled (e.g., user started speaking during a background tool operation), the _maybe_await_background_event() method would leave an orphaned queue.get() task running. This task could then consume events meant for the next process() call, causing the agent to miss tool completion results. The fix wraps the asyncio.wait() call in try/except to ensure the get_event_task is always cancelled when the method is interrupted. Scenario this fixes: 1. User initiates a background tool (e.g., sell_stock) 2. Tool yields "pending", agent responds "please wait" 3. User speaks again (conversation continues during transaction) 4. Current process() is cancelled, but get_event_task remains 5. Tool completes, yields "success" - consumed by orphaned task! 6. New process() waits for events but queue is empty 7. Agent stays silent instead of reporting transaction result Co-Authored-By: Claude Opus 4.6 --- line/llm_agent/llm_agent.py | 37 ++++++---- tests/test_llm_agent_llm_agent.py | 115 ++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 13 deletions(-) diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py index 51daa488..889ce3a5 100644 --- a/line/llm_agent/llm_agent.py +++ b/line/llm_agent/llm_agent.py @@ -757,22 +757,33 @@ async def _maybe_await_background_event(self) -> Union[None, tuple[AgentToolCall return None get_event_task = asyncio.ensure_future(self._background_event_queue.get()) - done, _ = await asyncio.wait( - [get_event_task, self._background_task], - return_when=asyncio.FIRST_COMPLETED, - ) + try: + done, _ = await asyncio.wait( + [get_event_task, self._background_task], + return_when=asyncio.FIRST_COMPLETED, + ) - # Check if the get_event task completed - if get_event_task in done: - return get_event_task.result() + # Check if the get_event task completed + if get_event_task in done: + return get_event_task.result() - # Background task completed first - cancel the get_event task - get_event_task.cancel() - try: - await get_event_task + # Background task completed first - cancel the get_event task + get_event_task.cancel() + try: + await get_event_task + except asyncio.CancelledError: + pass + return None except asyncio.CancelledError: - pass - return None + # If we're cancelled externally (e.g., user started speaking), + # ensure get_event_task is cancelled so it doesn't consume + # events meant for the next process() call + get_event_task.cancel() + try: + await get_event_task + except asyncio.CancelledError: + pass + raise async def cleanup(self) -> None: """Clean up resources.""" diff --git a/tests/test_llm_agent_llm_agent.py b/tests/test_llm_agent_llm_agent.py index 36f94a68..98b21278 100644 --- a/tests/test_llm_agent_llm_agent.py +++ b/tests/test_llm_agent_llm_agent.py @@ -2031,3 +2031,118 @@ async def test_empty_messages_list_skips_generation(self, turn_env): # LLM should not have been called assert mock_llm._call_count == 0 + + +# ============================================================================= +# Tests: Background tool cancellation handling +# ============================================================================= + + +async def test_background_tool_event_not_lost_on_cancellation(turn_env): + """Test that background tool events aren't lost when process() is cancelled. + + This tests a specific bug where if process() was cancelled while waiting + in _maybe_await_background_event(), an orphaned queue.get() task could + consume events meant for the next process() call. + + Scenario: + 1. User initiates a background tool (e.g., sell_stock) + 2. Tool yields "pending" status, agent responds + 3. User speaks again, cancelling the current process() + 4. Tool completes and yields "success" + 5. New process() should see the "success" event + """ + import asyncio + + # Track tool execution state + tool_started = asyncio.Event() + tool_can_complete = asyncio.Event() + + @loopback_tool(is_background=True) + async def slow_background_tool(ctx, value: Annotated[str, "A value"]): + """A background tool that yields pending, waits, then yields success.""" + yield {"status": "pending", "value": value} + tool_started.set() + await tool_can_complete.wait() + yield {"status": "success", "value": value} + + # Response 1: LLM calls the tool + # Response 2: LLM responds to pending status + # Response 3: LLM responds to user's second message + # Response 4: LLM responds to success status + responses = [ + [ + StreamChunk(text="Starting transaction..."), + StreamChunk( + tool_calls=[ToolCall(id="tc1", name="slow_background_tool", arguments='{"value":"test"}', is_complete=True)] + ), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="Please wait..."), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="I understand, still waiting..."), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="Transaction complete!"), + StreamChunk(is_final=True), + ], + ] + + agent, mock_llm = create_agent_with_mock(responses, tools=[slow_background_tool]) + + # First process() call - initiates the tool + first_event = UserTextSent(content="Start transaction", history=[UserTextSent(content="Start transaction")]) + + async def run_first_process(): + outputs = [] + async for output in agent.process(turn_env, first_event): + if not isinstance(output, LogMetric): + outputs.append(output) + return outputs + + first_task = asyncio.create_task(run_first_process()) + + # Wait for tool to start and yield pending + await asyncio.wait_for(tool_started.wait(), timeout=5.0) + + # Give some time for the agent to process the pending status + await asyncio.sleep(0.1) + + # Cancel the first process (simulating user speaking) + first_task.cancel() + try: + await first_task + except asyncio.CancelledError: + pass + + # Now allow the tool to complete + tool_can_complete.set() + + # Give time for the background tool to yield its success result + await asyncio.sleep(0.1) + + # Second process() call - should pick up the success event + second_event = UserTextSent( + content="What's the status?", + history=[ + UserTextSent(content="Start transaction"), + AgentTextSent(content="Starting transaction..."), + AgentTextSent(content="Please wait..."), + UserTextSent(content="What's the status?"), + ], + ) + + second_outputs = await collect_outputs(agent, turn_env, second_event) + + # The second process should have received the success tool result + tool_returned_events = [o for o in second_outputs if isinstance(o, AgentToolReturned)] + success_events = [e for e in tool_returned_events if e.result.get("status") == "success"] + + assert len(success_events) >= 1, ( + f"Expected to find 'success' tool result in second process(), " + f"but only found: {[e.result for e in tool_returned_events]}" + ) From d07ad17007d9375915588e36c2fe1f96c4f85456 Mon Sep 17 00:00:00 2001 From: Dragoslav Mitrinovic Date: Mon, 9 Mar 2026 22:45:09 -0500 Subject: [PATCH 2/4] Fix background tool results appearing in wrong position in history Background tool events were being added to history with the triggering event_id (from when the tool was called). When the tool yields much later after user spoke and a new process() started, these events were placed in the OLD turn's position in the merged history. This caused the LLM messages to end with assistant text instead of tool result, triggering the 'conversation cannot end with assistant message' validation and skipping the LLM call. Fix: Use the CURRENT event_id at yield time (via _append_local) instead of the captured triggering_event_id. This ensures background tool results appear at the END of history when yielded during a new process() call. Co-Authored-By: Claude Opus 4.6 --- line/llm_agent/llm_agent.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py index 889ce3a5..2fbb3414 100644 --- a/line/llm_agent/llm_agent.py +++ b/line/llm_agent/llm_agent.py @@ -702,9 +702,12 @@ def _execute_backgroundable_tool( Events are added to _background_event_queue for loopback processing. If the caller is cancelled, events continue to be produced and queued for processing on the next process() call. + + Events are tagged with the CURRENT event_id at yield time (not the triggering + event_id). This ensures that when a background tool yields after user spoke + and a new process() started, the tool result appears at the END of the + conversation history, not in the middle where it was originally triggered. """ - # Capture the event_id at the start - this is the triggering event - triggering_event_id = self.history._current_event_id async def generate_events() -> None: n = 0 @@ -713,9 +716,11 @@ async def generate_events() -> None: call_id = f"{tc_id}-{n}" called, returned = _construct_tool_events(call_id, tc_name, tool_args, value) - # Add to local history with the triggering event_id - self.history._append_local_with_event_id(called, triggering_event_id) - self.history._append_local_with_event_id(returned, triggering_event_id) + # Add to local history with the CURRENT event_id (at yield time). + # This ensures background tool results appear at the end of history + # when yielded after a new process() call has started. + self.history._append_local(called) + self.history._append_local(returned) # Add to queue for loopback processing await self._background_event_queue.put((called, returned)) n += 1 @@ -723,9 +728,9 @@ async def generate_events() -> None: # Use negative limit to show last 10 frames (most relevant) logger.error(f"Error in Tool Call {tc_name}: {e}\n{traceback.format_exc(limit=-10)}") called, returned = _construct_tool_events(f"{tc_id}-{n}", tc_name, tool_args, f"error: {e}") - # Add to local history with the triggering event_id - self.history._append_local_with_event_id(called, triggering_event_id) - self.history._append_local_with_event_id(returned, triggering_event_id) + # Add to local history with the CURRENT event_id + self.history._append_local(called) + self.history._append_local(returned) # Add to queue for loopback processing await self._background_event_queue.put((called, returned)) From da2384401535d2d07698ac92a9e8ccaad33b91e9 Mon Sep 17 00:00:00 2001 From: Dragoslav Mitrinovic Date: Mon, 9 Mar 2026 23:22:18 -0500 Subject: [PATCH 3/4] format --- tests/test_llm_agent_llm_agent.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/test_llm_agent_llm_agent.py b/tests/test_llm_agent_llm_agent.py index 98b21278..7a856e13 100644 --- a/tests/test_llm_agent_llm_agent.py +++ b/tests/test_llm_agent_llm_agent.py @@ -2074,7 +2074,11 @@ async def slow_background_tool(ctx, value: Annotated[str, "A value"]): [ StreamChunk(text="Starting transaction..."), StreamChunk( - tool_calls=[ToolCall(id="tc1", name="slow_background_tool", arguments='{"value":"test"}', is_complete=True)] + tool_calls=[ + ToolCall( + id="tc1", name="slow_background_tool", arguments='{"value":"test"}', is_complete=True + ) + ] ), StreamChunk(is_final=True), ], @@ -2095,7 +2099,9 @@ async def slow_background_tool(ctx, value: Annotated[str, "A value"]): agent, mock_llm = create_agent_with_mock(responses, tools=[slow_background_tool]) # First process() call - initiates the tool - first_event = UserTextSent(content="Start transaction", history=[UserTextSent(content="Start transaction")]) + first_event = UserTextSent( + content="Start transaction", history=[UserTextSent(content="Start transaction")] + ) async def run_first_process(): outputs = [] From f3ff797a0b738878eac4d6a8bb313816bcf2b578 Mon Sep 17 00:00:00 2001 From: Dragoslav Mitrinovic Date: Tue, 10 Mar 2026 10:47:32 -0500 Subject: [PATCH 4/4] Re-enqueue event if get_event_task completed before cancel If get_event_task already consumed an event from the queue when CancelledError is caught, cancel() is a no-op and the event would be silently discarded. Fix by checking if the task completed and re-enqueueing the result. Co-Authored-By: Claude Opus 4.6 --- line/llm_agent/llm_agent.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py index 2fbb3414..12bdbd16 100644 --- a/line/llm_agent/llm_agent.py +++ b/line/llm_agent/llm_agent.py @@ -782,12 +782,17 @@ async def _maybe_await_background_event(self) -> Union[None, tuple[AgentToolCall except asyncio.CancelledError: # If we're cancelled externally (e.g., user started speaking), # ensure get_event_task is cancelled so it doesn't consume - # events meant for the next process() call + # events meant for the next process() call. + # However, if get_event_task already completed (consumed an event), + # we must re-enqueue it so the next process() call can pick it up. get_event_task.cancel() try: await get_event_task except asyncio.CancelledError: pass + else: + # get_event_task completed before cancel took effect - re-enqueue the event + self._background_event_queue.put_nowait(get_event_task.result()) raise async def cleanup(self) -> None: