Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions line/llm_agent/background_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""BackgroundQueue - a cancellation-safe queue for async iterables."""

import asyncio
from collections import deque
from typing import AsyncIterable, Generic, Optional, TypeVar, Union

T = TypeVar("T")


class _Error:
"""Sentinel wrapper so queued errors are distinguishable from normal items."""

__slots__ = ("exc",)

def __init__(self, exc: BaseException) -> None:
self.exc = exc


class BackgroundQueue(Generic[T]):
    """A queue that consumes multiple async iterables and buffers their items.

    Sources run as independent tasks — cancelling the consumer of get()
    does not cancel the source tasks, since get() only awaits an Event,
    not the tasks themselves.

    Errors from sources are slotted into the buffer at the position they
    occurred and re-raised when dequeued via get() or get_nowait().

    Interface:
        subscribe(source) — register an AsyncIterable as a source
        get_nowait() -> T | None — pop next buffered item; None if empty
        get() -> T | None — wait for next item; None means all sources done + empty
        is_active -> bool — True if items buffered or sources still running
        wait() -> None — wait for all sources to complete; raises on source error
    """

    def __init__(self) -> None:
        # FIFO buffer of normal items interleaved with _Error-wrapped failures.
        self._buffer: deque[Union[T, _Error]] = deque()
        # Signalled whenever the buffer gains an entry or a source finishes,
        # so a waiter in get() can re-check its conditions.
        self._notify: asyncio.Event = asyncio.Event()
        # One consumer task per subscribed source; removed on completion.
        self._sources: set[asyncio.Task[None]] = set()

    def subscribe(self, source: AsyncIterable[T]) -> None:
        """Register an async iterable as a source.

        Items yielded by the source are buffered for later consumption via
        get() or get_nowait(). If the source raises, the error is slotted
        into the buffer and re-raised when dequeued.
        """

        async def _consume() -> None:
            async for item in source:
                self._buffer.append(item)
                self._notify.set()

        task = asyncio.create_task(_consume())
        self._sources.add(task)
        task.add_done_callback(self._on_source_done)

    def _on_source_done(self, task: asyncio.Task[None]) -> None:
        """Done-callback for a source task: record errors and wake waiters."""
        self._sources.discard(task)
        # A cancelled task has no retrievable exception; skip it.
        if not task.cancelled() and task.exception() is not None:
            self._buffer.append(_Error(task.exception()))
        # Wake up get() so it can re-check the done/error condition
        self._notify.set()

    def get_nowait(self) -> Optional[T]:
        """Pop and return the next buffered item (non-blocking).

        Returns None if the buffer is empty. Raises if the next entry is an error.
        """
        if not self._buffer:
            return None
        item = self._buffer.popleft()
        if isinstance(item, _Error):
            raise item.exc
        return item  # type: ignore[return-value]

    async def get(self) -> Optional[T]:
        """Wait for the next item. Returns None when all sources are done and buffer is empty.

        Cancellation-safe: cancellation can only occur at the await point,
        where no item has been dequeued yet.
        """
        while True:
            if self._buffer:
                item = self._buffer.popleft()
                if isinstance(item, _Error):
                    raise item.exc
                return item  # type: ignore[return-value]
            if not self._sources:
                return None
            # Clear before waiting so a set() that races in between is not lost.
            self._notify.clear()
            await self._notify.wait()

    @property
    def is_active(self) -> bool:
        """True if there are buffered items or any source is still running."""
        return bool(self._buffer) or bool(self._sources)

    async def wait(self) -> None:
        """Wait for all source tasks to complete. Raises if any source failed.

        Cancelling wait() does NOT cancel the source tasks: the gather is
        shielded, so background work keeps running even if the caller
        (e.g. a cleanup path) is cancelled.
        """
        if self._sources:
            # Shield the gather: without it, cancelling wait() would
            # propagate cancellation into every source task and destroy
            # in-flight background work. return_exceptions=True keeps
            # source errors in the buffer path instead of raising here.
            await asyncio.shield(
                asyncio.gather(*list(self._sources), return_exceptions=True)
            )
        # After all sources are done, raise the first buffered error —
        # and consume it, so it is not raised a second time by a
        # subsequent get() or get_nowait() call.
        for index, entry in enumerate(self._buffer):
            if isinstance(entry, _Error):
                del self._buffer[index]
                raise entry.exc
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait() doesn't remove error from buffer, causing double-raise

Medium Severity

The wait() method scans _buffer with a for loop and raises on the first _Error found, but never removes that _Error from the buffer. After wait() raises, the same error persists in the buffer and will be raised again by subsequent get() or get_nowait() calls. This is exactly the concern raised in the PR discussion — the error needs to be consumed or cleared after being raised to avoid duplicate error propagation.

Fix in Cursor Fix in Web

6 changes: 0 additions & 6 deletions line/llm_agent/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class History:
Private API (called by LlmAgent):
_set_input(input_history, current_event_id) — set merge sources
_append_local(event) — append event tagged with current event id
_append_local_with_event_id(event, event_id) — append with explicit id
_current_event_id — property for the current triggering event id
"""

