Skip to content
Open
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ stream-py/
*.pt
*.kef
*.onnx
profile.html

/opencode.json
26 changes: 16 additions & 10 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from ..vad.events import VADAudioEvent
from . import events
from .conversation import Conversation
from ..profiling import Profiler
from dataclasses import dataclass
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace.propagation import Span, Context
Expand Down Expand Up @@ -154,7 +155,8 @@ def __init__(
options: Optional[AgentOptions] = None,
tracer: Tracer = trace.get_tracer("agents"),
# Configure the default logging for the sdk here. Pass None to leave the config intact.
log_level: Optional[int] = logging.INFO,
log_level: Optional[int] = logging.DEBUG,
profiler: Optional[Profiler] = None,
):
if log_level is not None:
configure_default_logging(level=log_level)
Expand Down Expand Up @@ -208,7 +210,7 @@ def __init__(
self._pending_user_transcripts: Dict[str, str] = {}

# Merge plugin events BEFORE subscribing to any events
for plugin in [stt, tts, turn_detection, vad, llm, edge]:
for plugin in [stt, tts, turn_detection, vad, llm, edge, profiler]:
if plugin and hasattr(plugin, "events"):
self.logger.debug(f"Register events from plugin {plugin}")
self.events.merge(plugin.events)
Expand Down Expand Up @@ -238,6 +240,8 @@ def __init__(
self._prepare_rtc()
self._setup_stt()

self.events.send(events.AgentInitEvent())

@contextlib.contextmanager
def span(self, name):
with tracer.start_as_current_span(name, context=self._root_ctx) as span:
Expand Down Expand Up @@ -531,14 +535,13 @@ async def finish(self):
Subscribes to the edge transport's `call_ended` event and awaits it. If
no connection is active, returns immediately.
"""
# If connection is None or already closed, return immediately
if not self._connection:
self.logger.info(
"🔚 Agent connection is already closed, finishing immediately"
)
return


running_event = asyncio.Event()
with self.span("agent.finish"):
# If connection is None or already closed, return immediately
if not self._connection:
Expand All @@ -549,15 +552,18 @@ async def finish(self):

@self.edge.events.subscribe
async def on_ended(event: CallEndedEvent):
running_event.set()
self._is_running = False
# TODO: add members count check (particiapnts left + count = 1 timeout 2 minutes)

while self._is_running:
try:
await asyncio.sleep(0.0001)
except asyncio.CancelledError:
self._is_running = False
try:
await running_event.wait()
except asyncio.CancelledError:
running_event.clear()

self.events.send(events.AgentFinishEvent())

await asyncio.shield(self.close())
await asyncio.shield(self.close())

async def close(self):
"""Clean up all connections and resources.
Expand Down
16 changes: 15 additions & 1 deletion agents-core/vision_agents/core/agents/events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
from dataclasses import dataclass, field
from vision_agents.core.events import PluginBaseEvent
from vision_agents.core.events import PluginBaseEvent, BaseEvent
from typing import Optional, Any, Dict


@dataclass
class AgentInitEvent(BaseEvent):
"""Event emitted when Agent class initialized."""

type: str = field(default="agent.init", init=False)


@dataclass
class AgentFinishEvent(BaseEvent):
"""Event emitted when agent.finish() call ended."""

type: str = field(default="agent.finish", init=False)


@dataclass
class AgentSayEvent(PluginBaseEvent):
"""Event emitted when the agent wants to say something."""
Expand Down
19 changes: 12 additions & 7 deletions agents-core/vision_agents/core/events/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(self, ignore_unknown_events: bool = True):
self._shutdown = False
self._silent_events: set[type] = set()
self._handler_tasks: Dict[uuid.UUID, asyncio.Task[Any]] = {}
self._received_event = asyncio.Event()

self.register(ExceptionEvent)
self.register(HealthCheckEvent)
Expand Down Expand Up @@ -213,6 +214,7 @@ def merge(self, em: "EventManager"):
em._queue = self._queue
em._silent_events = self._silent_events
em._processing_task = None # Clear the stopped task reference
em._received_event = self._received_event

def register_events_from_module(
self, module, prefix="", ignore_not_compatible=True
Expand Down Expand Up @@ -407,7 +409,7 @@ def _prepare_event(self, event):

# Validate event is registered (handles both BaseEvent and generated protobuf events)
if hasattr(event, "type") and event.type in self._events:
# logger.info(f"Received event {_truncate_event_for_logging(event)}")
logger.debug(f"Received event {_truncate_event_for_logging(event)}")
return event
elif self._ignore_unknown_events:
logger.debug(f"Event not registered {_truncate_event_for_logging(event)}")
Expand Down Expand Up @@ -467,6 +469,8 @@ def send(self, *events):
if event:
self._queue.append(event)

self._received_event.set()

async def wait(self, timeout: float = 10.0):
"""
Wait for all queued events to be processed.
Expand All @@ -486,7 +490,7 @@ async def wait(self, timeout: float = 10.0):
def _start_processing_task(self):
"""Start the background event processing task."""
if self._processing_task and not self._processing_task.done():
return # Already running
return

loop = asyncio.get_running_loop()
self._processing_task = loop.create_task(self._process_events_loop())
Expand Down Expand Up @@ -521,13 +525,15 @@ async def _process_events_loop(self):
)
for task_id in cleanup_ids:
self._handler_tasks.pop(task_id)
await asyncio.sleep(0.0001)

await self._received_event.wait()
self._received_event.clear()

async def _run_handler(self, handler, event):
try:
return await handler(event)
except Exception as exc:
self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type]
self.send(ExceptionEvent(exc, handler))
module_name = getattr(handler, "__module__", "unknown")
logger.exception(
f"Error calling handler {handler.__name__} from {module_name} for event {event.type}"
Expand All @@ -536,10 +542,9 @@ async def _run_handler(self, handler, event):
async def _process_single_event(self, event):
"""Process a single event."""
for handler in self._handlers.get(event.type, []):
#module_name = getattr(handler, '__module__', 'unknown')
module_name = getattr(handler, '__module__', 'unknown')
if event.type not in self._silent_events:
pass
#logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
logger.debug(f"Called handler {handler.__name__} from {module_name} for event {event.type}")

loop = asyncio.get_running_loop()
handler_task = loop.create_task(self._run_handler(handler, event))
Expand Down
3 changes: 3 additions & 0 deletions agents-core/vision_agents/core/profiling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .base import Profiler

__all__ = ["Profiler"]
23 changes: 23 additions & 0 deletions agents-core/vision_agents/core/profiling/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pyinstrument
import logging

from vision_agents.core.events import EventManager
from vision_agents.core.agents import events

logger = logging.getLogger(__name__)


class Profiler:
def __init__(self, output_path='./profile.html'):
self.output_path = output_path
self.events = EventManager()
self.events.register_events_from_module(events)
self.profiler = pyinstrument.Profiler()
self.profiler.start()
self.events.subscribe(self.on_finish)

Comment on lines +10 to +18
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add docstrings and improve profiler lifecycle management.

Multiple concerns with the Profiler class:

  1. Missing docstrings: The class and __init__ method lack docstrings. As per coding guidelines, docstrings must follow the Google style guide.

  2. Premature profiling start: The profiler starts immediately on line 16, even before the agent joins a call. This captures initialization overhead that may not be relevant to profiling agent behavior during a call. Consider starting the profiler when AgentInitEvent is received instead.

  3. No cleanup mechanism: If AgentFinishEvent is never emitted (e.g., if finish() is never called or the process crashes), the profiler runs indefinitely and the HTML file is never written. Consider:

    • Adding a context manager interface (__enter__/__exit__)
    • Adding an explicit stop() method for manual cleanup
    • Adding a destructor (__del__) as a fallback
  4. No output path validation: Line 11 accepts any path without validation. If the directory doesn't exist or isn't writable, the error won't surface until on_finish.

Here's a proposed refactor:

 class Profiler:
+    """Profile agent lifecycle and generate HTML profiling reports.
+    
+    Automatically starts profiling when AgentInitEvent is emitted and stops
+    when AgentFinishEvent is emitted, writing results to the specified path.
+    
+    Args:
+        output_path: Path where the HTML profiling report will be written.
+            Defaults to './profile.html'.
+    
+    Example:
+        profiler = Profiler(output_path='./my_profile.html')
+        agent = Agent(..., profiler=profiler)
+        # Profiler automatically captures agent lifecycle
+    """
+    
     def __init__(self, output_path='./profile.html'):
+        """Initialize the profiler.
+        
+        Args:
+            output_path: Path where the HTML profiling report will be written.
+        """
         self.output_path = output_path
         self.events = EventManager()
         self.events.register_events_from_module(events)
         self.profiler = pyinstrument.Profiler()
-        self.profiler.start()
+        self._started = False
+        self.events.subscribe(self.on_init)
         self.events.subscribe(self.on_finish)
+    
+    async def on_init(self, event: events.AgentInitEvent):
+        """Start profiling when agent initializes."""
+        if not self._started:
+            self.profiler.start()
+            self._started = True
+            logger.info("Profiler started")

     async def on_finish(self, event: events.AgentFinishEvent):
+        """Stop profiling and write results when agent finishes."""
+        if not self._started:
+            logger.warning("Profiler was never started, skipping output")
+            return
         self.profiler.stop()
         logger.info(f"Profiler stopped. Time file saved at: {self.output_path}")
         with open(self.output_path, 'w') as f:
             f.write(self.profiler.output_html())
+    
+    def stop(self):
+        """Manually stop profiling and write results (if not already stopped)."""
+        if self._started and self.profiler:
+            self.profiler.stop()
+            self._started = False
+            logger.info(f"Profiler stopped manually. Writing to: {self.output_path}")
+            with open(self.output_path, 'w') as f:
+                f.write(self.profiler.output_html())
+    
+    def __del__(self):
+        """Ensure profiler is stopped on garbage collection."""
+        try:
+            if self._started:
+                self.stop()
+        except Exception:
+            pass  # Avoid exceptions in __del__
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/profiling/base.py around lines 10 to 18, add
Google-style docstrings for the Profiler class and its __init__; remove
immediate self.profiler.start() from __init__ and instead start profiling when
an AgentInitEvent is received (subscribe to AgentInitEvent and call start
there), and stop/profile dump on AgentFinishEvent; add explicit start() and
stop() methods, implement __enter__/__exit__ to support context-manager usage,
and add a __del__ fallback that calls stop(); validate output_path in __init__
(ensure parent directory exists or create it and verify writability, raising a
clear exception if invalid) so on_finish can always write the HTML; ensure
on_finish unsubscribes and writes the profiler output safely and that stop() is
idempotent.

async def on_finish(self, event: events.AgentFinishEvent):
self.profiler.stop()
logger.info(f"Profiler stopped. Time file saved at: {self.output_path}")
with open(self.output_path, 'w') as f:
f.write(self.profiler.output_html())
Comment on lines +19 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add error handling and improve async file I/O.

The on_finish method has several issues:

  1. Missing docstring: As per coding guidelines, docstrings must follow the Google style guide.

  2. No error handling: Lines 22-23 perform file I/O without error handling. If the directory doesn't exist, the path is invalid, or there are permission issues, the exception will propagate and potentially crash the event handler. Wrap file operations in try-except.

  3. Synchronous I/O in async function: Using synchronous open() in an async function can block the event loop. Consider using aiofiles for async file I/O.

  4. Missing encoding specification: HTML files should explicitly specify encoding (UTF-8) to avoid potential encoding issues across platforms.

  5. No validation: Line 20 calls stop() without checking if the profiler was started. If on_finish is called twice, this could raise an exception.

Apply this diff:

 async def on_finish(self, event: events.AgentFinishEvent):
+    """Stop profiling and write HTML report when agent finishes.
+    
+    Args:
+        event: The AgentFinishEvent triggering profiler shutdown.
+    """
+    try:
-        self.profiler.stop()
-        logger.info(f"Profiler stopped. Time file saved at: {self.output_path}")
-        with open(self.output_path, 'w') as f:
-            f.write(self.profiler.output_html())
+        self.profiler.stop()
+        html_output = self.profiler.output_html()
+        
+        # Consider using aiofiles for async I/O
+        with open(self.output_path, 'w', encoding='utf-8') as f:
+            f.write(html_output)
+        
+        logger.info(f"Profiler stopped. Profile saved at: {self.output_path}")
+    except Exception as e:
+        logger.error(f"Failed to write profiler output to {self.output_path}: {e}", exc_info=True)

Or, for fully async I/O:

import aiofiles

async def on_finish(self, event: events.AgentFinishEvent):
    """Stop profiling and write HTML report when agent finishes.
    
    Args:
        event: The AgentFinishEvent triggering profiler shutdown.
    """
    try:
        self.profiler.stop()
        html_output = self.profiler.output_html()
        
        async with aiofiles.open(self.output_path, 'w', encoding='utf-8') as f:
            await f.write(html_output)
        
        logger.info(f"Profiler stopped. Profile saved at: {self.output_path}")
    except Exception as e:
        logger.error(f"Failed to write profiler output to {self.output_path}: {e}", exc_info=True)
🤖 Prompt for AI Agents
In agents-core/vision_agents/core/profiling/base.py around lines 19 to 23, the
async on_finish handler lacks a Google-style docstring, performs unguarded
synchronous file I/O, doesn't check profiler state, lacks encoding, and has no
error handling; update the method to include a Google-style docstring, verify
the profiler was started before calling stop(), use aiofiles for non-blocking
file writes with encoding='utf-8', wrap stop/output/write in a try/except and
log failures with context (including exception info), and log success once the
HTML is written.

6 changes: 4 additions & 2 deletions examples/01_simple_agent_example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "simple-agents-example"
version = "0.0.0"
requires-python = ">=3.13"

# put only what this example needs
# Dependencies that the example actually uses
dependencies = [
"python-dotenv>=1.0",
"vision-agents-plugins-deepgram",
Expand All @@ -14,11 +14,13 @@ dependencies = [
"vision-agents-plugins-smart-turn",
"vision-agents-plugins-cartesia",
"vision-agents-plugins-gemini",
"vision-agents-plugins-vogent",
"vision-agents",
"openai>=1.101.0",
"anthropic>=0.66.0",
"google-genai>=1.33.0",
"fal-client>=0.5.3",
"pyinstrument>=5.1.1",
]

[tool.uv.sources]
Expand All @@ -30,5 +32,5 @@ dependencies = [
"vision-agents-plugins-smart-turn" = {path = "../../plugins/smart_turn", editable=true}
"vision-agents-plugins-cartesia" = {path = "../../plugins/cartesia", editable=true}
"vision-agents-plugins-gemini" = {path = "../../plugins/gemini", editable=true}

"vision-agents-plugins-vogent" = {path = "../../plugins/vogent", editable=true}
"vision-agents" = {path = "../../agents-core", editable=true}
7 changes: 5 additions & 2 deletions examples/01_simple_agent_example/simple_agent_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from dotenv import load_dotenv

from vision_agents.core import User, Agent
from vision_agents.plugins import cartesia, deepgram, getstream, gemini, vogent
from vision_agents.plugins import deepgram, getstream, gemini, vogent, elevenlabs
from vision_agents.core.profiling import Profiler

load_dotenv()

Expand All @@ -20,9 +21,11 @@ async def start_agent() -> None:
processors=[], # processors can fetch extra data, check images/audio data or transform video
# llm with tts & stt. if you use a realtime (sts capable) llm the tts, stt and vad aren't needed
llm=llm,
tts=cartesia.TTS(),
tts=elevenlabs.TTS(),
stt=deepgram.STT(),
turn_detection=vogent.TurnDetection(),
profiler=Profiler(),
# vad=silero.VAD(),
# realtime version (vad, tts and stt not needed)
# llm=openai.Realtime()
)
Expand Down
Loading
Loading