Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
c269f89
Optimize EventManager waiting logic and error handling
yarikdevcom Oct 28, 2025
8a48582
perf: pyinstrument integration
yarikdevcom Oct 29, 2025
4f3c3ad
feat: integrate pyinstrument for performance profiling in WebRTC example
yarikdevcom Oct 29, 2025
113ba95
profiling: added implementation as plugin for dev install
yarikdevcom Nov 3, 2025
5c3e662
fix: gemini async client, events base class for start/finish
yarikdevcom Nov 3, 2025
de822d2
fix: after rebase
yarikdevcom Nov 3, 2025
10d90f4
fix events flow
yarikdevcom Nov 3, 2025
3ae4fd3
fix: registered events before adding to agent
yarikdevcom Nov 3, 2025
4e8631f
chore: profile.html
yarikdevcom Nov 3, 2025
96f71c7
profile: added example of profiling in simple agent
yarikdevcom Nov 3, 2025
3c5377c
chore: lint
yarikdevcom Nov 3, 2025
540409c
fix: update type hints for async message streaming in GeminiLLM
yarikdevcom Nov 5, 2025
552d809
docs: enhance DEVELOPMENT.md with profiling usage example and update …
yarikdevcom Nov 5, 2025
58d5da4
docs: enhance Profiler class with detailed docstrings for initializat…
yarikdevcom Nov 5, 2025
af7d97c
chore: clean up openai_realtime_example.py and update dependencies in…
yarikdevcom Nov 5, 2025
ef0313e
refactor: remove unnecessary local variable assignment in GeminiLLM
yarikdevcom Nov 5, 2025
869a253
refactor: update Gemini function calling tests to use new client stru…
yarikdevcom Nov 5, 2025
69194f6
chore: comment out Profiler usage in simple_agent_example.py and prov…
yarikdevcom Nov 5, 2025
57496a9
gemini_llm: fix non-awaited func calls
dangusev Nov 5, 2025
94469f3
gemini_llm: remove type:ignore
dangusev Nov 5, 2025
336fae8
gemini_llm: silence mypy check
dangusev Nov 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ stream-py/
*.pt
*.kef
*.onnx
profile.html

/opencode.json
37 changes: 37 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,43 @@ start_http_server(port=9464)

You can now see the metrics at `http://localhost:9464/metrics` (make sure that your Python program keeps running), after this you can setup your Prometheus server to scrape this endpoint.

### Profiling

The `Profiler` class uses `pyinstrument` to profile your agent's performance and generate an HTML report showing where time is spent during execution.

#### Example usage:

```python
from uuid import uuid4
from vision_agents.core import User, Agent
from vision_agents.core.profiling import Profiler
from vision_agents.plugins import getstream, gemini, deepgram, elevenlabs, vogent

async def start_agent() -> None:
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="My AI friend", id="agent"),
instructions="You're a helpful assistant.",
llm=gemini.LLM("gemini-2.0-flash"),
tts=elevenlabs.TTS(),
stt=deepgram.STT(),
turn_detection=vogent.TurnDetection(),
profiler=Profiler(output_path='./profile.html'), # Optional: specify output path
)

call = agent.edge.client.video.call("default", str(uuid4()))
with await agent.join(call):
await agent.simple_response("Hello!")
await agent.finish()
```

The profiler automatically:
- Starts profiling when the agent is created
- Stops profiling when the agent finishes (on `AgentFinishEvent`)
- Saves an HTML report to the specified output path (default: `./profile.html`)

You can open the generated HTML file in a browser to view the performance profile, which shows a timeline of function calls and where time is spent during agent execution.


### Queuing

