From 48c7ebc278014a7376947dcbbb4b0f635171679b Mon Sep 17 00:00:00 2001 From: kausmeows Date: Tue, 3 Mar 2026 17:54:20 +0530 Subject: [PATCH 01/10] feat: agent/team SSE reconnection and resume --- .../05_agent_os/client/10_sse_reconnect.py | 220 ++++++++ libs/agno/agno/agent-run.py | 73 +++ libs/agno/agno/os/managers.py | 56 ++ libs/agno/agno/os/routers/agents/router.py | 526 ++++++++++++++++-- libs/agno/agno/os/utils.py | 36 ++ 5 files changed, 863 insertions(+), 48 deletions(-) create mode 100644 cookbook/05_agent_os/client/10_sse_reconnect.py create mode 100644 libs/agno/agno/agent-run.py diff --git a/cookbook/05_agent_os/client/10_sse_reconnect.py b/cookbook/05_agent_os/client/10_sse_reconnect.py new file mode 100644 index 0000000000..50940d525e --- /dev/null +++ b/cookbook/05_agent_os/client/10_sse_reconnect.py @@ -0,0 +1,220 @@ +""" +SSE Reconnection Test +===================== + +Tests SSE stream reconnection for agent runs: start a streaming run, disconnect +mid-stream, then reconnect via the /resume endpoint and catch up on missed events. + +Prerequisites: +1. Start the AgentOS server: python cookbook/05_agent_os/basic.py +2. Run this script: python cookbook/05_agent_os/client/10_sse_reconnect.py +""" + +import asyncio +import json +from typing import Optional + +import httpx + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- +BASE_URL = "http://localhost:7777" +# Number of events to receive before simulating a disconnect +EVENTS_BEFORE_DISCONNECT = 6 +# How long to "stay disconnected" (seconds) +DISCONNECT_DURATION = 3 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def parse_sse_line(line: str) -> Optional[dict]: + """Parse a single SSE data line into a dict.""" + if line.startswith("data: "): + try: + return json.loads(line[6:]) + except json.JSONDecodeError: + return None + return None + + +# --------------------------------------------------------------------------- +# Test +# --------------------------------------------------------------------------- +async def test_sse_reconnection(): + print("=" * 70) + print("Agent SSE Reconnection Test") + print("=" * 70) + + # Step 1: Discover an agent + async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client: + resp = await client.get("/agents") + resp.raise_for_status() + agents = resp.json() + if not agents: + print("[ERROR] No agents available on the server") + return + agent_id = agents[0]["id"] + print(f"Using agent: {agent_id} ({agents[0].get('name', 'unnamed')})") + + # Step 2: Start a streaming run and disconnect after a few events + run_id: Optional[str] = None + session_id: Optional[str] = None + last_event_index: Optional[int] = None + events_phase1: list[dict] = [] + + print( + f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events..." + ) + + async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client: + form_data = { + "message": "Tell me a detailed story about a brave knight who goes on a quest. Make it at least 5 paragraphs long.", + "stream": "true", + } + async with client.stream( + "POST", f"/agents/{agent_id}/runs", data=form_data + ) as response: + event_count = 0 + buffer = "" + async for chunk in response.aiter_text(): + buffer += chunk + # SSE events are delimited by double newlines + while "\n\n" in buffer: + event_str, buffer = buffer.split("\n\n", 1) + for line in event_str.strip().split("\n"): + data = parse_sse_line(line) + if data is None: + continue + + event_type = data.get("event", "unknown") + ev_idx = data.get("event_index") + ev_run_id = data.get("run_id") + ev_session_id = data.get("session_id") + + # Track run_id and session_id + if ev_run_id and not run_id: + run_id = ev_run_id + if ev_session_id and not session_id: + session_id = ev_session_id + if ev_idx is not None: + last_event_index = ev_idx + + events_phase1.append(data) + event_count += 1 + content_preview = str(data.get("content", ""))[:60] + print( + f" [{event_count}] event={event_type} index={ev_idx} content={content_preview!r}" + ) + + if event_count >= EVENTS_BEFORE_DISCONNECT: + break + if event_count >= EVENTS_BEFORE_DISCONNECT: + break + if event_count >= EVENTS_BEFORE_DISCONNECT: + break + + print( + f"\n[DISCONNECT] Received {event_count} events. run_id={run_id}, last_event_index={last_event_index}" + ) + + if not run_id: + print("[ERROR] Could not determine run_id from events") + return + + # Step 3: Wait (simulate user being away) + print(f"\nSimulating disconnect for {DISCONNECT_DURATION} seconds...") + await asyncio.sleep(DISCONNECT_DURATION) + + # Step 4: Resume via /resume endpoint + print("\nPhase 2: Reconnecting via /resume endpoint...") + events_phase2: list[dict] = [] + + params: dict = {"last_event_index": last_event_index} + if session_id: + params["session_id"] = session_id + + async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client: + async with client.stream( + "GET", f"/agents/{agent_id}/runs/{run_id}/resume", params=params + ) as response: + buffer = "" + async for chunk in response.aiter_text(): + buffer += chunk + while "\n\n" in buffer: + event_str, buffer = buffer.split("\n\n", 1) + for line in event_str.strip().split("\n"): + data = parse_sse_line(line) + if data is None: + continue + + event_type = data.get("event", "unknown") + ev_idx = data.get("event_index") + events_phase2.append(data) + + if event_type in ("catch_up", "replay", "subscribed"): + print( + f" [META] event={event_type} | {json.dumps(data, indent=2)}" + ) + else: + content_preview = str(data.get("content", ""))[:60] + print( + f" [RESUME] event={event_type} index={ev_idx} content={content_preview!r}" + ) + + # Step 5: Print summary + print("\n" + "=" * 70) + print("Summary") + print("=" * 70) + print(f"Phase 1 events received: {len(events_phase1)}") + print(f"Phase 2 events received: {len(events_phase2)}") + + # Check for meta events + meta_events = [ + e + for e in events_phase2 + if e.get("event") in ("catch_up", "replay", "subscribed") + ] + data_events = [ + e + for e in events_phase2 + if e.get("event") not in ("catch_up", "replay", "subscribed", "error") + ] + print(f" Meta events (catch_up/replay/subscribed): {len(meta_events)}") + print(f" Data events (actual agent events): {len(data_events)}") + + # Validate event_index continuity + phase1_indices = [ + e.get("event_index") for e in events_phase1 if e.get("event_index") is not None + ] + phase2_indices = [ + e.get("event_index") for e in data_events if e.get("event_index") is not None + ] + + if phase1_indices and phase2_indices: + last_p1 = max(phase1_indices) + first_p2 = min(phase2_indices) + last_p2 = max(phase2_indices) + print(f"\n Phase 1 event_index range: 0 -> {last_p1}") + print(f" Phase 2 event_index range: {first_p2} -> {last_p2}") + if first_p2 == last_p1 + 1: + print(" [PASS] Event indices are contiguous - no events were lost") + elif first_p2 > last_p1: + print(f" [WARN] Gap in event indices: {last_p1} -> {first_p2}") + else: + print(" [INFO] Overlapping indices detected (dedup may have occurred)") + elif not phase2_indices: + print( + "\n [INFO] No data events in phase 2 (run may have completed before resume)" + ) + else: + print("\n [INFO] No event indices in phase 1 to compare") + + total_events = len(events_phase1) + len(data_events) + print(f"\n Total unique events across both phases: {total_events}") + print("=" * 70) + + +if __name__ == "__main__": + asyncio.run(test_sse_reconnection()) diff --git a/libs/agno/agno/agent-run.py b/libs/agno/agno/agent-run.py new file mode 100644 index 0000000000..8cf72174f3 --- /dev/null +++ b/libs/agno/agno/agent-run.py @@ -0,0 +1,73 @@ +"""Minimal example for AgentOS.""" + +from agno.agent import Agent +from agno.db.postgres import PostgresDb +from agno.models.openai import OpenAIChat +from agno.os import AgentOS +from agno.team import Team +from agno.workflow.step import Step +from agno.workflow.workflow import Workflow + +# --------------------------------------------------------------------------- +# Create Example +# --------------------------------------------------------------------------- + +# Setup the database +db = PostgresDb(id="basic-db", db_url="postgresql+psycopg://ai:ai@localhost:5532/ai") + +# Setup basic agents, teams and workflows +basic_agent = Agent( + name="Basic Agent", + db=db, + enable_session_summaries=True, + update_memory_on_run=True, + add_history_to_context=True, + num_history_runs=3, + add_datetime_to_context=True, + markdown=True, +) +basic_team = Team( + id="basic-team", + name="Basic Team", + model=OpenAIChat(id="gpt-4o"), + db=db, + members=[basic_agent], + update_memory_on_run=True, +) +basic_workflow = Workflow( + id="basic-workflow", + name="Basic Workflow", + description="Just a simple workflow", + db=db, + steps=[ + Step( + name="step1", + description="Just a simple step", + agent=basic_agent, + ) + ], + add_workflow_history_to_steps=True, +) + +# Setup our AgentOS app +agent_os = AgentOS( + description="Example app for basic agent, team and workflow", + agents=[basic_agent], + teams=[basic_team], + workflows=[basic_workflow], +) +app = agent_os.get_app() + + +# --------------------------------------------------------------------------- +# Run Example +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + """Run your AgentOS. + + You can see the configuration and available apps at: + http://localhost:7777/config + + """ + agent_os.serve(app="agent-run:app", reload=True) diff --git a/libs/agno/agno/os/managers.py b/libs/agno/agno/os/managers.py index 427c30fff8..81466a318c 100644 --- a/libs/agno/agno/os/managers.py +++ b/libs/agno/agno/os/managers.py @@ -5,10 +5,12 @@ - WebSocketManager: WebSocket connection management for real-time streaming - EventsBuffer: Event buffering for agent/team/workflow reconnection support - WebSocketHandler: Handler for sending events over WebSocket connections +- SSESubscriberManager: Subscriber management for SSE-based reconnection These managers are used by agents, teams, and workflows for background WebSocket execution. """ +import asyncio import json from dataclasses import dataclass from time import time @@ -315,6 +317,58 @@ def get_run_status(self, run_id: str) -> Optional[RunStatus]: return metadata["status"] if metadata else None +class SSESubscriberManager: + """ + Manages asyncio.Queue subscribers for SSE-based reconnection. + + When a client reconnects to a still-running agent/team via the /resume SSE endpoint, + it registers a Queue here. The response streamer pushes SSE-formatted events to all + registered queues. A None sentinel signals run completion. + """ + + def __init__(self) -> None: + self._subscribers: Dict[str, List[asyncio.Queue[Optional[str]]]] = {} + + def subscribe(self, run_id: str) -> "asyncio.Queue[Optional[str]]": + """Register a new subscriber queue for a run. Returns the queue.""" + if run_id not in self._subscribers: + self._subscribers[run_id] = [] + queue: asyncio.Queue[Optional[str]] = asyncio.Queue() + self._subscribers[run_id].append(queue) + log_debug(f"SSE subscriber registered for run {run_id}") + return queue + + def unsubscribe(self, run_id: str, queue: "asyncio.Queue[Optional[str]]") -> None: + """Remove a subscriber queue.""" + if run_id in self._subscribers: + try: + self._subscribers[run_id].remove(queue) + except ValueError: + pass + if not self._subscribers[run_id]: + del self._subscribers[run_id] + + async def publish(self, run_id: str, sse_data: str) -> None: + """Push an SSE-formatted event string to all subscriber queues for a run.""" + if run_id not in self._subscribers: + return + for queue in self._subscribers[run_id]: + try: + await queue.put(sse_data) + except Exception: + pass + + async def complete(self, run_id: str) -> None: + """Signal all subscribers that the run is done by pushing None sentinel.""" + if run_id not in self._subscribers: + return + for queue in self._subscribers[run_id]: + try: + await queue.put(None) + except Exception: + pass + + # Global manager instances websocket_manager = WebSocketManager( active_connections={}, @@ -324,3 +378,5 @@ def get_run_status(self, run_id: str) -> Optional[RunStatus]: max_events_per_run=10000, # Keep last 10000 events per run cleanup_interval=1800, # Clean up completed runs after 30 minutes ) + +sse_subscriber_manager = SSESubscriberManager() diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index 5b9b8bfd3c..2cf64dd957 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union, cast from uuid import uuid4 @@ -22,6 +23,7 @@ from agno.media import Audio, Image, Video from agno.media import File as FileMedia from agno.os.auth import get_auth_token_from_request, get_authentication_dependency, require_resource_access +from agno.os.managers import event_buffer, sse_subscriber_manager from agno.os.routers.agents.schema import AgentResponse from agno.os.schema import ( BadRequestResponse, @@ -32,7 +34,7 @@ ) from agno.os.settings import AgnoAPISettings from agno.os.utils import ( - format_sse_event, + format_sse_event_with_index, get_agent_by_id, get_request_kwargs, process_audio, @@ -41,9 +43,10 @@ process_video, ) from agno.registry import Registry -from agno.run.agent import RunErrorEvent, RunOutput +from agno.run.agent import RunErrorEvent, RunEvent, RunOutput from agno.run.base import RunStatus from agno.utils.log import log_debug, log_error, log_warning +from agno.utils.serialize import json_serializer if TYPE_CHECKING: from agno.os.app import AgentOS @@ -62,63 +65,178 @@ async def agent_response_streamer( auth_token: Optional[str] = None, **kwargs: Any, ) -> AsyncGenerator: - try: - # Pass background_tasks if provided - if background_tasks is not None: - kwargs["background_tasks"] = background_tasks + """SSE generator that reads from the agent's event queue. - if "stream_events" in kwargs: - stream_events = kwargs.pop("stream_events") - else: - stream_events = True + The actual agent execution runs in a detached asyncio.Task (_agent_run_producer) + so it survives client disconnections. This generator simply subscribes to the + run's event queue and yields SSE data. When the client disconnects, only this + generator is cancelled -- the producer keeps running. + """ + # Create a queue that the producer will publish to via SSESubscriberManager + # We need a pre-run queue since run_id isn't known yet. Use a dedicated queue + # that the producer writes to directly. + primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() - # Pass auth_token for remote agents - if auth_token and isinstance(agent, RemoteAgent): - kwargs["auth_token"] = auth_token + # Launch the producer as a detached background task + async def _producer_wrapper() -> None: + """Wrapper that publishes to the primary queue in addition to SSE subscribers.""" + run_id: Optional[str] = None + final_status = RunStatus.completed - run_response = agent.arun( - input=message, - session_id=session_id, - user_id=user_id, - images=images, - audio=audio, - videos=videos, - files=files, - stream=True, - stream_events=stream_events, - **kwargs, - ) - async for run_response_chunk in run_response: - yield format_sse_event(run_response_chunk) # type: ignore - except (InputCheckError, OutputCheckError) as e: - error_response = RunErrorEvent( - content=str(e), - error_type=e.type, - error_id=e.error_id, - additional_data=e.additional_data, - ) - yield format_sse_event(error_response) - except Exception as e: - import traceback + try: + if background_tasks is not None: + kwargs["background_tasks"] = background_tasks - traceback.print_exc(limit=3) - error_response = RunErrorEvent( - content=str(e), - ) - yield format_sse_event(error_response) + if "stream_events" in kwargs: + stream_events = kwargs.pop("stream_events") + else: + stream_events = True + if auth_token and isinstance(agent, RemoteAgent): + kwargs["auth_token"] = auth_token -async def agent_continue_response_streamer( + run_response = agent.arun( + input=message, + session_id=session_id, + user_id=user_id, + images=images, + audio=audio, + videos=videos, + files=files, + stream=True, + stream_events=stream_events, + **kwargs, + ) + async for run_response_chunk in run_response: + chunk_run_id = getattr(run_response_chunk, "run_id", None) + if run_id is None and chunk_run_id: + run_id = chunk_run_id + + if isinstance(run_response_chunk, RunOutput): + continue + + event_index: Optional[int] = None + buffer_run_id = run_id or chunk_run_id + if buffer_run_id: + try: + event_index = event_buffer.add_event(buffer_run_id, run_response_chunk) + except Exception: + pass + + sse_data = format_sse_event_with_index( + run_response_chunk, event_index=event_index, run_id=buffer_run_id + ) + + # Publish to primary queue (original client) + try: + await primary_queue.put(sse_data) + except Exception: + pass + + # Publish to SSE subscriber queues (resumed clients) + if buffer_run_id: + try: + await sse_subscriber_manager.publish(buffer_run_id, sse_data) + except Exception: + pass + + event_type = getattr(run_response_chunk, "event", "") + if event_type == RunEvent.run_error.value: + final_status = RunStatus.error + elif event_type == RunEvent.run_cancelled.value: + final_status = RunStatus.cancelled + elif event_type == RunEvent.run_paused.value: + final_status = RunStatus.paused + + except (InputCheckError, OutputCheckError) as e: + final_status = RunStatus.error + error_response = RunErrorEvent( + content=str(e), + error_type=e.type, + error_id=e.error_id, + additional_data=e.additional_data, + ) + event_index = None + if run_id: + try: + event_index = event_buffer.add_event(run_id, error_response) + except Exception: + pass + sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) + try: + await primary_queue.put(sse_data) + except Exception: + pass + if run_id: + try: + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass + except Exception as e: + import traceback + + traceback.print_exc(limit=3) + final_status = RunStatus.error + error_response = RunErrorEvent( + content=str(e), + ) + event_index = None + if run_id: + try: + event_index = event_buffer.add_event(run_id, error_response) + except Exception: + pass + sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) + try: + await primary_queue.put(sse_data) + except Exception: + pass + if run_id: + try: + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass + finally: + if run_id: + try: + event_buffer.set_run_completed(run_id, final_status) + except Exception: + pass + try: + await sse_subscriber_manager.complete(run_id) + except Exception: + pass + # Signal the primary queue that the run is done + try: + await primary_queue.put(None) + except Exception: + pass + + # Start the producer as a detached task (survives client disconnect) + asyncio.create_task(_producer_wrapper()) + + # Read from the primary queue and yield SSE data + while True: + sse_data = await primary_queue.get() + if sse_data is None: + break + yield sse_data + + +async def _agent_continue_run_producer( agent: Union[Agent, RemoteAgent], run_id: str, updated_tools: List, + primary_queue: "asyncio.Queue[Optional[str]]", session_id: Optional[str] = None, user_id: Optional[str] = None, background_tasks: Optional[BackgroundTasks] = None, auth_token: Optional[str] = None, -) -> AsyncGenerator: +) -> None: + """Background task that runs agent.acontinue_run and publishes events.""" + final_status = RunStatus.completed + try: - # Build kwargs for remote agent auth extra_kwargs: dict = {} if auth_token and isinstance(agent, RemoteAgent): extra_kwargs["auth_token"] = auth_token @@ -134,28 +252,296 @@ async def agent_continue_response_streamer( **extra_kwargs, ) async for run_response_chunk in continue_response: - yield format_sse_event(run_response_chunk) # type: ignore + if isinstance(run_response_chunk, RunOutput): + continue + + event_index: Optional[int] = None + try: + event_index = event_buffer.add_event(run_id, run_response_chunk) + except Exception: + pass + + sse_data = format_sse_event_with_index(run_response_chunk, event_index=event_index, run_id=run_id) + + try: + await primary_queue.put(sse_data) + except Exception: + pass + + try: + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass + + event_type = getattr(run_response_chunk, "event", "") + if event_type == RunEvent.run_error.value: + final_status = RunStatus.error + elif event_type == RunEvent.run_cancelled.value: + final_status = RunStatus.cancelled + elif event_type == RunEvent.run_paused.value: + final_status = RunStatus.paused + except (InputCheckError, OutputCheckError) as e: + final_status = RunStatus.error error_response = RunErrorEvent( content=str(e), error_type=e.type, error_id=e.error_id, additional_data=e.additional_data, ) - yield format_sse_event(error_response) + event_index = None + try: + event_index = event_buffer.add_event(run_id, error_response) + except Exception: + pass + sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) + try: + await primary_queue.put(sse_data) + except Exception: + pass + try: + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass except Exception as e: import traceback traceback.print_exc(limit=3) + final_status = RunStatus.error error_response = RunErrorEvent( content=str(e), error_type=e.type if hasattr(e, "type") else None, error_id=e.error_id if hasattr(e, "error_id") else None, ) - yield format_sse_event(error_response) + event_index = None + try: + event_index = event_buffer.add_event(run_id, error_response) + except Exception: + pass + sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) + try: + await primary_queue.put(sse_data) + except Exception: + pass + try: + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass + finally: + try: + event_buffer.set_run_completed(run_id, final_status) + except Exception: + pass + try: + await sse_subscriber_manager.complete(run_id) + except Exception: + pass + try: + await primary_queue.put(None) + except Exception: + pass + + +async def agent_continue_response_streamer( + agent: Union[Agent, RemoteAgent], + run_id: str, + updated_tools: List, + session_id: Optional[str] = None, + user_id: Optional[str] = None, + background_tasks: Optional[BackgroundTasks] = None, + auth_token: Optional[str] = None, +) -> AsyncGenerator: + """SSE generator for continue_run. Decoupled from agent execution via background task.""" + primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() + + asyncio.create_task( + _agent_continue_run_producer( + agent=agent, + run_id=run_id, + updated_tools=updated_tools, + primary_queue=primary_queue, + session_id=session_id, + user_id=user_id, + background_tasks=background_tasks, + auth_token=auth_token, + ) + ) + + while True: + sse_data = await primary_queue.get() + if sse_data is None: + break + yield sse_data + + +async def _resume_stream_generator( + agent: Union[Agent, RemoteAgent], + run_id: str, + last_event_index: Optional[int], + session_id: Optional[str], +) -> AsyncGenerator: + """SSE generator for the /resume endpoint. + + Three reconnection paths: + 1. Run still active (in buffer): replay missed events + subscribe for live events via Queue + 2. Run completed (in buffer): replay all events since last_event_index + 3. Not in buffer: fall back to database replay + """ + buffer_status = event_buffer.get_run_status(run_id) + + if buffer_status is None: + # PATH 3: Not in buffer -- fall back to database + if session_id and not isinstance(agent, RemoteAgent): + run_output = await agent.aget_run_output(run_id=run_id, session_id=session_id) + if run_output and run_output.events: + meta: dict = { + "event": "replay", + "run_id": run_id, + "status": run_output.status.value if run_output.status else "unknown", + "total_events": len(run_output.events), + "message": "Run completed. Replaying all events from database.", + } + yield f"event: replay\ndata: {json.dumps(meta)}\n\n" + + for idx, event in enumerate(run_output.events): + event_dict = event.to_dict() + event_dict["event_index"] = idx + if "run_id" not in event_dict: + event_dict["run_id"] = run_id + event_type = event_dict.get("event", "message") + yield f"event: {event_type}\ndata: {json.dumps(event_dict, separators=(',', ':'), default=json_serializer, ensure_ascii=False)}\n\n" + return + elif run_output: + meta = { + "event": "replay", + "run_id": run_id, + "status": run_output.status.value if run_output.status else "unknown", + "total_events": 0, + "message": "Run completed but no events stored.", + } + yield f"event: replay\ndata: {json.dumps(meta)}\n\n" + return + + # Run not found anywhere + error = {"event": "error", "error": f"Run {run_id} not found in buffer or database"} + yield f"event: error\ndata: {json.dumps(error)}\n\n" + return + + if buffer_status in (RunStatus.completed, RunStatus.error, RunStatus.cancelled, RunStatus.paused): + # PATH 2: Run finished -- replay missed events from buffer + total_buffered = event_buffer.get_event_count(run_id) + missed_events = event_buffer.get_events(run_id, last_event_index=last_event_index) + log_debug( + f"Resume PATH 2: run_id={run_id}, status={buffer_status.value}, " + f"last_event_index={last_event_index}, total_buffered={total_buffered}, " + f"missed_events={len(missed_events)}" + ) + + meta = { + "event": "replay", + "run_id": run_id, + "status": buffer_status.value, + "total_events": len(missed_events), + "total_buffered": total_buffered, + "last_event_index_requested": last_event_index if last_event_index is not None else -1, + "message": f"Run {buffer_status.value}. Replaying {len(missed_events)} missed events (of {total_buffered} total).", + } + yield f"event: replay\ndata: {json.dumps(meta)}\n\n" + + start_index = (last_event_index + 1) if last_event_index is not None else 0 + for idx, buffered_event in enumerate(missed_events): + event_dict = buffered_event.to_dict() + event_dict["event_index"] = start_index + idx + if "run_id" not in event_dict: + event_dict["run_id"] = run_id + event_type = event_dict.get("event", "message") + yield f"event: {event_type}\ndata: {json.dumps(event_dict, separators=(',', ':'), default=json_serializer, ensure_ascii=False)}\n\n" return + # PATH 1: Run still active -- subscribe FIRST (to avoid race condition), then replay missed events + queue = sse_subscriber_manager.subscribe(run_id) + + try: + missed_events = event_buffer.get_events(run_id, last_event_index) + current_count = event_buffer.get_event_count(run_id) + + # Track the highest replayed event_index for dedup against queue events + last_replayed_index = last_event_index if last_event_index is not None else -1 + + if missed_events: + meta = { + "event": "catch_up", + "run_id": run_id, + "status": "running", + "missed_events": len(missed_events), + "current_event_count": current_count, + "message": f"Catching up on {len(missed_events)} missed events.", + } + yield f"event: catch_up\ndata: {json.dumps(meta)}\n\n" + + start_index = (last_event_index + 1) if last_event_index is not None else 0 + for idx, buffered_event in enumerate(missed_events): + current_idx = start_index + idx + event_dict = buffered_event.to_dict() + event_dict["event_index"] = current_idx + if "run_id" not in event_dict: + event_dict["run_id"] = run_id + event_type = event_dict.get("event", "message") + yield f"event: {event_type}\ndata: {json.dumps(event_dict, separators=(',', ':'), default=json_serializer, ensure_ascii=False)}\n\n" + last_replayed_index = current_idx + + # Re-check buffer status after subscribing: the run may have completed + # between our initial status check and now. If so, replay remaining events + # from buffer instead of waiting on the queue (the sentinel was already pushed + # before our subscription existed). + updated_status = event_buffer.get_run_status(run_id) + if updated_status is not None and updated_status != RunStatus.running: + # Run completed while we were catching up -- replay remaining from buffer + remaining = event_buffer.get_events(run_id, last_event_index=last_replayed_index) + if remaining: + replay_start = last_replayed_index + 1 + for idx, buffered_event in enumerate(remaining): + current_idx = replay_start + idx + event_dict = buffered_event.to_dict() + event_dict["event_index"] = current_idx + if "run_id" not in event_dict: + event_dict["run_id"] = run_id + event_type = event_dict.get("event", "message") + yield f"event: {event_type}\ndata: {json.dumps(event_dict, separators=(',', ':'), default=json_serializer, ensure_ascii=False)}\n\n" + return + + # Confirm subscription for live events + subscribed = { + "event": "subscribed", + "run_id": run_id, + "status": "running", + "current_event_count": current_count, + "message": "Subscribed to agent run. Receiving live events.", + } + yield f"event: subscribed\ndata: {json.dumps(subscribed)}\n\n" + + log_debug(f"SSE client subscribed to agent run {run_id} (last_event_index: {last_event_index})") + + # Read from queue, dedup events already replayed by event_index + while True: + sse_data = await queue.get() + if sse_data is None: + # Sentinel: run completed + break + # Dedup: extract event_index from the SSE data and skip if already replayed + try: + data_line = sse_data.split("data: ", 1)[1].split("\n\n")[0] + parsed = json.loads(data_line) + ev_idx = parsed.get("event_index") + if ev_idx is not None and ev_idx <= last_replayed_index: + continue + except Exception: + pass + yield sse_data + finally: + sse_subscriber_manager.unsubscribe(run_id, queue) + def get_agent_router( os: "AgentOS", @@ -749,6 +1135,50 @@ async def get_agent_run( return run_output.to_dict() + @router.get( + "/agents/{agent_id}/runs/{run_id}/resume", + tags=["Agents"], + operation_id="resume_agent_run_stream", + summary="Resume Agent Run Stream", + description=( + "Resume an SSE stream for an agent run after disconnection.\n\n" + "Sends missed events since `last_event_index`, then continues streaming " + "live events if the run is still active.\n\n" + "**Three reconnection paths:**\n" + "1. **Run still active**: Sends catch-up events + continues live streaming\n" + "2. **Run completed (in buffer)**: Replays missed buffered events\n" + "3. **Run completed (in database)**: Replays events from database\n\n" + "**Client usage:**\n" + "Track `event_index` from each SSE event. On reconnection, pass the last " + "received `event_index` as `last_event_index` query parameter." + ), + responses={ + 200: { + "description": "SSE stream of catch-up and/or live events", + "content": {"text/event-stream": {}}, + }, + 400: {"description": "Not supported for remote agents", "model": BadRequestResponse}, + 404: {"description": "Agent not found", "model": NotFoundResponse}, + }, + dependencies=[Depends(require_resource_access("agents", "run", "agent_id"))], + ) + async def resume_agent_run_stream( + agent_id: str, + run_id: str, + last_event_index: Optional[int] = Query(None, description="Index of last event received by client (0-based)"), + session_id: Optional[str] = Query(None, description="Session ID for database fallback"), + ): + agent = get_agent_by_id(agent_id=agent_id, agents=os.agents, db=os.db, registry=os.registry, create_fresh=True) + if agent is None: + raise HTTPException(status_code=404, detail="Agent not found") + if isinstance(agent, RemoteAgent): + raise HTTPException(status_code=400, detail="Stream resumption is not supported for remote agents") + + return StreamingResponse( + _resume_stream_generator(agent, run_id, last_event_index, session_id), + media_type="text/event-stream", + ) + @router.get( "/agents/{agent_id}/runs", tags=["Agents"], diff --git a/libs/agno/agno/os/utils.py b/libs/agno/agno/os/utils.py index 6b76ca181a..2cab14ee01 100644 --- a/libs/agno/agno/os/utils.py +++ b/libs/agno/agno/os/utils.py @@ -199,6 +199,42 @@ def format_sse_event(event: Union[RunOutputEvent, TeamRunOutputEvent, WorkflowRu return f"event: message\ndata: {clean_json}\n\n" +def format_sse_event_with_index( + event: Union[RunOutputEvent, TeamRunOutputEvent, WorkflowRunOutputEvent], + event_index: Optional[int] = None, + run_id: Optional[str] = None, +) -> str: + """Format an event as SSE with injected event_index and run_id. + + Used by the agent/team response streamers to include reconnection metadata + in SSE payloads without modifying the core event dataclasses. + + Args: + event: The event object to serialize. + event_index: Buffer index for reconnection tracking. + run_id: Run ID to inject if not already present on the event. + + Returns: + SSE-formatted string with event_index in the data payload. + """ + from agno.utils.serialize import json_serializer + + try: + event_type = event.event or "message" + event_dict = event.to_dict() + + if event_index is not None: + event_dict["event_index"] = event_index + if run_id and "run_id" not in event_dict: + event_dict["run_id"] = run_id + + clean_json = json.dumps(event_dict, separators=(",", ":"), default=json_serializer, ensure_ascii=False) + return f"event: {event_type}\ndata: {clean_json}\n\n" + except Exception: + clean_json = event.to_json(separators=(",", ":"), indent=None) + return f"event: message\ndata: {clean_json}\n\n" + + async def get_db( dbs: dict[str, list[Union[BaseDb, AsyncBaseDb, RemoteDb]]], db_id: Optional[str] = None, table: Optional[str] = None ) -> Union[BaseDb, AsyncBaseDb, RemoteDb]: From 567ccfc70f45e0d80a5ebb2f70fc1771e892e659 Mon Sep 17 00:00:00 2001 From: kausmeows Date: Tue, 3 Mar 2026 18:14:01 +0530 Subject: [PATCH 02/10] chore: only do for background=True and stream=True --- .../05_agent_os/client/10_sse_reconnect.py | 11 +- libs/agno/agno/os/routers/agents/router.py | 146 +++++++++++++++++- 2 files changed, 147 insertions(+), 10 deletions(-) diff --git a/cookbook/05_agent_os/client/10_sse_reconnect.py b/cookbook/05_agent_os/client/10_sse_reconnect.py index 50940d525e..1e43180442 100644 --- a/cookbook/05_agent_os/client/10_sse_reconnect.py +++ b/cookbook/05_agent_os/client/10_sse_reconnect.py @@ -2,8 +2,14 @@ SSE Reconnection Test ===================== -Tests SSE stream reconnection for agent runs: start a streaming run, disconnect -mid-stream, then reconnect via the /resume endpoint and catch up on missed events. +Tests SSE stream reconnection for agent runs using background=True, stream=True. +When background=True, the agent runs in a detached task that survives client +disconnections. Events are buffered so the client can reconnect via /resume. + +Steps: +1. Start a streaming run with background=true +2. Disconnect after a few events +3. Reconnect via /resume and catch up on missed events Prerequisites: 1. Start the AgentOS server: python cookbook/05_agent_os/basic.py @@ -72,6 +78,7 @@ async def test_sse_reconnection(): form_data = { "message": "Tell me a detailed story about a brave knight who goes on a quest. Make it at least 5 paragraphs long.", "stream": "true", + "background": "true", } async with client.stream( "POST", f"/agents/{agent_id}/runs", data=form_data diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index 99fb7c0fef..c45999fa97 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -39,6 +39,7 @@ ) from agno.os.settings import AgnoAPISettings from agno.os.utils import ( + format_sse_event, format_sse_event_with_index, get_agent_by_id, get_request_kwargs, @@ -70,12 +71,68 @@ async def agent_response_streamer( auth_token: Optional[str] = None, **kwargs: Any, ) -> AsyncGenerator: - """SSE generator that reads from the agent's event queue. + """Default SSE generator. Agent runs inline — if client disconnects, agent is cancelled.""" + try: + if background_tasks is not None: + kwargs["background_tasks"] = background_tasks + + if "stream_events" in kwargs: + stream_events = kwargs.pop("stream_events") + else: + stream_events = True + + if auth_token and isinstance(agent, RemoteAgent): + kwargs["auth_token"] = auth_token + + run_response = agent.arun( + input=message, + session_id=session_id, + user_id=user_id, + images=images, + audio=audio, + videos=videos, + files=files, + stream=True, + stream_events=stream_events, + **kwargs, + ) + async for run_response_chunk in run_response: + yield format_sse_event(run_response_chunk) # type: ignore + except (InputCheckError, OutputCheckError) as e: + error_response = RunErrorEvent( + content=str(e), + error_type=e.type, + error_id=e.error_id, + additional_data=e.additional_data, + ) + yield format_sse_event(error_response) + except Exception as e: + import traceback + + traceback.print_exc(limit=3) + error_response = RunErrorEvent( + content=str(e), + ) + yield format_sse_event(error_response) - The actual agent execution runs in a detached asyncio.Task (_agent_run_producer) - so it survives client disconnections. This generator simply subscribes to the - run's event queue and yields SSE data. When the client disconnects, only this - generator is cancelled -- the producer keeps running. + +async def agent_resumable_response_streamer( + agent: Union[Agent, RemoteAgent], + message: str, + session_id: Optional[str] = None, + user_id: Optional[str] = None, + images: Optional[List[Image]] = None, + audio: Optional[List[Audio]] = None, + videos: Optional[List[Video]] = None, + files: Optional[List[FileMedia]] = None, + background_tasks: Optional[BackgroundTasks] = None, + auth_token: Optional[str] = None, + **kwargs: Any, +) -> AsyncGenerator: + """Resumable SSE generator for background=True, stream=True. + + The actual agent execution runs in a detached asyncio.Task so it survives + client disconnections. Events are buffered for reconnection via /resume. """ # Create a queue that the producer will publish to via SSESubscriberManager # We need a pre-run queue since run_id isn't known yet. Use a dedicated queue @@ -357,7 +414,54 @@ async def agent_continue_response_streamer( background_tasks: Optional[BackgroundTasks] = None, auth_token: Optional[str] = None, ) -> AsyncGenerator: - """SSE generator for continue_run. Decoupled from agent execution via background task.""" + """Default SSE generator for continue_run. Agent runs inline — client disconnect cancels agent.""" + try: + extra_kwargs: dict = {} + if auth_token and isinstance(agent, RemoteAgent): + extra_kwargs["auth_token"] = auth_token + + continue_response = agent.acontinue_run( + run_id=run_id, + updated_tools=updated_tools, + session_id=session_id, + user_id=user_id, + stream=True, + stream_events=True, + background_tasks=background_tasks, + **extra_kwargs, + ) + async for run_response_chunk in continue_response: + yield format_sse_event(run_response_chunk) # type: ignore + except (InputCheckError, OutputCheckError) as e: + error_response = RunErrorEvent( + content=str(e), + error_type=e.type, + error_id=e.error_id, + additional_data=e.additional_data, + ) + yield format_sse_event(error_response) + except Exception as e: + import traceback + + traceback.print_exc(limit=3) + error_response = RunErrorEvent( + content=str(e), + error_type=e.type if hasattr(e, "type") else None, + error_id=e.error_id if hasattr(e, "error_id") else None, + ) + yield format_sse_event(error_response) + + +async def agent_resumable_continue_response_streamer( + agent: Union[Agent, RemoteAgent], + run_id: str, + updated_tools: List, + session_id: Optional[str] = None, + user_id: Optional[str] = None, + background_tasks: Optional[BackgroundTasks] = None, + auth_token: Optional[str] = None, +) -> AsyncGenerator: + """Resumable SSE generator for continue_run (background=True). Decoupled from agent execution.""" primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() asyncio.create_task( @@ -743,10 +847,33 @@ async def create_agent_run( # Extract auth token for remote agents auth_token = get_auth_token_from_request(request) - # Background execution: return 202 immediately with run metadata + # Background execution if background: if isinstance(agent, RemoteAgent): raise HTTPException(status_code=400, detail="Background execution is not supported for remote agents") + + if stream: + # background=True, stream=True: resumable SSE streaming + # Agent runs in a detached asyncio.Task that survives client disconnections. + # Events are buffered for reconnection via /resume endpoint. + return StreamingResponse( + agent_resumable_response_streamer( + agent, + message, + session_id=session_id, + user_id=user_id, + images=base64_images if base64_images else None, + audio=base64_audios if base64_audios else None, + videos=base64_videos if base64_videos else None, + files=input_files if input_files else None, + background_tasks=background_tasks, + auth_token=auth_token, + **kwargs, + ), + media_type="text/event-stream", + ) + + # background=True, stream=False: return 202 immediately with run metadata if not agent.db: raise HTTPException( status_code=400, detail="Background execution requires a database to be configured on the agent" @@ -895,6 +1022,7 @@ async def continue_agent_run( session_id: Optional[str] = Form(None), user_id: Optional[str] = Form(None), stream: bool = Form(True), + background: bool = Form(False), ): if hasattr(request.state, "user_id") and request.state.user_id is not None: user_id = request.state.user_id @@ -956,8 +1084,10 @@ async def continue_agent_run( auth_token = get_auth_token_from_request(request) if stream: + # Use resumable streamer when background=True (agent survives client disconnect) + streamer = agent_resumable_continue_response_streamer if background else agent_continue_response_streamer return StreamingResponse( - agent_continue_response_streamer( + streamer( agent, run_id=run_id, # run_id from path updated_tools=updated_tools, From fe59ef4ad380c94fafaca066326752240b782356 Mon Sep 17 00:00:00 2001 From: kausmeows Date: Tue, 3 Mar 2026 18:22:59 +0530 Subject: [PATCH 03/10] fix: mypy --- libs/agno/agno/os/routers/agents/router.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index c45999fa97..7c07ede5e9 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -408,7 +408,7 @@ async def _agent_continue_run_producer( async def agent_continue_response_streamer( agent: Union[Agent, RemoteAgent], run_id: str, - updated_tools: List, + updated_tools: Optional[List] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, background_tasks: Optional[BackgroundTasks] = None, @@ -455,7 +455,7 @@ async def agent_continue_response_streamer( async def agent_resumable_continue_response_streamer( agent: Union[Agent, RemoteAgent], run_id: str, - updated_tools: List, + updated_tools: Optional[List] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, background_tasks: Optional[BackgroundTasks] = None, From 96adc8b726a9f61345b2e859e62f3a5bd4f02085 Mon Sep 17 00:00:00 2001 From: Kaustubh Date: Tue, 3 Mar 2026 18:27:02 +0530 Subject: [PATCH 04/10] update --- cookbook/05_agent_os/client/10_sse_reconnect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cookbook/05_agent_os/client/10_sse_reconnect.py b/cookbook/05_agent_os/client/10_sse_reconnect.py index 1e43180442..269e0ab18e 100644 --- a/cookbook/05_agent_os/client/10_sse_reconnect.py +++ b/cookbook/05_agent_os/client/10_sse_reconnect.py @@ -1,5 +1,5 @@ """ -SSE Reconnection Test +SSE Reconnection ===================== Tests SSE stream reconnection for agent runs using background=True, stream=True. From 74623482493d647eadd58056d594f5d15390da7e Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 14:36:03 +0530 Subject: [PATCH 05/10] update --- libs/agno/agno/agent-run.py | 73 ------------------------------------- 1 file changed, 73 deletions(-) delete mode 100644 libs/agno/agno/agent-run.py diff --git a/libs/agno/agno/agent-run.py b/libs/agno/agno/agent-run.py deleted file mode 100644 index 8cf72174f3..0000000000 --- a/libs/agno/agno/agent-run.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Minimal example for AgentOS.""" - -from agno.agent import Agent -from agno.db.postgres import PostgresDb -from agno.models.openai import OpenAIChat -from agno.os import AgentOS -from agno.team import Team -from agno.workflow.step import Step -from agno.workflow.workflow import Workflow - -# --------------------------------------------------------------------------- -# Create Example -# --------------------------------------------------------------------------- - -# Setup the database -db = PostgresDb(id="basic-db", db_url="postgresql+psycopg://ai:ai@localhost:5532/ai") - -# Setup basic agents, teams and workflows -basic_agent = Agent( - name="Basic Agent", - db=db, - enable_session_summaries=True, - update_memory_on_run=True, - add_history_to_context=True, - num_history_runs=3, - add_datetime_to_context=True, - markdown=True, -) -basic_team = Team( - id="basic-team", - name="Basic Team", - model=OpenAIChat(id="gpt-4o"), - db=db, - members=[basic_agent], - update_memory_on_run=True, -) -basic_workflow = Workflow( - id="basic-workflow", - name="Basic Workflow", - description="Just a simple workflow", - db=db, - steps=[ - Step( - name="step1", - description="Just a simple step", - agent=basic_agent, - ) - ], - add_workflow_history_to_steps=True, -) - -# Setup our AgentOS app -agent_os = AgentOS( - description="Example app for basic agent, team and workflow", - agents=[basic_agent], - teams=[basic_team], - workflows=[basic_workflow], -) -app = agent_os.get_app() - - -# --------------------------------------------------------------------------- -# Run Example -# --------------------------------------------------------------------------- - -if __name__ == "__main__": - """Run your AgentOS. - - You can see the configuration and available apps at: - http://localhost:7777/config - - """ - agent_os.serve(app="agent-run:app", reload=True) From cd4628bae0a2122c22dd9e580662380f71c8732e Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 15:03:27 +0530 Subject: [PATCH 06/10] chore: persist running status --- libs/agno/agno/agent/_run.py | 80 ++++++++++++++++++++-- libs/agno/agno/os/app.py | 16 ++++- libs/agno/agno/os/routers/agents/router.py | 23 +++---- 3 files changed, 96 insertions(+), 23 deletions(-) diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index 2e5b5af833..1fc1d808e1 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -1884,6 +1884,62 @@ async def _background_task() -> None: return run_response +async def _arun_background_stream( + agent: Agent, + run_response: RunOutput, + run_context: RunContext, + session_id: str, + user_id: Optional[str] = None, + add_history_to_context: Optional[bool] = None, + add_dependencies_to_context: Optional[bool] = None, + add_session_state_to_context: Optional[bool] = None, + response_format: Optional[Union[Dict, Type[BaseModel]]] = None, + stream_events: bool = False, + yield_run_output: Optional[bool] = None, + debug_mode: Optional[bool] = None, + background_tasks: Optional[Any] = None, + **kwargs: Any, +) -> AsyncIterator[Union[RunOutputEvent, RunOutput]]: + """Streaming agent run with RUNNING status persisted in DB before streaming begins. + + Persists the run with RUNNING status, then delegates to _arun_stream. + The caller (e.g. router) is responsible for running this in a detached task + if client-disconnect survival is needed. + """ + from agno.agent._session import asave_session + from agno.agent._storage import aread_or_create_session, update_metadata + + # Persist RUNNING status so the run is visible in the DB immediately + run_response.status = RunStatus.running + + agent_session = await aread_or_create_session(agent, session_id=session_id, user_id=user_id) + update_metadata(agent, session=agent_session) + agent_session.upsert_run(run=run_response) + await asave_session(agent, session=agent_session) + + log_info(f"Background stream run {run_response.run_id} persisted with RUNNING status") + + # Delegate to _arun_stream for actual execution + async for event in _arun_stream( + agent, + run_response=run_response, + run_context=run_context, + user_id=user_id, + response_format=response_format, + stream_events=stream_events, + yield_run_output=yield_run_output, + session_id=session_id, + add_history_to_context=add_history_to_context, + add_dependencies_to_context=add_dependencies_to_context, + add_session_state_to_context=add_session_state_to_context, + debug_mode=debug_mode, + background_tasks=background_tasks, + pre_session=agent_session, + **kwargs, + ): + yield event + + async def _arun_stream( agent: Agent, run_response: RunOutput, @@ -2572,16 +2628,30 @@ def arun_dispatch( # type: ignore run_response.metrics = RunMetrics() run_response.metrics.start_timer() - # Background execution: return immediately with PENDING status + # Background execution if background: - if opts.stream: - raise ValueError( - "Background execution cannot be combined with streaming. Set stream=False when using background=True." - ) if not agent.db: raise ValueError( "Background execution requires a database to be configured on the agent for run persistence." ) + if opts.stream: + # background=True, stream=True: run in background task, stream events via queue + return _arun_background_stream( # type: ignore[return-value] + agent, + run_response=run_response, + run_context=run_context, + user_id=user_id, + response_format=response_format, + stream_events=opts.stream_events, + yield_run_output=opts.yield_run_output, + session_id=session_id, + add_history_to_context=opts.add_history_to_context, + add_dependencies_to_context=opts.add_dependencies_to_context, + add_session_state_to_context=opts.add_session_state_to_context, + debug_mode=debug_mode, + background_tasks=background_tasks, + **kwargs, + ) return _arun_background( # type: ignore[return-value] agent, run_response=run_response, diff --git a/libs/agno/agno/os/app.py b/libs/agno/agno/os/app.py index c5430096b7..93c274b677 100644 --- a/libs/agno/agno/os/app.py +++ b/libs/agno/agno/os/app.py @@ -399,7 +399,11 @@ def _reprovision_routers(self, app: FastAPI) -> None: updated_routers.append(get_schedule_router(os_db=self.db, settings=self.settings)) updated_routers.append(get_approval_router(os_db=self.db, settings=self.settings)) else: - for prefix, tag in [("/components", "Components"), ("/schedules", "Schedules"), ("/approvals", "Approvals")]: + for prefix, tag in [ + ("/components", "Components"), + ("/schedules", "Schedules"), + ("/approvals", "Approvals"), + ]: updated_routers.append(_get_disabled_feature_router(prefix, tag, "db")) # Registry router if self.registry is not None: @@ -738,8 +742,14 @@ def get_app(self) -> FastAPI: routers.append(get_schedule_router(os_db=self.db, settings=self.settings)) routers.append(get_approval_router(os_db=self.db, settings=self.settings)) else: - log_debug("Components, Scheduler, and Approval routers not enabled: requires a db to be provided to AgentOS") - for prefix, tag in [("/components", "Components"), ("/schedules", "Schedules"), ("/approvals", "Approvals")]: + log_debug( + "Components, Scheduler, and Approval routers not enabled: requires a db to be provided to AgentOS" + ) + for prefix, tag in [ + ("/components", "Components"), + ("/schedules", "Schedules"), + ("/approvals", "Approvals"), + ]: routers.append(_get_disabled_feature_router(prefix, tag, "db")) # Registry router diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index 7c07ede5e9..9492b7265b 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -22,13 +22,13 @@ from agno.exceptions import InputCheckError, OutputCheckError from agno.media import Audio, Image, Video from agno.media import File as FileMedia -from agno.os.managers import event_buffer, sse_subscriber_manager from agno.os.auth import ( get_auth_token_from_request, get_authentication_dependency, require_approval_resolved, require_resource_access, ) +from agno.os.managers import event_buffer, sse_subscriber_manager from agno.os.routers.agents.schema import AgentResponse from agno.os.schema import ( BadRequestResponse, @@ -131,17 +131,14 @@ async def agent_resumable_response_streamer( ) -> AsyncGenerator: """Resumable SSE generator for background=True, stream=True. - The actual agent execution runs in a detached asyncio.Task so it survives - client disconnections. Events are buffered for reconnection via /resume. + A detached asyncio.Task iterates the agent's background stream, buffers events, + and publishes to SSE subscribers — all of which survive client disconnections. + This generator reads from a primary queue and yields SSE data to the client. """ - # Create a queue that the producer will publish to via SSESubscriberManager - # We need a pre-run queue since run_id isn't known yet. Use a dedicated queue - # that the producer writes to directly. primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() - # Launch the producer as a detached background task - async def _producer_wrapper() -> None: - """Wrapper that publishes to the primary queue in addition to SSE subscribers.""" + async def _producer() -> None: + """Detached task: iterates agent stream, buffers events, publishes to subscribers.""" run_id: Optional[str] = None final_status = RunStatus.completed @@ -167,6 +164,7 @@ async def _producer_wrapper() -> None: files=files, stream=True, stream_events=stream_events, + background=True, **kwargs, ) async for run_response_chunk in run_response: @@ -189,13 +187,11 @@ async def _producer_wrapper() -> None: run_response_chunk, event_index=event_index, run_id=buffer_run_id ) - # Publish to primary queue (original client) try: await primary_queue.put(sse_data) except Exception: pass - # Publish to SSE subscriber queues (resumed clients) if buffer_run_id: try: await sse_subscriber_manager.publish(buffer_run_id, sse_data) @@ -268,16 +264,13 @@ async def _producer_wrapper() -> None: await sse_subscriber_manager.complete(run_id) except Exception: pass - # Signal the primary queue that the run is done try: await primary_queue.put(None) except Exception: pass - # Start the producer as a detached task (survives client disconnect) - asyncio.create_task(_producer_wrapper()) + asyncio.create_task(_producer()) - # Read from the primary queue and yield SSE data while True: sse_data = await primary_queue.get() if sse_data is None: From 3b68bc62296ab3bf817d741469466da1c5364958 Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 15:37:40 +0530 Subject: [PATCH 07/10] chore: refactor to move producer in agent level --- libs/agno/agno/agent/_run.py | 132 +++++++-- libs/agno/agno/os/routers/agents/router.py | 330 ++------------------- 2 files changed, 137 insertions(+), 325 deletions(-) diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index 1fc1d808e1..f197e6d953 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -1899,17 +1899,26 @@ async def _arun_background_stream( debug_mode: Optional[bool] = None, background_tasks: Optional[Any] = None, **kwargs: Any, -) -> AsyncIterator[Union[RunOutputEvent, RunOutput]]: - """Streaming agent run with RUNNING status persisted in DB before streaming begins. +) -> AsyncIterator[str]: + """Background streaming agent run that survives client disconnections. + + 1. Persists RUNNING status in DB + 2. Spawns a detached asyncio.Task that runs _arun_stream + 3. Buffers events (via event_buffer) and publishes to SSE subscribers + 4. Yields SSE-formatted strings via an asyncio.Queue + + The detached task keeps running even if the client disconnects. + The caller (router) just yields the SSE strings to the client. - Persists the run with RUNNING status, then delegates to _arun_stream. - The caller (e.g. router) is responsible for running this in a detached task - if client-disconnect survival is needed. + Similar to how Workflow._arun_background_stream handles WebSocket streaming, + but uses SSE transport with event_buffer and sse_subscriber_manager. """ from agno.agent._session import asave_session from agno.agent._storage import aread_or_create_session, update_metadata - # Persist RUNNING status so the run is visible in the DB immediately + run_id = run_response.run_id + + # 1. Persist RUNNING status so the run is visible in the DB immediately run_response.status = RunStatus.running agent_session = await aread_or_create_session(agent, session_id=session_id, user_id=user_id) @@ -1917,27 +1926,98 @@ async def _arun_background_stream( agent_session.upsert_run(run=run_response) await asave_session(agent, session=agent_session) - log_info(f"Background stream run {run_response.run_id} persisted with RUNNING status") + log_info(f"Background stream run {run_id} persisted with RUNNING status") - # Delegate to _arun_stream for actual execution - async for event in _arun_stream( - agent, - run_response=run_response, - run_context=run_context, - user_id=user_id, - response_format=response_format, - stream_events=stream_events, - yield_run_output=yield_run_output, - session_id=session_id, - add_history_to_context=add_history_to_context, - add_dependencies_to_context=add_dependencies_to_context, - add_session_state_to_context=add_session_state_to_context, - debug_mode=debug_mode, - background_tasks=background_tasks, - pre_session=agent_session, - **kwargs, - ): - yield event + # 2. Create queue for forwarding SSE strings to the caller + sse_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() + + # 3. Spawn detached background task + async def _background_producer() -> None: + try: + async for event in _arun_stream( + agent, + run_response=run_response, + run_context=run_context, + user_id=user_id, + response_format=response_format, + stream_events=stream_events, + yield_run_output=yield_run_output, + session_id=session_id, + add_history_to_context=add_history_to_context, + add_dependencies_to_context=add_dependencies_to_context, + add_session_state_to_context=add_session_state_to_context, + debug_mode=debug_mode, + background_tasks=background_tasks, + pre_session=agent_session, + **kwargs, + ): + if isinstance(event, RunOutput): + continue + + # Buffer event for reconnection support + event_index: Optional[int] = None + try: + from agno.os.managers import event_buffer + + event_index = event_buffer.add_event(run_id, event) + except Exception: + pass + + # Format as SSE + from agno.os.utils import format_sse_event_with_index + + sse_data = format_sse_event_with_index(event, event_index=event_index, run_id=run_id) + + # Push to primary queue (original client) + try: + await sse_queue.put(sse_data) + except Exception: + pass + + # Publish to SSE subscribers (resumed clients) + try: + from agno.os.managers import sse_subscriber_manager + + await sse_subscriber_manager.publish(run_id, sse_data) + except Exception: + pass + + except Exception: + log_error(f"Background stream run {run_id} failed", exc_info=True) + + finally: + # Mark run completed in event buffer (status is set by _arun_stream/acleanup_and_store) + try: + from agno.os.managers import event_buffer + + event_buffer.set_run_completed(run_id, run_response.status or RunStatus.completed) + except Exception: + pass + + # Signal SSE subscribers that run is done + try: + from agno.os.managers import sse_subscriber_manager + + await sse_subscriber_manager.complete(run_id) + except Exception: + pass + + # Signal primary queue that run is done + try: + await sse_queue.put(None) + except Exception: + pass + + task = asyncio.create_task(_background_producer()) + _background_tasks.add(task) + task.add_done_callback(_background_tasks.discard) + + # 4. Yield SSE strings from the queue + while True: + sse_data = await sse_queue.get() + if sse_data is None: + break + yield sse_data async def _arun_stream( diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index 9492b7265b..c122f3d985 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -131,273 +131,40 @@ async def agent_resumable_response_streamer( ) -> AsyncGenerator: """Resumable SSE generator for background=True, stream=True. - A detached asyncio.Task iterates the agent's background stream, buffers events, - and publishes to SSE subscribers — all of which survive client disconnections. - This generator reads from a primary queue and yields SSE data to the client. + Delegates to agent.arun(background=True, stream=True) which handles: + - Persisting RUNNING status in DB + - Running agent in a detached asyncio.Task (survives client disconnect) + - Buffering events for reconnection via /resume + - Publishing to SSE subscribers for resumed clients + - Yielding SSE-formatted strings via a queue """ - primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() - - async def _producer() -> None: - """Detached task: iterates agent stream, buffers events, publishes to subscribers.""" - run_id: Optional[str] = None - final_status = RunStatus.completed - - try: - if background_tasks is not None: - kwargs["background_tasks"] = background_tasks - - if "stream_events" in kwargs: - stream_events = kwargs.pop("stream_events") - else: - stream_events = True - - if auth_token and isinstance(agent, RemoteAgent): - kwargs["auth_token"] = auth_token - - run_response = agent.arun( - input=message, - session_id=session_id, - user_id=user_id, - images=images, - audio=audio, - videos=videos, - files=files, - stream=True, - stream_events=stream_events, - background=True, - **kwargs, - ) - async for run_response_chunk in run_response: - chunk_run_id = getattr(run_response_chunk, "run_id", None) - if run_id is None and chunk_run_id: - run_id = chunk_run_id - - if isinstance(run_response_chunk, RunOutput): - continue - - event_index: Optional[int] = None - buffer_run_id = run_id or chunk_run_id - if buffer_run_id: - try: - event_index = event_buffer.add_event(buffer_run_id, run_response_chunk) - except Exception: - pass - - sse_data = format_sse_event_with_index( - run_response_chunk, event_index=event_index, run_id=buffer_run_id - ) - - try: - await primary_queue.put(sse_data) - except Exception: - pass - - if buffer_run_id: - try: - await sse_subscriber_manager.publish(buffer_run_id, sse_data) - except Exception: - pass - - event_type = getattr(run_response_chunk, "event", "") - if event_type == RunEvent.run_error.value: - final_status = RunStatus.error - elif event_type == RunEvent.run_cancelled.value: - final_status = RunStatus.cancelled - elif event_type == RunEvent.run_paused.value: - final_status = RunStatus.paused - - except (InputCheckError, OutputCheckError) as e: - final_status = RunStatus.error - error_response = RunErrorEvent( - content=str(e), - error_type=e.type, - error_id=e.error_id, - additional_data=e.additional_data, - ) - event_index = None - if run_id: - try: - event_index = event_buffer.add_event(run_id, error_response) - except Exception: - pass - sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) - try: - await primary_queue.put(sse_data) - except Exception: - pass - if run_id: - try: - await sse_subscriber_manager.publish(run_id, sse_data) - except Exception: - pass - except Exception as e: - import traceback - - traceback.print_exc(limit=3) - final_status = RunStatus.error - error_response = RunErrorEvent( - content=str(e), - ) - event_index = None - if run_id: - try: - event_index = event_buffer.add_event(run_id, error_response) - except Exception: - pass - sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) - try: - await primary_queue.put(sse_data) - except Exception: - pass - if run_id: - try: - await sse_subscriber_manager.publish(run_id, sse_data) - except Exception: - pass - finally: - if run_id: - try: - event_buffer.set_run_completed(run_id, final_status) - except Exception: - pass - try: - await sse_subscriber_manager.complete(run_id) - except Exception: - pass - try: - await primary_queue.put(None) - except Exception: - pass - - asyncio.create_task(_producer()) - - while True: - sse_data = await primary_queue.get() - if sse_data is None: - break + if background_tasks is not None: + kwargs["background_tasks"] = background_tasks + + if "stream_events" in kwargs: + stream_events = kwargs.pop("stream_events") + else: + stream_events = True + + if auth_token and isinstance(agent, RemoteAgent): + kwargs["auth_token"] = auth_token + + async for sse_data in agent.arun( + input=message, + session_id=session_id, + user_id=user_id, + images=images, + audio=audio, + videos=videos, + files=files, + stream=True, + stream_events=stream_events, + background=True, + **kwargs, + ): yield sse_data -async def _agent_continue_run_producer( - agent: Union[Agent, RemoteAgent], - run_id: str, - primary_queue: "asyncio.Queue[Optional[str]]", - updated_tools: Optional[List] = None, - session_id: Optional[str] = None, - user_id: Optional[str] = None, - background_tasks: Optional[BackgroundTasks] = None, - auth_token: Optional[str] = None, -) -> None: - """Background task that runs agent.acontinue_run and publishes events.""" - final_status = RunStatus.completed - - try: - extra_kwargs: dict = {} - if auth_token and isinstance(agent, RemoteAgent): - extra_kwargs["auth_token"] = auth_token - - continue_response = agent.acontinue_run( - run_id=run_id, - updated_tools=updated_tools, - session_id=session_id, - user_id=user_id, - stream=True, - stream_events=True, - background_tasks=background_tasks, - **extra_kwargs, - ) - async for run_response_chunk in continue_response: - if isinstance(run_response_chunk, RunOutput): - continue - - event_index: Optional[int] = None - try: - event_index = event_buffer.add_event(run_id, run_response_chunk) - except Exception: - pass - - sse_data = format_sse_event_with_index(run_response_chunk, event_index=event_index, run_id=run_id) - - try: - await primary_queue.put(sse_data) - except Exception: - pass - - try: - await sse_subscriber_manager.publish(run_id, sse_data) - except Exception: - pass - - event_type = getattr(run_response_chunk, "event", "") - if event_type == RunEvent.run_error.value: - final_status = RunStatus.error - elif event_type == RunEvent.run_cancelled.value: - final_status = RunStatus.cancelled - elif event_type == RunEvent.run_paused.value: - final_status = RunStatus.paused - - except (InputCheckError, OutputCheckError) as e: - final_status = RunStatus.error - error_response = RunErrorEvent( - content=str(e), - error_type=e.type, - error_id=e.error_id, - additional_data=e.additional_data, - ) - event_index = None - try: - event_index = event_buffer.add_event(run_id, error_response) - except Exception: - pass - sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) - try: - await primary_queue.put(sse_data) - except Exception: - pass - try: - await sse_subscriber_manager.publish(run_id, sse_data) - except Exception: - pass - - except Exception as e: - import traceback - - traceback.print_exc(limit=3) - final_status = RunStatus.error - error_response = RunErrorEvent( - content=str(e), - error_type=e.type if hasattr(e, "type") else None, - error_id=e.error_id if hasattr(e, "error_id") else None, - ) - event_index = None - try: - event_index = event_buffer.add_event(run_id, error_response) - except Exception: - pass - sse_data = format_sse_event_with_index(error_response, event_index=event_index, run_id=run_id) - try: - await primary_queue.put(sse_data) - except Exception: - pass - try: - await sse_subscriber_manager.publish(run_id, sse_data) - except Exception: - pass - finally: - try: - event_buffer.set_run_completed(run_id, final_status) - except Exception: - pass - try: - await sse_subscriber_manager.complete(run_id) - except Exception: - pass - try: - await primary_queue.put(None) - except Exception: - pass - - async def agent_continue_response_streamer( agent: Union[Agent, RemoteAgent], run_id: str, @@ -445,38 +212,6 @@ async def agent_continue_response_streamer( yield format_sse_event(error_response) -async def agent_resumable_continue_response_streamer( - agent: Union[Agent, RemoteAgent], - run_id: str, - updated_tools: Optional[List] = None, - session_id: Optional[str] = None, - user_id: Optional[str] = None, - background_tasks: Optional[BackgroundTasks] = None, - auth_token: Optional[str] = None, -) -> AsyncGenerator: - """Resumable SSE generator for continue_run (background=True). Decoupled from agent execution.""" - primary_queue: asyncio.Queue[Optional[str]] = asyncio.Queue() - - asyncio.create_task( - _agent_continue_run_producer( - agent=agent, - run_id=run_id, - updated_tools=updated_tools, - primary_queue=primary_queue, - session_id=session_id, - user_id=user_id, - background_tasks=background_tasks, - auth_token=auth_token, - ) - ) - - while True: - sse_data = await primary_queue.get() - if sse_data is None: - break - yield sse_data - - async def _resume_stream_generator( agent: Union[Agent, RemoteAgent], run_id: str, @@ -1015,7 +750,6 @@ async def continue_agent_run( session_id: Optional[str] = Form(None), user_id: Optional[str] = Form(None), stream: bool = Form(True), - background: bool = Form(False), ): if hasattr(request.state, "user_id") and request.state.user_id is not None: user_id = request.state.user_id @@ -1077,10 +811,8 @@ async def continue_agent_run( auth_token = get_auth_token_from_request(request) if stream: - # Use resumable streamer when background=True (agent survives client disconnect) - streamer = agent_resumable_continue_response_streamer if background else agent_continue_response_streamer return StreamingResponse( - streamer( + agent_continue_response_streamer( agent, run_id=run_id, # run_id from path updated_tools=updated_tools, From 300ca126152ed9a04dc7799df6c92567fe09d997 Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 15:46:13 +0530 Subject: [PATCH 08/10] chore: update --- libs/agno/agno/agent/_run.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index f197e6d953..10f36ad208 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -1965,7 +1965,6 @@ async def _background_producer() -> None: # Format as SSE from agno.os.utils import format_sse_event_with_index - sse_data = format_sse_event_with_index(event, event_index=event_index, run_id=run_id) # Push to primary queue (original client) @@ -1977,7 +1976,6 @@ async def _background_producer() -> None: # Publish to SSE subscribers (resumed clients) try: from agno.os.managers import sse_subscriber_manager - await sse_subscriber_manager.publish(run_id, sse_data) except Exception: pass @@ -1989,7 +1987,6 @@ async def _background_producer() -> None: # Mark run completed in event buffer (status is set by _arun_stream/acleanup_and_store) try: from agno.os.managers import event_buffer - event_buffer.set_run_completed(run_id, run_response.status or RunStatus.completed) except Exception: pass @@ -1997,7 +1994,6 @@ async def _background_producer() -> None: # Signal SSE subscribers that run is done try: from agno.os.managers import sse_subscriber_manager - await sse_subscriber_manager.complete(run_id) except Exception: pass From 18d8a738a2a387dc89bd0aa162aab6187dc49c9f Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 17:17:38 +0530 Subject: [PATCH 09/10] update --- cookbook/05_agent_os/client/10_sse_reconnect.py | 8 +++++--- libs/agno/agno/agent/_run.py | 4 ++++ libs/agno/agno/os/routers/agents/router.py | 8 ++++---- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cookbook/05_agent_os/client/10_sse_reconnect.py b/cookbook/05_agent_os/client/10_sse_reconnect.py index 269e0ab18e..a68f91d237 100644 --- a/cookbook/05_agent_os/client/10_sse_reconnect.py +++ b/cookbook/05_agent_os/client/10_sse_reconnect.py @@ -138,13 +138,15 @@ async def test_sse_reconnection(): print("\nPhase 2: Reconnecting via /resume endpoint...") events_phase2: list[dict] = [] - params: dict = {"last_event_index": last_event_index} + form_data: dict = {} + if last_event_index is not None: + form_data["last_event_index"] = str(last_event_index) if session_id: - params["session_id"] = session_id + form_data["session_id"] = session_id async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client: async with client.stream( - "GET", f"/agents/{agent_id}/runs/{run_id}/resume", params=params + "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data ) as response: buffer = "" async for chunk in response.aiter_text(): diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index 10f36ad208..f197e6d953 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -1965,6 +1965,7 @@ async def _background_producer() -> None: # Format as SSE from agno.os.utils import format_sse_event_with_index + sse_data = format_sse_event_with_index(event, event_index=event_index, run_id=run_id) # Push to primary queue (original client) @@ -1976,6 +1977,7 @@ async def _background_producer() -> None: # Publish to SSE subscribers (resumed clients) try: from agno.os.managers import sse_subscriber_manager + await sse_subscriber_manager.publish(run_id, sse_data) except Exception: pass @@ -1987,6 +1989,7 @@ async def _background_producer() -> None: # Mark run completed in event buffer (status is set by _arun_stream/acleanup_and_store) try: from agno.os.managers import event_buffer + event_buffer.set_run_completed(run_id, run_response.status or RunStatus.completed) except Exception: pass @@ -1994,6 +1997,7 @@ async def _background_producer() -> None: # Signal SSE subscribers that run is done try: from agno.os.managers import sse_subscriber_manager + await sse_subscriber_manager.complete(run_id) except Exception: pass diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index c122f3d985..12dc6bd05e 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -1004,7 +1004,7 @@ async def get_agent_run( return run_output.to_dict() - @router.get( + @router.post( "/agents/{agent_id}/runs/{run_id}/resume", tags=["Agents"], operation_id="resume_agent_run_stream", @@ -1019,7 +1019,7 @@ async def get_agent_run( "3. **Run completed (in database)**: Replays events from database\n\n" "**Client usage:**\n" "Track `event_index` from each SSE event. On reconnection, pass the last " - "received `event_index` as `last_event_index` query parameter." + "received `event_index` as `last_event_index`." ), responses={ 200: { @@ -1034,8 +1034,8 @@ async def get_agent_run( async def resume_agent_run_stream( agent_id: str, run_id: str, - last_event_index: Optional[int] = Query(None, description="Index of last event received by client (0-based)"), - session_id: Optional[str] = Query(None, description="Session ID for database fallback"), + last_event_index: Optional[int] = Form(None, description="Index of last event received by client (0-based)"), + session_id: Optional[str] = Form(None, description="Session ID for database fallback"), ): agent = get_agent_by_id(agent_id=agent_id, agents=os.agents, db=os.db, registry=os.registry, create_fresh=True) if agent is None: From ea5f2af4492461768f754194a58d55709ffd1a0d Mon Sep 17 00:00:00 2001 From: kausmeows Date: Thu, 5 Mar 2026 17:49:21 +0530 Subject: [PATCH 10/10] fix: mypy and tests --- libs/agno/agno/agent/_run.py | 2 ++ libs/agno/agno/os/routers/agents/router.py | 4 +--- libs/agno/tests/unit/agent/test_background_execution.py | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libs/agno/agno/agent/_run.py b/libs/agno/agno/agent/_run.py index f197e6d953..71ddef9e9b 100644 --- a/libs/agno/agno/agent/_run.py +++ b/libs/agno/agno/agent/_run.py @@ -1917,6 +1917,8 @@ async def _arun_background_stream( from agno.agent._storage import aread_or_create_session, update_metadata run_id = run_response.run_id + if not run_id: + raise ValueError("run_id is required for background streaming") # 1. Persist RUNNING status so the run is visible in the DB immediately run_response.status = RunStatus.running diff --git a/libs/agno/agno/os/routers/agents/router.py b/libs/agno/agno/os/routers/agents/router.py index 12dc6bd05e..b21f0b0059 100644 --- a/libs/agno/agno/os/routers/agents/router.py +++ b/libs/agno/agno/os/routers/agents/router.py @@ -1,4 +1,3 @@ -import asyncio import json from typing import TYPE_CHECKING, Any, AsyncGenerator, List, Optional, Union, cast from uuid import uuid4 @@ -40,7 +39,6 @@ from agno.os.settings import AgnoAPISettings from agno.os.utils import ( format_sse_event, - format_sse_event_with_index, get_agent_by_id, get_request_kwargs, process_audio, @@ -49,7 +47,7 @@ process_video, ) from agno.registry import Registry -from agno.run.agent import RunErrorEvent, RunEvent, RunOutput +from agno.run.agent import RunErrorEvent, RunOutput from agno.run.base import RunStatus from agno.utils.log import log_debug, log_error, log_warning from agno.utils.serialize import json_serializer diff --git a/libs/agno/tests/unit/agent/test_background_execution.py b/libs/agno/tests/unit/agent/test_background_execution.py index 8d10233d9b..3523936ad8 100644 --- a/libs/agno/tests/unit/agent/test_background_execution.py +++ b/libs/agno/tests/unit/agent/test_background_execution.py @@ -102,12 +102,13 @@ def test_cleanup_removes_cancel_intent(self): class TestBackgroundValidation: - def test_background_with_stream_raises_value_error(self, monkeypatch: pytest.MonkeyPatch): - """Background execution cannot be combined with streaming.""" + def test_background_with_stream_requires_db(self, monkeypatch: pytest.MonkeyPatch): + """Background execution with streaming requires a database.""" agent = Agent(name="test-agent") + agent.db = None _patch_sync_dispatch_dependencies(agent, monkeypatch, runs=[]) - with pytest.raises(ValueError, match="Background execution cannot be combined with streaming"): + with pytest.raises(ValueError, match="Background execution requires a database"): _run.arun_dispatch(agent=agent, input="hello", stream=True, background=True) def test_background_without_db_raises_value_error(self, monkeypatch: pytest.MonkeyPatch):