Skip to content

Move background queuing logic into dedicated container#174

Merged
akavi merged 2 commits intomainfrom
akavi/background-queue
Mar 11, 2026
Merged

Move background queuing logic into dedicated container#174
akavi merged 2 commits intomainfrom
akavi/background-queue

Conversation

@akavi
Copy link
Collaborator

@akavi akavi commented Mar 11, 2026

What does this PR do?

The event queuing logic for background tasks in LlmAgent was getting a bit unwieldy, so I pulled it into a seperate class.

BackgroundQueue[T]
    subscribe(source)  — register an AsyncIterable as a source
    get_nowait() -> List[T]  — drain all buffered items (non-blocking)
    get() -> T | None  — wait for next item; None means all sources done + empty
    is_active -> bool  — True if items buffered or sources still running
    wait() -> None  — wait for all sources to complete; raises on source error

Also: cleaned up some dead code

Type of change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation
  • Other: Refactor

Testing

Unit tests

Checklist

  • I have read the contributing guidelines
  • I have added tests that prove my fix is effective or that my feature works
  • I have formatted my code with make format

Note

Medium Risk
Refactors background tool event delivery and cancellation behavior in LlmAgent, which is concurrency-sensitive and could affect loopback timing or dropped events if edge cases are missed.

Overview
Extracts background tool event buffering into a new BackgroundQueue that runs each async source as its own task, preserves event ordering (including queued errors), and is cancellation-safe for consumers.

Updates LlmAgent to replace the prior asyncio.Queue + chained _background_task + _maybe_await_background_event() logic with BackgroundQueue (subscribe/get/get_nowait/is_active/wait) and waits for background sources in cleanup().

Removes dead History._append_local_with_event_id code and adds focused unit tests for BackgroundQueue, plus tightens an existing cancellation regression test to wait deterministically for background completion via agent._background_queue.wait().

Written by Cursor Bugbot for commit ad9d63e. This will update automatically on new commits. Configure here.

@akavi akavi force-pushed the akavi/background-queue branch from e0ebe24 to ad9d63e Compare March 11, 2026 18:58
@akavi akavi enabled auto-merge (squash) March 11, 2026 18:59
@akavi akavi merged commit f11d658 into main Mar 11, 2026
7 checks passed
@akavi akavi deleted the akavi/background-queue branch March 11, 2026 19:00
Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

# After all sources are done, scan the buffer for the first error.
for entry in self._buffer:
if isinstance(entry, _Error):
raise entry.exc
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait() doesn't remove error from buffer, causing double-raise

Medium Severity

The wait() method scans _buffer with a for loop and raises on the first _Error found, but never removes that _Error from the buffer. After wait() raises, the same error persists in the buffer and will be raised again by subsequent get() or get_nowait() calls. This is exactly the concern raised in the PR discussion — the error needs to be consumed or cleared after being raised to avoid duplicate error propagation.

Fix in Cursor Fix in Web

async def wait(self) -> None:
"""Wait for all source tasks to complete. Raises if any source failed."""
if self._sources:
await asyncio.gather(*list(self._sources), return_exceptions=True)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cancelling wait() cancels source tasks via gather

Medium Severity

The wait() method uses asyncio.gather() to await source tasks. If wait() is cancelled, gather() propagates the cancellation to all source tasks, destroying the in-flight background work. The old code used asyncio.shield() to protect background tasks from cancellation. This regression means cancelling cleanup() (e.g., during CallEnded handling) now kills background tool execution instead of letting it complete.

Additional Locations (1)
Fix in Cursor Fix in Web

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants