Skip to content

Commit 3ec51b9

Browse files
Optimize delays - realtime, waiting logic and error handling (#132)
Co-authored-by: Daniil Gusev <[email protected]>
1 parent caeaa63 commit 3ec51b9

File tree

15 files changed

+2195
-1047
lines changed

15 files changed

+2195
-1047
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,6 @@ stream-py/
8585
*.pt
8686
*.kef
8787
*.onnx
88+
profile.html
89+
90+
/opencode.json

DEVELOPMENT.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,43 @@ start_http_server(port=9464)
301301

302302
You can now see the metrics at `http://localhost:9464/metrics` (make sure that your Python program keeps running). After this, you can set up your Prometheus server to scrape this endpoint.
303303

304+
### Profiling
305+
306+
The `Profiler` class uses `pyinstrument` to profile your agent's performance and generate an HTML report showing where time is spent during execution.
307+
308+
#### Example usage:
309+
310+
```python
311+
from uuid import uuid4
312+
from vision_agents.core import User, Agent
313+
from vision_agents.core.profiling import Profiler
314+
from vision_agents.plugins import getstream, gemini, deepgram, elevenlabs, vogent
315+
316+
async def start_agent() -> None:
317+
agent = Agent(
318+
edge=getstream.Edge(),
319+
agent_user=User(name="My AI friend", id="agent"),
320+
instructions="You're a helpful assistant.",
321+
llm=gemini.LLM("gemini-2.0-flash"),
322+
tts=elevenlabs.TTS(),
323+
stt=deepgram.STT(),
324+
turn_detection=vogent.TurnDetection(),
325+
profiler=Profiler(output_path='./profile.html'), # Optional: specify output path
326+
)
327+
328+
call = agent.edge.client.video.call("default", str(uuid4()))
329+
with await agent.join(call):
330+
await agent.simple_response("Hello!")
331+
await agent.finish()
332+
```
333+
334+
The profiler automatically:
335+
- Starts profiling as soon as the `Profiler` is instantiated (i.e. while the arguments for the agent are being constructed)
336+
- Stops profiling when the agent finishes (on `AgentFinishEvent`)
337+
- Saves an HTML report to the specified output path (default: `./profile.html`)
338+
339+
You can open the generated HTML file in a browser to view the performance profile, which shows a timeline of function calls and where time is spent during agent execution.
340+
304341

305342
### Queuing
306343

agents-core/vision_agents/core/agents/agents.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
from ..vad.events import VADAudioEvent
5252
from . import events
5353
from .conversation import Conversation
54+
from ..profiling import Profiler
5455
from dataclasses import dataclass
5556
from opentelemetry.trace import set_span_in_context
5657
from opentelemetry.trace.propagation import Span, Context
@@ -160,6 +161,7 @@ def __init__(
160161
tracer: Tracer = trace.get_tracer("agents"),
161162
# Configure the default logging for the sdk here. Pass None to leave the config intact.
162163
log_level: Optional[int] = logging.INFO,
164+
profiler: Optional[Profiler] = None,
163165
):
164166
if log_level is not None:
165167
configure_default_logging(level=log_level)
@@ -213,7 +215,7 @@ def __init__(
213215
self._pending_user_transcripts: Dict[str, str] = {}
214216

215217
# Merge plugin events BEFORE subscribing to any events
216-
for plugin in [stt, tts, turn_detection, vad, llm, edge]:
218+
for plugin in [stt, tts, turn_detection, vad, llm, edge, profiler]:
217219
if plugin and hasattr(plugin, "events"):
218220
self.logger.debug(f"Register events from plugin {plugin}")
219221
self.events.merge(plugin.events)
@@ -248,6 +250,8 @@ def __init__(
248250
self._prepare_rtc()
249251
self._setup_stt()
250252

253+
self.events.send(events.AgentInitEvent())
254+
251255
@contextlib.contextmanager
252256
def span(self, name):
253257
with tracer.start_as_current_span(name, context=self._root_ctx) as span:
@@ -542,13 +546,13 @@ async def finish(self):
542546
Subscribes to the edge transport's `call_ended` event and awaits it. If
543547
no connection is active, returns immediately.
544548
"""
545-
# If connection is None or already closed, return immediately
546549
if not self._connection:
547550
self.logger.info(
548551
"🔚 Agent connection is already closed, finishing immediately"
549552
)
550553
return
551554

555+
running_event = asyncio.Event()
552556
with self.span("agent.finish"):
553557
# If connection is None or already closed, return immediately
554558
if not self._connection:
@@ -559,15 +563,18 @@ async def finish(self):
559563

560564
@self.edge.events.subscribe
561565
async def on_ended(event: CallEndedEvent):
566+
running_event.set()
562567
self._is_running = False
568+
# TODO: add members count check (participants left + count = 1 timeout 2 minutes)
563569

564-
while self._is_running:
565-
try:
566-
await asyncio.sleep(0.0001)
567-
except asyncio.CancelledError:
568-
self._is_running = False
570+
try:
571+
await running_event.wait()
572+
except asyncio.CancelledError:
573+
running_event.clear()
574+
575+
self.events.send(events.AgentFinishEvent())
569576

570-
await asyncio.shield(self.close())
577+
await asyncio.shield(self.close())
571578

572579
async def close(self):
573580
"""Clean up all connections and resources.

agents-core/vision_agents/core/agents/events.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
from dataclasses import dataclass, field
2-
from vision_agents.core.events import PluginBaseEvent
2+
from vision_agents.core.events import PluginBaseEvent, BaseEvent
33
from typing import Optional, Any, Dict
44

55

6+
@dataclass
class AgentInitEvent(BaseEvent):
    """Event emitted once the Agent class has been initialized."""

    # Fixed event identifier; excluded from the generated __init__.
    type: str = field(init=False, default="agent.init")
11+
12+
13+
@dataclass
class AgentFinishEvent(BaseEvent):
    """Event emitted when the agent.finish() call has ended."""

    # Fixed event identifier; excluded from the generated __init__.
    type: str = field(init=False, default="agent.finish")
18+
19+
620
@dataclass
721
class AgentSayEvent(PluginBaseEvent):
822
"""Event emitted when the agent wants to say something."""

agents-core/vision_agents/core/events/manager.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def __init__(self, ignore_unknown_events: bool = True):
142142
self._shutdown = False
143143
self._silent_events: set[type] = set()
144144
self._handler_tasks: Dict[uuid.UUID, asyncio.Task[Any]] = {}
145+
self._received_event = asyncio.Event()
145146

146147
self.register(ExceptionEvent)
147148
self.register(HealthCheckEvent)
@@ -213,6 +214,7 @@ def merge(self, em: "EventManager"):
213214
em._queue = self._queue
214215
em._silent_events = self._silent_events
215216
em._processing_task = None # Clear the stopped task reference
217+
em._received_event = self._received_event
216218

217219
def register_events_from_module(
218220
self, module, prefix="", ignore_not_compatible=True
@@ -407,7 +409,7 @@ def _prepare_event(self, event):
407409

408410
# Validate event is registered (handles both BaseEvent and generated protobuf events)
409411
if hasattr(event, "type") and event.type in self._events:
410-
# logger.info(f"Received event {_truncate_event_for_logging(event)}")
412+
logger.debug(f"Received event {_truncate_event_for_logging(event)}")
411413
return event
412414
elif self._ignore_unknown_events:
413415
logger.debug(f"Event not registered {_truncate_event_for_logging(event)}")
@@ -467,6 +469,8 @@ def send(self, *events):
467469
if event:
468470
self._queue.append(event)
469471

472+
self._received_event.set()
473+
470474
async def wait(self, timeout: float = 10.0):
471475
"""
472476
Wait for all queued events to be processed.
@@ -485,7 +489,7 @@ async def wait(self, timeout: float = 10.0):
485489
def _start_processing_task(self):
486490
"""Start the background event processing task."""
487491
if self._processing_task and not self._processing_task.done():
488-
return # Already running
492+
return
489493

490494
loop = asyncio.get_running_loop()
491495
self._processing_task = loop.create_task(self._process_events_loop())
@@ -520,13 +524,15 @@ async def _process_events_loop(self):
520524
)
521525
for task_id in cleanup_ids:
522526
self._handler_tasks.pop(task_id)
523-
await asyncio.sleep(0.0001)
527+
528+
await self._received_event.wait()
529+
self._received_event.clear()
524530

525531
async def _run_handler(self, handler, event):
526532
try:
527533
return await handler(event)
528534
except Exception as exc:
529-
self._queue.appendleft(ExceptionEvent(exc, handler)) # type: ignore[arg-type]
535+
self.send(ExceptionEvent(exc, handler))
530536
module_name = getattr(handler, "__module__", "unknown")
531537
logger.exception(
532538
f"Error calling handler {handler.__name__} from {module_name} for event {event.type}"
@@ -535,10 +541,9 @@ async def _run_handler(self, handler, event):
535541
async def _process_single_event(self, event):
536542
"""Process a single event."""
537543
for handler in self._handlers.get(event.type, []):
538-
#module_name = getattr(handler, '__module__', 'unknown')
544+
module_name = getattr(handler, '__module__', 'unknown')
539545
if event.type not in self._silent_events:
540-
pass
541-
#logger.info(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
546+
logger.debug(f"Called handler {handler.__name__} from {module_name} for event {event.type}")
542547

543548
loop = asyncio.get_running_loop()
544549
handler_task = loop.create_task(self._run_handler(handler, event))
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .base import Profiler
2+
3+
__all__ = ["Profiler"]
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import pyinstrument
2+
import logging
3+
4+
from vision_agents.core.events import EventManager
5+
from vision_agents.core.agents import events
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class Profiler:
    """Profiles agent execution using pyinstrument and generates an HTML report.

    Profiling starts as soon as the instance is created and stops when an
    ``AgentFinishEvent`` is delivered to :meth:`on_finish`, at which point the
    HTML performance report is written to ``output_path``.

    Example:
        agent = Agent(
            edge=getstream.Edge(),
            agent_user=User(name="Agent", id="agent"),
            llm=gemini.LLM("gemini-2.0-flash"),
            profiler=Profiler(output_path='./profile.html'),
        )
    """

    def __init__(self, output_path: str = './profile.html') -> None:
        """Start profiling and subscribe to the agent finish event.

        Args:
            output_path: Path where the HTML profile report will be saved.
                Defaults to './profile.html'.
        """
        self.output_path = output_path
        # Own event manager with the agent events registered, so that
        # on_finish can be subscribed to AgentFinishEvent.
        self.events = EventManager()
        self.events.register_events_from_module(events)
        self.profiler = pyinstrument.Profiler()
        self.profiler.start()
        self.events.subscribe(self.on_finish)

    async def on_finish(self, event: events.AgentFinishEvent) -> None:
        """Stop the profiler and write the HTML report to ``output_path``.

        Does nothing if the profiler is no longer running, so a repeated
        finish event neither raises nor rewrites the report.

        Args:
            event: The AgentFinishEvent emitted when the agent finishes.
        """
        if not self.profiler.is_running:
            # Already stopped by an earlier finish event; stop() would raise.
            return

        self.profiler.stop()
        # Explicit encoding keeps the HTML report independent of the locale.
        with open(self.output_path, 'w', encoding='utf-8') as f:
            f.write(self.profiler.output_html())
        # Log only after the write succeeded so the message is truthful.
        logger.info(f"Profiler stopped. Profile report saved at: {self.output_path}")

examples/01_simple_agent_example/pyproject.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "simple-agents-example"
33
version = "0.0.0"
44
requires-python = ">=3.13"
55

6-
# put only what this example needs
6+
# Dependencies that the example actually uses
77
dependencies = [
88
"python-dotenv>=1.0",
99
"vision-agents-plugins-deepgram",
@@ -14,11 +14,13 @@ dependencies = [
1414
"vision-agents-plugins-smart-turn",
1515
"vision-agents-plugins-cartesia",
1616
"vision-agents-plugins-gemini",
17+
"vision-agents-plugins-vogent",
1718
"vision-agents",
1819
"openai>=1.101.0",
1920
"anthropic>=0.66.0",
2021
"google-genai>=1.33.0",
2122
"fal-client>=0.5.3",
23+
"pyinstrument>=5.1.1",
2224
]
2325

2426
[tool.uv.sources]
@@ -30,5 +32,5 @@ dependencies = [
3032
"vision-agents-plugins-smart-turn" = {path = "../../plugins/smart_turn", editable=true}
3133
"vision-agents-plugins-cartesia" = {path = "../../plugins/cartesia", editable=true}
3234
"vision-agents-plugins-gemini" = {path = "../../plugins/gemini", editable=true}
33-
35+
"vision-agents-plugins-vogent" = {path = "../../plugins/vogent", editable=true}
3436
"vision-agents" = {path = "../../agents-core", editable=true}

examples/01_simple_agent_example/simple_agent_example.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
from dotenv import load_dotenv
44

55
from vision_agents.core import User, Agent
6-
from vision_agents.plugins import cartesia, deepgram, getstream, gemini, vogent
6+
from vision_agents.plugins import deepgram, getstream, gemini, vogent, elevenlabs
7+
# from vision_agents.core.profiling import Profiler
78

89
load_dotenv()
910

@@ -20,9 +21,12 @@ async def start_agent() -> None:
2021
processors=[], # processors can fetch extra data, check images/audio data or transform video
2122
# llm with tts & stt. if you use a realtime (sts capable) llm the tts, stt and vad aren't needed
2223
llm=llm,
23-
tts=cartesia.TTS(),
24+
tts=elevenlabs.TTS(),
2425
stt=deepgram.STT(),
2526
turn_detection=vogent.TurnDetection(),
27+
# enable profiler by uncommenting the following line
28+
# profiler=Profiler(),
29+
# vad=silero.VAD(),
2630
# realtime version (vad, tts and stt not needed)
2731
# llm=openai.Realtime()
2832
)

0 commit comments

Comments
 (0)