
feat: agent/team SSE reconnection and resume #6849

Open
kausmeows wants to merge 13 commits into main from feat/sse-reconnection-agent-team

@kausmeows kausmeows commented Mar 3, 2026

Summary

Adds SSE-based reconnection support for agent runs when using background=True, stream=True. When a frontend user refreshes the page or loses network during an agent's SSE stream, they can now reconnect via a new /resume endpoint and pick up where they left off — catching up on missed events and continuing to receive live events if the agent is still running.

This follows the same pattern as workflows, where background=True + stream=True uses WebSocket-based reconnection. For agents, reconnection is done over SSE instead.

Key changes:

  • background=True, stream=True now supported in arun_dispatch — previously rejected with ValueError, now routes to new _arun_background_stream
  • RUNNING status persisted in DB — run is stored with RUNNING status before streaming begins, matching the _arun_background pattern for non-streaming background runs
  • Decoupled agent execution from HTTP response lifecycle — agent runs in a detached asyncio.Task (inside _arun_background_stream) so it survives client disconnections
  • Event buffering — every SSE event is stored in the in-memory EventsBuffer with a sequential event_index, following the same pattern as workflow.py
  • New SSESubscriberManager — manages asyncio.Queue subscribers for live event forwarding to resumed clients
  • New POST /agents/{agent_id}/runs/{run_id}/resume endpoint — three reconnection paths (live subscription, buffer replay, DB fallback)
  • New format_sse_event_with_index() helper — injects event_index and run_id into SSE payloads without modifying core dataclasses

Architecture

Problem

When background=True is used, the agent should keep running even if the client disconnects. Previously, agent.arun() ran inside the StreamingResponse async generator — when the HTTP connection closed (page refresh, network loss), Starlette cancelled the generator, killing the agent mid-execution. All events were lost with no way to reconnect.

Additionally, arun_dispatch rejected background=True, stream=True with a ValueError, so there was no way to combine background execution with streaming.

Solution: Decoupled Producer/Consumer (background=True only)

Client A connects    -> StreamingResponse reads from Queue <- Background Task (_arun_stream)
Client A disconnects -> StreamingResponse cancelled         <- Background Task keeps running
Client B reconnects  -> /resume reads from subscriber Queue <- Background Task still publishing

When background=True, stream=True, the new _arun_background_stream in _run.py:

  1. Persists RUNNING status in DB (via aread_or_create_session + upsert_run + asave_session)
  2. Spawns a detached asyncio.Task that runs _arun_stream and publishes events to:
    • The EventsBuffer — for catch-up replay on reconnection (lazy import from agno.os.managers, same as workflow.py)
    • SSESubscriberManager queues — for live forwarding to /resume clients
    • A primary asyncio.Queue — read by the original StreamingResponse generator
  3. Yields SSE-formatted strings from the queue (formatted via format_sse_event_with_index from agno.os.utils)

When the original client disconnects, only the thin queue-reader is cancelled. The background task keeps running in the event loop until the agent completes.
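The decoupling described above can be illustrated with a minimal asyncio sketch. It is not the PR's code: `fake_agent_events`, `producer`, and `reader` are hypothetical stand-ins for `_arun_stream`, the detached task, and the StreamingResponse queue reader, and a plain list stands in for the EventsBuffer.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_agent_events() -> AsyncIterator[str]:
    """Hypothetical stand-in for the agent's event stream."""
    for name in ("RunStarted", "RunContent", "RunCompleted"):
        await asyncio.sleep(0.01)
        yield name


async def demo() -> List[str]:
    buffer: List[str] = []  # stand-in for the event buffer
    primary: asyncio.Queue = asyncio.Queue()  # read by the original client

    async def producer() -> None:
        # Runs as its own task, so cancelling the reader does not cancel it.
        async for event in fake_agent_events():
            buffer.append(event)
            await primary.put(event)
        await primary.put(None)  # sentinel: run finished

    async def reader() -> None:
        # Thin queue reader, analogous to the HTTP response generator.
        while (await primary.get()) is not None:
            pass

    producer_task = asyncio.create_task(producer())
    reader_task = asyncio.create_task(reader())

    await asyncio.sleep(0.015)  # let the stream run briefly
    reader_task.cancel()  # simulate the client disconnecting
    await asyncio.gather(reader_task, return_exceptions=True)

    await producer_task  # the producer still runs to completion
    return buffer


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the producer owns the agent work and the reader only drains a queue, cancelling the reader leaves the buffer complete, which is what makes a later catch-up possible.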

When background=False (default): The original direct-yield streamer is used — agent runs inline inside the generator, and client disconnection cancels the agent. No buffering, no event_index injection. Behavior is identical to before this PR.

Streaming Path Selection

| background | stream | Behavior |
| --- | --- | --- |
| false | true | Default. Agent runs inline. Client disconnect cancels agent. No buffering. |
| false | false | Non-streaming. Returns full response. Unaffected. |
| true | true | Resumable. Agent runs in detached task. RUNNING status persisted. Events buffered. /resume available. |
| true | false | Fire-and-forget. Returns PENDING immediately. Polls via GET. Unaffected. |
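Read as code, the table is a two-flag dispatch. The sketch below is only an illustration of that mapping; `select_run_path` and the returned labels are hypothetical names, not identifiers from `arun_dispatch`.

```python
def select_run_path(background: bool, stream: bool) -> str:
    """Map the two flags to one of the four paths in the table (labels are illustrative)."""
    if background and stream:
        return "resumable_stream"  # detached task, buffered events, /resume available
    if background:
        return "fire_and_forget"  # returns PENDING, result fetched later via GET
    if stream:
        return "inline_stream"  # default: agent runs inside the response generator
    return "inline_response"  # non-streaming, full response returned


if __name__ == "__main__":
    for flags in [(False, True), (False, False), (True, True), (True, False)]:
        print(flags, "->", select_run_path(*flags))
```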

Components

1. Background Stream Execution (_run.py: _arun_background_stream)

New function in _run.py that handles the background=True, stream=True path. Similar to how _arun_background handles background=True, stream=False, but for streaming:

  • Persists RUNNING status in DB before streaming begins
  • Spawns a detached asyncio.Task that runs _arun_stream
  • Buffers events via event_buffer and publishes to sse_subscriber_manager (lazy imports from agno.os.managers, matching the workflow pattern where workflow.py imports from agno.os.managers for event_buffer and websocket_manager)
  • Formats events as SSE via format_sse_event_with_index (lazy import from agno.os.utils)
  • Yields SSE strings from an asyncio.Queue
  • Uses run_response.status for final status (set by _arun_stream/acleanup_and_store)

The router's agent_resumable_response_streamer is now a thin wrapper that just calls agent.arun(background=True, stream=True) and yields the SSE strings.

2. Event Buffering (managers.py: EventsBuffer, pre-existing)

Every event is stored with a sequential event_index (0, 1, 2, ...). The same EventsBuffer class that workflows already use.

  • add_event(run_id, event) -> returns event_index
  • get_events(run_id, last_event_index=N) -> returns events after index N
  • set_run_completed(run_id, status) -> marks run done, triggers cleanup after 30min
  • get_run_status(run_id) -> returns running, completed, error, etc.
  • Max 10,000 events per run. Completed runs cleaned up after 30 minutes.
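To make the indexing semantics concrete, here is a toy buffer with the same four method names. It is a sketch under assumptions, not the actual `EventsBuffer`: `ToyEventsBuffer` is a hypothetical class, the initial `"running"` status is assumed, and the 10,000-event cap and the 30-minute cleanup are deliberately left out.

```python
from typing import Any, Dict, List, Optional


class ToyEventsBuffer:
    """Hypothetical in-memory buffer keyed by run_id (no size cap, no timed cleanup)."""

    def __init__(self) -> None:
        self._events: Dict[str, List[Any]] = {}
        self._status: Dict[str, str] = {}

    def add_event(self, run_id: str, event: Any) -> int:
        """Store an event and return its sequential event_index (0, 1, 2, ...)."""
        events = self._events.setdefault(run_id, [])
        self._status.setdefault(run_id, "running")  # assumed initial status
        events.append(event)
        return len(events) - 1

    def get_events(self, run_id: str, last_event_index: Optional[int] = None) -> List[Any]:
        """Return the events strictly after last_event_index (all events if None)."""
        events = self._events.get(run_id, [])
        start = 0 if last_event_index is None else last_event_index + 1
        return events[start:]

    def set_run_completed(self, run_id: str, status: str) -> None:
        self._status[run_id] = status

    def get_run_status(self, run_id: str) -> Optional[str]:
        """Return the stored status, or None when the run is not in the buffer."""
        return self._status.get(run_id)


if __name__ == "__main__":
    buf = ToyEventsBuffer()
    for name in ("a", "b", "c"):
        buf.add_event("run-1", name)
    print(buf.get_events("run-1", last_event_index=0))
```

A client that last saw index 0 passes `last_event_index=0` and receives only the events it missed.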

3. SSE Subscriber Manager (managers.py — new)

When a /resume client connects while the agent is still running, it registers an asyncio.Queue. The producer pushes every event to all registered queues. A None sentinel signals completion.
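The fan-out behaviour can be sketched as follows. This is an assumption-laden illustration, not the real `SSESubscriberManager`: the class name `ToySubscriberManager` and the method names `subscribe`, `unsubscribe`, `publish`, and `complete` are hypothetical.

```python
import asyncio
from typing import Dict, List, Tuple


class ToySubscriberManager:
    """Hypothetical fan-out of events to one asyncio.Queue per resumed client."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    def subscribe(self, run_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(run_id, []).append(queue)
        return queue

    def unsubscribe(self, run_id: str, queue: asyncio.Queue) -> None:
        queues = self._subscribers.get(run_id, [])
        if queue in queues:
            queues.remove(queue)
        if not queues:
            self._subscribers.pop(run_id, None)

    async def publish(self, run_id: str, sse_data: str) -> None:
        # Every registered queue receives every event.
        for queue in list(self._subscribers.get(run_id, [])):
            await queue.put(sse_data)

    async def complete(self, run_id: str) -> None:
        # A None sentinel tells each subscriber that the run has finished.
        for queue in self._subscribers.pop(run_id, []):
            await queue.put(None)


async def demo() -> Tuple[List[str], List[str]]:
    manager = ToySubscriberManager()
    first, second = manager.subscribe("run-1"), manager.subscribe("run-1")
    await manager.publish("run-1", "e0")
    await manager.publish("run-1", "e1")
    await manager.complete("run-1")

    async def drain(queue: asyncio.Queue) -> List[str]:
        received: List[str] = []
        while (item := await queue.get()) is not None:
            received.append(item)
        return received

    return await drain(first), await drain(second)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```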

4. event_index Injection (utils.py: format_sse_event_with_index)

Resumable SSE events include an event_index field in their JSON payload. Used by _arun_background_stream in _run.py to format events. The core BaseAgentRunEvent dataclass is not modified.
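One way such a helper could work is to copy the serialized event and add the extra fields at formatting time, leaving the event object untouched. The sketch below assumes events are already available as dicts; `format_sse_with_index_sketch`, its signature, and the use of the `event` key for the SSE event name are hypothetical and may differ from the real `format_sse_event_with_index`.

```python
import json
from typing import Any, Dict


def format_sse_with_index_sketch(event: Dict[str, Any], event_index: int, run_id: str) -> str:
    """Return an SSE frame whose JSON payload carries event_index and run_id."""
    payload = dict(event)  # shallow copy, so the caller's event is not mutated
    payload["event_index"] = event_index
    payload.setdefault("run_id", run_id)  # keep an existing run_id if present
    event_type = payload.get("event", "message")
    # An SSE frame is "event:" and "data:" lines terminated by a blank line.
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


if __name__ == "__main__":
    frame = format_sse_with_index_sketch({"event": "RunContent", "content": "hi"}, 3, "run-1")
    print(frame, end="")
```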

5. /resume Endpoint (router.py)

POST /agents/{agent_id}/runs/{run_id}/resume
Content-Type: multipart/form-data

last_event_index=N&session_id=S

Three reconnection paths:

| Path | Condition | Behavior |
| --- | --- | --- |
| 1 | Run still active (in buffer) | Subscribe to queue FIRST, replay missed events from buffer, then stream live events from queue |
| 2 | Run completed (in buffer) | Replay all missed events from buffer |
| 3 | Not in buffer (expired) | Fall back to database via agent.aget_run_output() |

Race condition handling: After subscribing but before entering the queue loop, the buffer status is re-checked. If the run completed during catch-up, remaining events are replayed from buffer instead of waiting on an empty queue (the sentinel was pushed before the subscription existed).
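The ordering of subscribe, catch-up, and status re-check can be sketched in a single-process toy. Everything here is an assumption for illustration: `ToyRunState` and `resume` are hypothetical, meta events are plain `"meta:..."` strings, path 3 is reduced to an error marker instead of a database lookup, and de-duplication by index is this sketch's own choice for handling events that appear in both the buffer and the queue.

```python
import asyncio
from typing import AsyncIterator, Dict, List


class ToyRunState:
    """Hypothetical stand-in for the event buffer plus the subscriber manager."""

    def __init__(self) -> None:
        self.events: Dict[str, List[str]] = {}
        self.status: Dict[str, str] = {}
        self.queues: Dict[str, List[asyncio.Queue]] = {}

    async def publish(self, run_id: str, event: str) -> None:
        self.status.setdefault(run_id, "running")
        log = self.events.setdefault(run_id, [])
        log.append(event)
        for queue in self.queues.get(run_id, []):
            await queue.put((len(log) - 1, event))  # (event_index, event)

    async def complete(self, run_id: str) -> None:
        self.status[run_id] = "completed"
        for queue in self.queues.pop(run_id, []):
            await queue.put(None)  # sentinel: no more events


async def resume(state: ToyRunState, run_id: str, last_event_index: int) -> AsyncIterator[str]:
    if run_id not in state.events:  # path 3 (the real endpoint queries the database)
        yield "meta:error"
        return
    if state.status[run_id] != "running":  # path 2: completed, replay only
        yield "meta:replay"
        for event in state.events[run_id][last_event_index + 1:]:
            yield event
        return
    queue: asyncio.Queue = asyncio.Queue()
    state.queues.setdefault(run_id, []).append(queue)  # path 1: subscribe first
    try:
        yield "meta:catch_up"
        sent = last_event_index
        for event in list(state.events[run_id][sent + 1:]):
            sent += 1
            yield event
        if state.status[run_id] != "running":
            # Run finished during catch-up: drain the buffer, skip the queue loop.
            for event in list(state.events[run_id][sent + 1:]):
                yield event
            return
        yield "meta:subscribed"
        while (item := await queue.get()) is not None:
            index, event = item
            if index > sent:  # skip events already replayed from the buffer
                sent = index
                yield event
    finally:
        if queue in state.queues.get(run_id, []):
            state.queues[run_id].remove(queue)


async def demo() -> List[str]:
    state = ToyRunState()
    for name in ("e0", "e1", "e2"):
        await state.publish("run-1", name)
    stream = resume(state, "run-1", last_event_index=0)
    out = [await stream.__anext__()]  # subscription is registered at this point
    await state.publish("run-1", "e3")  # lands in both the buffer and the queue
    async for item in stream:
        out.append(item)
        if item == "meta:subscribed":
            await state.publish("run-1", "e4")
            await state.complete("run-1")
    return out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Subscribing before the replay means an event published mid catch-up is never lost; tracking the last index sent keeps it from being delivered twice.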

Meta Events

The /resume stream may include these meta events before actual data:

| Event Type | Meaning |
| --- | --- |
| catch_up | Run still active. Missed events follow, then live events. |
| replay | Run already completed. All missed events follow. |
| subscribed | Catch-up done, now receiving live events. |
| error | Run not found or other issue. |
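On the client side, a consumer would typically skip the meta events and remember the highest `event_index` seen, so that the next /resume call can send it as `last_event_index`. The sketch below assumes the meta event type arrives in the SSE `event:` field and that data payloads are JSON objects; neither is confirmed by this description, and `iter_sse_frames` and `consume` are hypothetical helpers.

```python
import json
from typing import Any, Dict, Iterator, List, Optional, Tuple

META_EVENTS = {"catch_up", "replay", "subscribed", "error"}


def iter_sse_frames(text: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Split raw SSE text into (event_type, payload) pairs."""
    for frame in text.split("\n\n"):
        event_type, data_lines = "message", []
        for line in frame.splitlines():
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:
            yield event_type, json.loads("\n".join(data_lines))


def consume(text: str, last_event_index: Optional[int]) -> Tuple[List[Dict[str, Any]], Optional[int]]:
    """Return the new data events and the updated last_event_index."""
    events: List[Dict[str, Any]] = []
    for event_type, payload in iter_sse_frames(text):
        if event_type == "error":
            raise RuntimeError(f"resume failed: {payload}")
        if event_type in META_EVENTS:
            continue  # informational only, carries no run data
        index = payload.get("event_index")
        if index is not None:
            if last_event_index is not None and index <= last_event_index:
                continue  # already seen before the disconnect
            last_event_index = index
        events.append(payload)
    return events, last_event_index


if __name__ == "__main__":
    sample = (
        'event: catch_up\ndata: {"run_id": "r1"}\n\n'
        'event: RunContent\ndata: {"event_index": 3, "content": "a"}\n\n'
        'event: subscribed\ndata: {}\n\n'
        'event: RunContent\ndata: {"event_index": 4, "content": "b"}\n\n'
    )
    print(consume(sample, last_event_index=2))
```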

Files Changed

| File | Change |
| --- | --- |
| libs/agno/agno/os/routers/agents/router.py | Decoupled streamers, new _resume_stream_generator, new /resume endpoint |
| libs/agno/agno/os/managers.py | New SSESubscriberManager class + global instance |
| libs/agno/agno/os/utils.py | New format_sse_event_with_index() helper |
| cookbook/05_agent_os/client/10_sse_reconnect.py | New cookbook example for testing reconnection |

Type of change

  • New feature (non-breaking change which adds functionality)

Checklist

  • Ran ./scripts/format.sh and ./scripts/validate.sh
  • No new mypy errors introduced (11 pre-existing in unrelated files)
  • Cookbook example tested end-to-end
  • No core agent code modified — all changes at the OS router layer
  • Both sync and async paths unaffected (only async streaming path modified)

@kausmeows kausmeows added the WIP work in progress label Mar 3, 2026
@kausmeows kausmeows marked this pull request as ready for review March 5, 2026 12:32
@kausmeows kausmeows requested a review from a team as a code owner March 5, 2026 12:32
@kausmeows kausmeows removed WIP work in progress labels Mar 5, 2026