Expand Down
23 changes: 15 additions & 8 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from ..vad.events import VADAudioEvent
from . import events
from .conversation import Conversation
from ..profiling import Profiler
from dataclasses import dataclass
from opentelemetry.trace import set_span_in_context
from opentelemetry.trace.propagation import Span, Context
Expand Down Expand Up @@ -160,6 +161,7 @@ def __init__(
tracer: Tracer = trace.get_tracer("agents"),
# Configure the default logging for the sdk here. Pass None to leave the config intact.
log_level: Optional[int] = logging.INFO,
profiler: Optional[Profiler] = None,
):
if log_level is not None:
configure_default_logging(level=log_level)
Expand Down Expand Up @@ -213,7 +215,7 @@ def __init__(
self._pending_user_transcripts: Dict[str, str] = {}

# Merge plugin events BEFORE subscribing to any events
for plugin in [stt, tts, turn_detection, vad, llm, edge]:
for plugin in [stt, tts, turn_detection, vad, llm, edge, profiler]:
if plugin and hasattr(plugin, "events"):
self.logger.debug(f"Register events from plugin {plugin}")
self.events.merge(plugin.events)
Expand Down Expand Up @@ -248,6 +250,8 @@ def __init__(
self._prepare_rtc()
self._setup_stt()

self.events.send(events.AgentInitEvent())

@contextlib.contextmanager
def span(self, name):
with tracer.start_as_current_span(name, context=self._root_ctx) as span:
Expand Down Expand Up @@ -542,13 +546,13 @@ async def finish(self):
Subscribes to the edge transport's `call_ended` event and awaits it. If
no connection is active, returns immediately.
"""
# If connection is None or already closed, return immediately
if not self._connection:
self.logger.info(
"🔚 Agent connection is already closed, finishing immediately"
)
return

running_event = asyncio.Event()
with self.span("agent.finish"):
# If connection is None or already closed, return immediately
if not self._connection:
Expand All @@ -559,15 +563,18 @@ async def finish(self):

@self.edge.events.subscribe
async def on_ended(event: CallEndedEvent):
running_event.set()
self._is_running = False
# TODO: add members count check (particiapnts left + count = 1 timeout 2 minutes)

while self._is_running:
try:
await asyncio.sleep(0.0001)
except asyncio.CancelledError:
self._is_running = False
try:
await running_event.wait()
except asyncio.CancelledError:
running_event.clear()

self.events.send(events.AgentFinishEvent())

await asyncio.shield(self.close())
await asyncio.shield(self.close())

async def close(self):
"""Clean up all connections and resources.
Expand Down
16 changes: 15 additions & 1 deletion agents-core/vision_agents/core/agents/events.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,22 @@
from dataclasses import dataclass, field
from vision_agents.core.events import PluginBaseEvent
from vision_agents.core.events import PluginBaseEvent, BaseEvent
from typing import Optional, Any, Dict


@dataclass
class AgentInitEvent(BaseEvent):
"""Event emitted when Agent class initialized."""

type: str = field(default="agent.init", init=False)


@dataclass
class AgentFinishEvent(BaseEvent):
"""Event emitted when agent.finish() call ended."""

type: str = field(default="agent.finish", init=False)


@dataclass
class AgentSayEvent(PluginBaseEvent):
"""Event emitted when the agent wants to say something."""
Expand Down
19 changes: 12 additions & 7 deletions agents-core/vision_agents/core/events/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(self, ignore_unknown_events: bool = True):
self._shutdown = False
self._silent_events: set[type] = set()
self._handler_tasks: Dict[uuid.UUID, asyncio.Task[Any]] = {}
self._received_event = asyncio.Event()

self.register(ExceptionEvent)
self.register(HealthCheckEvent)
Expand Down Expand Up @@ -213,6 +214,7 @@ def merge(self, em: "EventManager"):
em._queue = self._queue
em._silent_events = self._silent_events
em._processing_task = None # Clear the stopped task reference
em._received_event = self._received_event

def register_events_from_module(
self, module, prefix="", ignore_not_compatible=True
Expand Down Expand Up @@ -407,7 +409,7 @@ def _prepare_event(self, event):

# Validate event is registered (handles both BaseEvent and generated protobuf events)
if hasattr(event, "type") and event.type in self._events:
# logger.info(f"Received event {_truncate_event_for_logging(event)}")
logger.debug(f"Received event {_truncate_event_for_logging(event)}")
return event
elif self._ignore_unknown_events:
logger.debug(f"Event not registered {_truncate_event_for_logging(event)}")
Expand Down Expand Up @@ -467,6 +469,8 @@ def send(self, *events):
if event:
self._queue.append(event)

self._received_event.set()

async def wait(self, timeout: float = 10.0):
"""
Wait for all queued events to be processed.
Expand All @@ -485,7 +489,7 @@ async def wait(self, timeout: float = 10.0):
def _start_processing_task(self):
"""Start the background event processing task."""
if self._processing_task and not self._processing_task.done():
return # Already running
return

loop = asyncio.get_running_loop()
self._processing_task = loop.create_task(self._process_events_loop())
Expand Down Expand Up @@ -520,13 +524,15 @@ async def _process_events_loop(self):
)
for task_id in cleanup_ids:
self._handler_tasks.pop(task_id)
await asyncio.sleep(0.0001)

await self._received_event.wait()
self._received_event.clear()

async def _run_handler(self, handler, event):
try:
return await handler(event)
except Exception as exc:
self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type]
self.send(ExceptionEvent(exc, handler))
module_name = getattr(handler, "__module__", "unknown")
logger.exception(
f"Error calling handler {handler.__name__} from {module_name} for event {event.type}"
Expand All @@ -535,10 +541,9 @@ async def _run_handler(self, handler, event):
async def _process_single_event(self, event):
"""Process a single event."""
for handler in self._handlers.get(event.type, []):
#module_name = getattr(handler, '__module__', 'unknown')
module_name = getattr(handler, '__module__', 'unknown')
if event.type not in self._silent_events:
pass
#logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
logger.debug(f"Called handler {handler.__name__} from {module_name} for event {event.type}")

loop = asyncio.get_running_loop()
handler_task = loop.create_task(self._run_handler(handler, event))
Expand Down
3 changes: 3 additions & 0 deletions agents-core/vision_agents/core/profiling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .base import Profiler

__all__ = ["Profiler"]
48 changes: 48 additions & 0 deletions agents-core/vision_agents/core/profiling/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pyinstrument
import logging

from vision_agents.core.events import EventManager
from vision_agents.core.agents import events

logger = logging.getLogger(__name__)


class Profiler:
"""Profiles agent execution using pyinstrument and generates an HTML report.

The profiler automatically starts when instantiated and stops when the agent
finishes (on AgentFinishEvent), saving an HTML performance report to disk.

Example:
agent = Agent(
edge=getstream.Edge(),
agent_user=User(name="Agent", id="agent"),
llm=gemini.LLM("gemini-2.0-flash"),
profiler=Profiler(output_path='./profile.html'),
)
"""

def __init__(self, output_path='./profile.html'):
"""Initialize the profiler.

Args:
output_path: Path where the HTML profile report will be saved.
Defaults to './profile.html'.
"""
self.output_path = output_path
self.events = EventManager()
self.events.register_events_from_module(events)
self.profiler = pyinstrument.Profiler()
self.profiler.start()
self.events.subscribe(self.on_finish)

async def on_finish(self, event: events.AgentFinishEvent):
"""Handle agent finish event by stopping profiler and saving report.

Args:
event: The AgentFinishEvent emitted when the agent finishes.
"""
self.profiler.stop()
logger.info(f"Profiler stopped. Time file saved at: {self.output_path}")
with open(self.output_path, 'w') as f:
f.write(self.profiler.output_html())
6 changes: 4 additions & 2 deletions examples/01_simple_agent_example/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "simple-agents-example"
version = "0.0.0"
requires-python = ">=3.13"

# put only what this example needs
# Dependencies that the example actually uses
dependencies = [
"python-dotenv>=1.0",
"vision-agents-plugins-deepgram",
Expand All @@ -14,11 +14,13 @@ dependencies = [
"vision-agents-plugins-smart-turn",
"vision-agents-plugins-cartesia",
"vision-agents-plugins-gemini",
"vision-agents-plugins-vogent",
"vision-agents",
"openai>=1.101.0",
"anthropic>=0.66.0",
"google-genai>=1.33.0",
"fal-client>=0.5.3",
"pyinstrument>=5.1.1",
]

[tool.uv.sources]
Expand All @@ -30,5 +32,5 @@ dependencies = [
"vision-agents-plugins-smart-turn" = {path = "../../plugins/smart_turn", editable=true}
"vision-agents-plugins-cartesia" = {path = "../../plugins/cartesia", editable=true}
"vision-agents-plugins-gemini" = {path = "../../plugins/gemini", editable=true}

"vision-agents-plugins-vogent" = {path = "../../plugins/vogent", editable=true}
"vision-agents" = {path = "../../agents-core", editable=true}
8 changes: 6 additions & 2 deletions examples/01_simple_agent_example/simple_agent_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from dotenv import load_dotenv

from vision_agents.core import User, Agent
from vision_agents.plugins import cartesia, deepgram, getstream, gemini, vogent
from vision_agents.plugins import deepgram, getstream, gemini, vogent, elevenlabs
# from vision_agents.core.profiling import Profiler

load_dotenv()

Expand All @@ -20,9 +21,12 @@ async def start_agent() -> None:
processors=[], # processors can fetch extra data, check images/audio data or transform video
# llm with tts & stt. if you use a realtime (sts capable) llm the tts, stt and vad aren't needed
llm=llm,
tts=cartesia.TTS(),
tts=elevenlabs.TTS(),
stt=deepgram.STT(),
turn_detection=vogent.TurnDetection(),
# enable profiler by uncommenting the following line
# profiler=Profiler(),
# vad=silero.VAD(),
# realtime version (vad, tts and stt not needed)
# llm=openai.Realtime()
)
Expand Down
Loading