Expand Down Expand Up @@ -230,11 +229,6 @@ def _append_local(self, event: _LocalEvent) -> None:
self._local_history.append((self._current_event_id, event))
self._cache = None # invalidate cache

def _append_local_with_event_id(self, event: _LocalEvent, event_id: str) -> None:
"""Append an event to local history with an explicit event id."""
self._local_history.append((event_id, event))
self._cache = None # invalidate cache

# ------------------------------------------------------------------
# Internal: lazy rebuild
# ------------------------------------------------------------------
Expand Down
119 changes: 18 additions & 101 deletions line/llm_agent/llm_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
See README.md for examples and documentation.
"""

import asyncio
import inspect
import json
import time
Expand Down Expand Up @@ -40,6 +39,7 @@
OutputEvent,
UserTextSent,
)
from line.llm_agent.background_queue import BackgroundQueue
from line.llm_agent.config import LlmConfig, _merge_configs, _normalize_config
from line.llm_agent.history import _HISTORY_EVENT_TYPES, History
from line.llm_agent.provider import LLMProvider, Message, ToolCall
Expand Down Expand Up @@ -104,12 +104,8 @@ def __init__(
self._introduction_sent = False
self.history = History()
self._handoff_target: Optional[AgentCallable] = None # Normalized process function
# Background task for backgrounded tools - None means no pending work
self._background_task: Optional[asyncio.Task[None]] = None
# Queue for events from backgrounded tools that need to trigger loopback
self._background_event_queue: asyncio.Queue[tuple[AgentToolCalled, AgentToolReturned]] = (
asyncio.Queue()
)
self._background_queue: BackgroundQueue[tuple[AgentToolCalled, AgentToolReturned]] = BackgroundQueue()
# Cache for thought signatures (Gemini 3+ models)
# Maps tool_call_id -> thought_signature
self._tool_signatures: Dict[str, str] = {}
Expand Down Expand Up @@ -316,15 +312,15 @@ async def _generate_response(
# These events were produced since the last iteration (or from previous process() invocations)
if is_first_iteration or should_loopback:
# Drain any immediately available events (non-blocking)
while not self._background_event_queue.empty():
called_evt, returned_evt = self._background_event_queue.get_nowait()
while (pair := self._background_queue.get_nowait()) is not None:
called_evt, returned_evt = pair
yield called_evt
yield returned_evt
else:
# Otherwise wait for either: background task completes OR new event arrives
result = await self._maybe_await_background_event()
# Otherwise wait for either: all sources complete OR new event arrives
result = await self._background_queue.get()
if result is None:
# Background task completed with no more events
# All background sources completed with no more events
# this generation process is completed - exit loop
break
called_evt, returned_evt = result
Expand Down Expand Up @@ -576,9 +572,7 @@ def handoff_target(

# ==== END TOOL CALLS ==== #

has_background_events = not self._background_event_queue.empty()
has_background_tasks = self._background_task is not None and not self._background_task.done()
if not (should_loopback or has_background_events or has_background_tasks):
if not (should_loopback or self._background_queue.is_active):
break

async def _build_messages(
Expand Down Expand Up @@ -690,118 +684,41 @@ def _execute_backgroundable_tool(
tc_id: str,
tc_name: str,
) -> None:
"""Execute a backgroundable tool in a shielded task, streaming events.

