Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions line/llm_agent/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,12 @@ def _execute_backgroundable_tool(
Events are added to _background_event_queue for loopback processing.
If the caller is cancelled, events continue to be produced and queued
for processing on the next process() call.

Events are tagged with the CURRENT event_id at yield time (not the triggering
event_id). This ensures that when a background tool yields after user spoke
and a new process() started, the tool result appears at the END of the
conversation history, not in the middle where it was originally triggered.
"""
# Capture the event_id at the start - this is the triggering event
triggering_event_id = self.history._current_event_id

async def generate_events() -> None:
n = 0
Expand All @@ -713,19 +716,21 @@ async def generate_events() -> None:
call_id = f"{tc_id}-{n}"
called, returned = _construct_tool_events(call_id, tc_name, tool_args, value)

# Add to local history with the triggering event_id
self.history._append_local_with_event_id(called, triggering_event_id)
self.history._append_local_with_event_id(returned, triggering_event_id)
# Add to local history with the CURRENT event_id (at yield time).
# This ensures background tool results appear at the end of history
# when yielded after a new process() call has started.
self.history._append_local(called)
self.history._append_local(returned)
# Add to queue for loopback processing
await self._background_event_queue.put((called, returned))
n += 1
except Exception as e:
# Use negative limit to show last 10 frames (most relevant)
logger.error(f"Error in Tool Call {tc_name}: {e}\n{traceback.format_exc(limit=-10)}")
called, returned = _construct_tool_events(f"{tc_id}-{n}", tc_name, tool_args, f"error: {e}")
# Add to local history with the triggering event_id
self.history._append_local_with_event_id(called, triggering_event_id)
self.history._append_local_with_event_id(returned, triggering_event_id)
# Add to local history with the CURRENT event_id
self.history._append_local(called)
self.history._append_local(returned)
# Add to queue for loopback processing
await self._background_event_queue.put((called, returned))

Expand Down Expand Up @@ -757,22 +762,33 @@ async def _maybe_await_background_event(self) -> Union[None, tuple[AgentToolCall
return None

get_event_task = asyncio.ensure_future(self._background_event_queue.get())
done, _ = await asyncio.wait(
[get_event_task, self._background_task],
return_when=asyncio.FIRST_COMPLETED,
)
try:
done, _ = await asyncio.wait(
[get_event_task, self._background_task],
return_when=asyncio.FIRST_COMPLETED,
)

# Check if the get_event task completed
if get_event_task in done:
return get_event_task.result()
# Check if the get_event task completed
if get_event_task in done:
return get_event_task.result()

# Background task completed first - cancel the get_event task
get_event_task.cancel()
try:
await get_event_task
# Background task completed first - cancel the get_event task
get_event_task.cancel()
try:
await get_event_task
except asyncio.CancelledError:
pass
return None
except asyncio.CancelledError:
pass
return None
# If we're cancelled externally (e.g., user started speaking),
# ensure get_event_task is cancelled so it doesn't consume
# events meant for the next process() call
get_event_task.cancel()
try:
await get_event_task
except asyncio.CancelledError:
pass
raise
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completed event result silently discarded in cancellation handler

Medium Severity

In the CancelledError handler of _maybe_await_background_event, if get_event_task has already completed (consumed an event from the queue) before the cancellation takes effect, get_event_task.cancel() is a no-op and await get_event_task returns the event tuple — but the result is never captured or put back into _background_event_queue. The event is consumed from the queue and silently discarded. This race occurs when asyncio.wait()'s internal waiter is cancelled just after get_event_task completes but before asyncio.wait() can return. The completed result needs to be retrieved and re-enqueued via put_nowait so the next process() call can pick it up.

Fix in Cursor Fix in Web


async def cleanup(self) -> None:
"""Clean up resources."""
Expand Down
121 changes: 121 additions & 0 deletions tests/test_llm_agent_llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2031,3 +2031,124 @@ async def test_empty_messages_list_skips_generation(self, turn_env):

# LLM should not have been called
assert mock_llm._call_count == 0


# =============================================================================
# Tests: Background tool cancellation handling
# =============================================================================


async def test_background_tool_event_not_lost_on_cancellation(turn_env):
    """Test that background tool events aren't lost when process() is cancelled.

    This tests a specific bug where if process() was cancelled while waiting
    in _maybe_await_background_event(), an orphaned queue.get() task could
    consume events meant for the next process() call.

    Scenario:
    1. User initiates a background tool (e.g., sell_stock)
    2. Tool yields "pending" status, agent responds
    3. User speaks again, cancelling the current process()
    4. Tool completes and yields "success"
    5. New process() should see the "success" event
    """
    import asyncio

    # Track tool execution state. Two events let the test control the tool's
    # lifecycle from outside: `tool_started` signals the pending result was
    # yielded; `tool_can_complete` gates the final "success" yield so the test
    # can cancel the first process() in between.
    tool_started = asyncio.Event()
    tool_can_complete = asyncio.Event()

    @loopback_tool(is_background=True)
    async def slow_background_tool(ctx, value: Annotated[str, "A value"]):
        """A background tool that yields pending, waits, then yields success."""
        yield {"status": "pending", "value": value}
        tool_started.set()
        await tool_can_complete.wait()
        yield {"status": "success", "value": value}

    # Scripted LLM turns, consumed in order by the mock:
    # Response 1: LLM calls the tool
    # Response 2: LLM responds to pending status
    # Response 3: LLM responds to user's second message
    # Response 4: LLM responds to success status
    responses = [
        [
            StreamChunk(text="Starting transaction..."),
            StreamChunk(
                tool_calls=[
                    ToolCall(
                        id="tc1", name="slow_background_tool", arguments='{"value":"test"}', is_complete=True
                    )
                ]
            ),
            StreamChunk(is_final=True),
        ],
        [
            StreamChunk(text="Please wait..."),
            StreamChunk(is_final=True),
        ],
        [
            StreamChunk(text="I understand, still waiting..."),
            StreamChunk(is_final=True),
        ],
        [
            StreamChunk(text="Transaction complete!"),
            StreamChunk(is_final=True),
        ],
    ]

    # mock_llm is unused below; create_agent_with_mock returns (agent, llm).
    agent, mock_llm = create_agent_with_mock(responses, tools=[slow_background_tool])

    # First process() call - initiates the tool
    first_event = UserTextSent(
        content="Start transaction", history=[UserTextSent(content="Start transaction")]
    )

    async def run_first_process():
        # Drain the agent's output stream, ignoring LogMetric noise.
        outputs = []
        async for output in agent.process(turn_env, first_event):
            if not isinstance(output, LogMetric):
                outputs.append(output)
        return outputs

    first_task = asyncio.create_task(run_first_process())

    # Wait for tool to start and yield pending
    await asyncio.wait_for(tool_started.wait(), timeout=5.0)

    # Give some time for the agent to process the pending status
    # (NOTE: wall-clock sleep — presumably long enough on CI; timing-sensitive).
    await asyncio.sleep(0.1)

    # Cancel the first process (simulating user speaking). The task is
    # expected (but not required) to re-raise CancelledError when awaited.
    first_task.cancel()
    try:
        await first_task
    except asyncio.CancelledError:
        pass

    # Now allow the tool to complete
    tool_can_complete.set()

    # Give time for the background tool to yield its success result
    await asyncio.sleep(0.1)

    # Second process() call - should pick up the success event.
    # History replays the full conversation so far, ending with the new turn.
    second_event = UserTextSent(
        content="What's the status?",
        history=[
            UserTextSent(content="Start transaction"),
            AgentTextSent(content="Starting transaction..."),
            AgentTextSent(content="Please wait..."),
            UserTextSent(content="What's the status?"),
        ],
    )

    second_outputs = await collect_outputs(agent, turn_env, second_event)

    # The second process should have received the success tool result.
    # If the orphaned queue.get() bug is present, the "success" event was
    # consumed and discarded during cancellation and this filter comes up empty.
    tool_returned_events = [o for o in second_outputs if isinstance(o, AgentToolReturned)]
    success_events = [e for e in tool_returned_events if e.result.get("status") == "success"]

    assert len(success_events) >= 1, (
        f"Expected to find 'success' tool result in second process(), "
        f"but only found: {[e.result for e in tool_returned_events]}"
    )