diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py index 51daa488..12bdbd16 100644 --- a/line/llm_agent/llm_agent.py +++ b/line/llm_agent/llm_agent.py @@ -702,9 +702,12 @@ def _execute_backgroundable_tool( Events are added to _background_event_queue for loopback processing. If the caller is cancelled, events continue to be produced and queued for processing on the next process() call. + + Events are tagged with the CURRENT event_id at yield time (not the triggering + event_id). This ensures that when a background tool yields after user spoke + and a new process() started, the tool result appears at the END of the + conversation history, not in the middle where it was originally triggered. """ - # Capture the event_id at the start - this is the triggering event - triggering_event_id = self.history._current_event_id async def generate_events() -> None: n = 0 @@ -713,9 +716,11 @@ async def generate_events() -> None: call_id = f"{tc_id}-{n}" called, returned = _construct_tool_events(call_id, tc_name, tool_args, value) - # Add to local history with the triggering event_id - self.history._append_local_with_event_id(called, triggering_event_id) - self.history._append_local_with_event_id(returned, triggering_event_id) + # Add to local history with the CURRENT event_id (at yield time). + # This ensures background tool results appear at the end of history + # when yielded after a new process() call has started. + self.history._append_local(called) + self.history._append_local(returned) # Add to queue for loopback processing await self._background_event_queue.put((called, returned)) n += 1 @@ -723,9 +728,9 @@ async def generate_events() -> None: # Use negative limit to show last 10 frames (most relevant) logger.error(f"Error in Tool Call {tc_name}: {e}\n{traceback.format_exc(limit=-10)}") called, returned = _construct_tool_events(f"{tc_id}-{n}", tc_name, tool_args, f"error: {e}") - # Add to local history with the triggering event_id - self.history._append_local_with_event_id(called, triggering_event_id) - self.history._append_local_with_event_id(returned, triggering_event_id) + # Add to local history with the CURRENT event_id + self.history._append_local(called) + self.history._append_local(returned) # Add to queue for loopback processing await self._background_event_queue.put((called, returned)) @@ -757,22 +762,38 @@ async def _maybe_await_background_event(self) -> Union[None, tuple[AgentToolCall return None get_event_task = asyncio.ensure_future(self._background_event_queue.get()) - done, _ = await asyncio.wait( - [get_event_task, self._background_task], - return_when=asyncio.FIRST_COMPLETED, - ) + try: + done, _ = await asyncio.wait( + [get_event_task, self._background_task], + return_when=asyncio.FIRST_COMPLETED, + ) - # Check if the get_event task completed - if get_event_task in done: - return get_event_task.result() + # Check if the get_event task completed + if get_event_task in done: + return get_event_task.result() - # Background task completed first - cancel the get_event task - get_event_task.cancel() - try: - await get_event_task + # Background task completed first - cancel the get_event task + get_event_task.cancel() + try: + await get_event_task + except asyncio.CancelledError: + pass + return None except asyncio.CancelledError: - pass - return None + # If we're cancelled externally (e.g., user started speaking), + # ensure get_event_task is cancelled so it doesn't consume + # events meant for the next process() call. + # However, if get_event_task already completed (consumed an event), + # we must re-enqueue it so the next process() call can pick it up. + get_event_task.cancel() + try: + await get_event_task + except asyncio.CancelledError: + pass + else: + # get_event_task completed before cancel took effect - re-enqueue the event + self._background_event_queue.put_nowait(get_event_task.result()) + raise async def cleanup(self) -> None: """Clean up resources.""" diff --git a/tests/test_llm_agent_llm_agent.py b/tests/test_llm_agent_llm_agent.py index 36f94a68..7a856e13 100644 --- a/tests/test_llm_agent_llm_agent.py +++ b/tests/test_llm_agent_llm_agent.py @@ -2031,3 +2031,124 @@ async def test_empty_messages_list_skips_generation(self, turn_env): # LLM should not have been called assert mock_llm._call_count == 0 + + +# ============================================================================= +# Tests: Background tool cancellation handling +# ============================================================================= + + +async def test_background_tool_event_not_lost_on_cancellation(turn_env): + """Test that background tool events aren't lost when process() is cancelled. + + This tests a specific bug where if process() was cancelled while waiting + in _maybe_await_background_event(), an orphaned queue.get() task could + consume events meant for the next process() call. + + Scenario: + 1. User initiates a background tool (e.g., sell_stock) + 2. Tool yields "pending" status, agent responds + 3. User speaks again, cancelling the current process() + 4. Tool completes and yields "success" + 5. New process() should see the "success" event + """ + import asyncio + + # Track tool execution state + tool_started = asyncio.Event() + tool_can_complete = asyncio.Event() + + @loopback_tool(is_background=True) + async def slow_background_tool(ctx, value: Annotated[str, "A value"]): + """A background tool that yields pending, waits, then yields success.""" + yield {"status": "pending", "value": value} + tool_started.set() + await tool_can_complete.wait() + yield {"status": "success", "value": value} + + # Response 1: LLM calls the tool + # Response 2: LLM responds to pending status + # Response 3: LLM responds to user's second message + # Response 4: LLM responds to success status + responses = [ + [ + StreamChunk(text="Starting transaction..."), + StreamChunk( + tool_calls=[ + ToolCall( + id="tc1", name="slow_background_tool", arguments='{"value":"test"}', is_complete=True + ) + ] + ), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="Please wait..."), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="I understand, still waiting..."), + StreamChunk(is_final=True), + ], + [ + StreamChunk(text="Transaction complete!"), + StreamChunk(is_final=True), + ], + ] + + agent, mock_llm = create_agent_with_mock(responses, tools=[slow_background_tool]) + + # First process() call - initiates the tool + first_event = UserTextSent( + content="Start transaction", history=[UserTextSent(content="Start transaction")] + ) + + async def run_first_process(): + outputs = [] + async for output in agent.process(turn_env, first_event): + if not isinstance(output, LogMetric): + outputs.append(output) + return outputs + + first_task = asyncio.create_task(run_first_process()) + + # Wait for tool to start and yield pending + await asyncio.wait_for(tool_started.wait(), timeout=5.0) + + # Give some time for the agent to process the pending status + await asyncio.sleep(0.1) + + # Cancel the first process (simulating user speaking) + first_task.cancel() + try: + await first_task + except asyncio.CancelledError: + pass + + # Now allow the tool to complete + tool_can_complete.set() + + # Give time for the background tool to yield its success result + await asyncio.sleep(0.1) + + # Second process() call - should pick up the success event + second_event = UserTextSent( + content="What's the status?", + history=[ + UserTextSent(content="Start transaction"), + AgentTextSent(content="Starting transaction..."), + AgentTextSent(content="Please wait..."), + UserTextSent(content="What's the status?"), + ], + ) + + second_outputs = await collect_outputs(agent, turn_env, second_event) + + # The second process should have received the success tool result + tool_returned_events = [o for o in second_outputs if isinstance(o, AgentToolReturned)] + success_events = [e for e in tool_returned_events if e.result.get("status") == "success"] + + assert len(success_events) >= 1, ( + f"Expected to find 'success' tool result in second process(), " + f"but only found: {[e.result for e in tool_returned_events]}" + )