The task is protected from cancellation. If the calling coroutine is
cancelled, the task continues running and stores results to local_history.
"""Execute a backgroundable tool via the background queue.

Each value yielded by the tool produces a pair of:
- AgentToolCalled with tool_call_id = "{tc_id}-{n}"
- AgentToolReturned with the same tool_call_id

Events are added to _background_event_queue for loopback processing.
If the caller is cancelled, events continue to be produced and queued
for processing on the next process() call.

Events are tagged with the CURRENT event_id at yield time (not the triggering
event_id). This ensures that when a background tool yields after user spoke
and a new process() started, the tool result appears at the END of the
conversation history, not in the middle where it was originally triggered.
The source is subscribed to the background queue, which shields it
from cancellation. Events are tagged with the CURRENT event_id at
yield time so background tool results appear at the end of history
when yielded after a new process() call has started.
"""

async def generate_events() -> None:
async def generate_events() -> AsyncIterable[tuple[AgentToolCalled, AgentToolReturned]]:
n = 0
try:
async for value in normalized_func(ctx, **tool_args):
call_id = f"{tc_id}-{n}"
called, returned = _construct_tool_events(call_id, tc_name, tool_args, value)

# Add to local history with the CURRENT event_id (at yield time).
# This ensures background tool results appear at the end of history
# when yielded after a new process() call has started.
self.history._append_local(called)
self.history._append_local(returned)
# Add to queue for loopback processing
await self._background_event_queue.put((called, returned))
yield (called, returned)
n += 1
except Exception as e:
# Use negative limit to show last 10 frames (most relevant)
logger.error(f"Error in Tool Call {tc_name}: {e}\n{traceback.format_exc(limit=-10)}")
called, returned = _construct_tool_events(f"{tc_id}-{n}", tc_name, tool_args, f"error: {e}")
# Add to local history with the CURRENT event_id
self.history._append_local(called)
self.history._append_local(returned)
# Add to queue for loopback processing
await self._background_event_queue.put((called, returned))

# Chain this task after the current background task
# Use shield to protect from cancellation
future = asyncio.shield(generate_events())
old_background_task = self._background_task

async def _new_background_task() -> None:
if old_background_task is not None:
await old_background_task
await future

self._background_task = asyncio.ensure_future(_new_background_task())

async def _maybe_await_background_event(self) -> Union[None, tuple[AgentToolCalled, AgentToolReturned]]:
"""Wait for either a background event or background task completion.

Cleans up get_event if the background task completes first.
Intentionally does not clean up the background task if get_event completes first,
since #cleanup handles that
yield (called, returned)

Returns:
- (AgentToolCalled, AgentToolReturned) if a new event is available
- None if the background task completed with no more events
"""
# If no background task, there's nothing to wait for
if self._background_task is None:
return None

get_event_task = asyncio.ensure_future(self._background_event_queue.get())
try:
done, _ = await asyncio.wait(
[get_event_task, self._background_task],
return_when=asyncio.FIRST_COMPLETED,
)

# Check if the get_event task completed
if get_event_task in done:
return get_event_task.result()

# Background task completed first - cancel the get_event task
get_event_task.cancel()
try:
await get_event_task
except asyncio.CancelledError:
pass
return None
except asyncio.CancelledError:
# If we're cancelled externally (e.g., user started speaking),
# ensure get_event_task is cancelled so it doesn't consume
# events meant for the next process() call.
# However, if get_event_task already completed (consumed an event),
# we must re-enqueue it so the next process() call can pick it up.
get_event_task.cancel()
try:
await get_event_task
except asyncio.CancelledError:
pass
else:
# get_event_task completed before cancel took effect - re-enqueue the event
self._background_event_queue.put_nowait(get_event_task.result())
raise
self._background_queue.subscribe(generate_events())

async def cleanup(self) -> None:
"""Clean up resources."""
self._handoff_target = None
# Wait for any remaining background task to complete
if self._background_task is not None:
await self._background_task

await self._background_queue.wait()
await self._llm.aclose()

# ------------------------------------------------------------------
Expand Down
Loading
Loading