Skip to content

Commit 11f548a

Browse files
[AI-245] - Low latency for OpenAI realtime (#145)
1 parent 5059336 commit 11f548a

File tree

68 files changed

+1822
-1700
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1822
-1700
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ __pycache__/
33
*.py[cod]
44
*$py.class
55
*.so
6-
6+
.cursor/*
77
# Distribution / packaging
88
.Python
99
build/

DEVELOPMENT.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,16 +310,17 @@ You can now see the metrics at `http://localhost:9464/metrics` (make sure that y
310310
- Audio generated by LLM: The LLM -> TTS can generate a lot of audio. This has to be stopped when an interrupt happens
311311
- Gemini & Google generate at what pace?
312312

313-
### Tasks
313+
### Tasks & Async
314314

315315
- Short running tasks should check if the connection is closed before doing work
316316
- Long-running tasks should be cancelled when calling agent.close()
317+
- Examples can be run with --debug to enable blockbuster and debug mode for async
317318

318319
### Video Frames & Tracks
319320

320321
- Track.recv errors will fail silently. The API is to return a frame: never return None; instead, wait until the next frame is available
321322
- When using frame.to_ndarray(format="rgb24") specify the format. Typically you want rgb24 when connecting/sending to Yolo etc
322-
323+
- QueuedVideoTrack is a writable/queued video track implementation which is useful when forwarding video
323324

324325
## Onboarding Plan for new contributors
325326

PRODUCTION.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
## WIP Guidelines on production deployment
2+
3+
### API
4+
5+
Wrap the launching of agents behind an API.
6+
7+
### OpenAPI
8+
9+
OpenAPI is a nice way to create SDKs for your API for usage in other languages
10+
11+
### Hosting
12+
13+
Edge providers like Stream and AI companies like OpenAI tend to run in many regions.
14+
A distributed approach where agents are available in many regions is ideal
15+
16+
### Full guides
17+
18+
- Fly
19+
- Digital Ocean
20+
- AWS

agents-core/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ classifiers = [
2121

2222
requires-python = ">=3.10"
2323
dependencies = [
24-
"getstream[webrtc,telemetry]>=2.5.7",
24+
"getstream[webrtc,telemetry]>=2.5.8",
2525
"python-dotenv>=1.1.1",
2626
"pillow>=10.4.0", # Compatible with moondream SDK (<11.0.0)
2727
"numpy>=1.24.0",
@@ -87,6 +87,7 @@ include = ["vision_agents"]
8787
# { path = "./vision_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-linux_x86_64.whl", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
8888
# { path = "./vision_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-win_amd64.whl", marker = "sys_platform == 'win32'" }
8989
#]
90+
# getstream = { git = "https://github.com/GetStream/stream-py.git", branch = "audio-more" }
9091
# for local development
9192
# getstream = { path = "../../stream-py/", editable = true }
9293
# aiortc = { path = "../stream-py/", editable = true }

agents-core/vision_agents/core/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,7 @@
22

33
from vision_agents.core.agents import Agent
44

5-
__all__ = ["Agent", "User"]
5+
from vision_agents.core.cli.cli_runner import cli
6+
7+
8+
__all__ = ["Agent", "User", "cli"]

agents-core/vision_agents/core/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66

77
from .agents import Agent as Agent
88
from .conversation import Conversation as Conversation
9+
from .agent_launcher import AgentLauncher as AgentLauncher
910

1011
__all__ = [
1112
"Agent",
1213
"Conversation",
14+
"AgentLauncher",
1315
]
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Agent launcher with warmup support."""
2+
3+
import asyncio
4+
import logging
5+
from typing import Optional, TYPE_CHECKING, Callable, Awaitable, Union, cast
6+
7+
if TYPE_CHECKING:
8+
from .agents import Agent
9+
10+
logger = logging.getLogger(__name__)
11+
12+
class AgentProcess:
    """Placeholder for information about the thread/process running an agent.

    TODO: add details about the thread/process here, enabling warmup to
    work well in a multiprocess environment. The class currently defines
    no attributes or methods of its own.
    """
17+
18+
class AgentLauncher:
    """
    Agent launcher that handles warmup and lifecycle management.

    The launcher ensures all components (LLM, TTS, STT, turn detection)
    are warmed up before the agent is launched. The agent is created once,
    on the first successful warmup, and reused by later ``launch`` calls.
    """

    # (log label, agent attribute name) for every component that may expose
    # a ``warmup()`` method, in the order their warmups are started.
    _WARMUP_COMPONENTS = (
        ("LLM", "llm"),  # includes Realtime LLMs
        ("TTS", "tts"),
        ("STT", "stt"),
        ("turn detection", "turn_detection"),
    )

    def __init__(
        self,
        create_agent: Callable[..., Union["Agent", Awaitable["Agent"]]],
        join_call: Optional[Callable[..., Union[None, Awaitable[None]]]] = None,
    ):
        """
        Initialize the agent launcher.

        Args:
            create_agent: A function that creates and returns an Agent instance,
                either directly or as an awaitable resolving to one
            join_call: Optional function that handles joining a call with the
                agent. It is only stored here; the launcher never calls it.
        """
        self.create_agent = create_agent
        self.join_call = join_call
        self._agent: Optional["Agent"] = None
        self._warmed_up = False
        # Serializes concurrent warmup() calls so the agent is created once.
        self._warmup_lock = asyncio.Lock()

    async def warmup(self, **kwargs) -> None:
        """
        Warm up all agent components.

        This method creates the agent and calls warmup on LLM, TTS, STT,
        and turn detection components if they exist. It ensures warmup is
        only called once; after a successful warmup, later calls return
        immediately and their ``kwargs`` are ignored.

        Args:
            **kwargs: Additional keyword arguments to pass to create_agent
        """
        # Function-scope import: keeps this block self-contained without
        # touching the module's import section.
        import inspect

        async with self._warmup_lock:
            if self._warmed_up:
                logger.debug("Agent already warmed up, skipping")
                return

            logger.info("Creating agent...")

            # create_agent may be sync or async. isawaitable() also accepts
            # Tasks/Futures/custom awaitables, which iscoroutine() would have
            # mistaken for an already-built Agent.
            result = self.create_agent(**kwargs)
            if inspect.isawaitable(result):
                agent: "Agent" = await result
            else:
                agent = cast("Agent", result)

            self._agent = agent

            logger.info("Warming up agent components...")

            # Awaitables collected here are run in parallel below.
            warmup_tasks = []
            for label, attr_name in self._WARMUP_COMPONENTS:
                component = getattr(agent, attr_name, None)
                if not component or not hasattr(component, "warmup"):
                    continue

                logger.debug(
                    "Warming up %s: %s", label, component.__class__.__name__
                )
                pending = component.warmup()
                # A synchronous warmup() has already finished by now; only
                # awaitables may be handed to gather().
                if inspect.isawaitable(pending):
                    warmup_tasks.append(pending)

            # Run all warmups in parallel
            if warmup_tasks:
                await asyncio.gather(*warmup_tasks)

            self._warmed_up = True
            logger.info("Agent warmup completed")

    async def launch(self, **kwargs) -> "Agent":
        """
        Launch the agent with warmup.

        This ensures warmup is called before returning the agent.

        Args:
            **kwargs: Additional keyword arguments to pass to create_agent

        Returns:
            The warmed-up agent instance

        Raises:
            RuntimeError: If no agent is available after warmup (for example
                because create_agent returned None).
        """
        await self.warmup(**kwargs)
        # Explicit check instead of `assert`, which is stripped under -O.
        if self._agent is None:
            raise RuntimeError("Agent should be created during warmup")
        return self._agent
118+

agents-core/vision_agents/core/agents/agents.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def process(self, msg: str, kwargs):
8585
return "[Agent: %s] | %s" % (self.extra["agent_id"], msg), kwargs
8686
return super(_AgentLoggerAdapter, self).process(msg, kwargs)
8787

88+
8889
# TODO: move me
8990
@dataclass
9091
class AgentOptions:
@@ -100,8 +101,12 @@ def update(self, other: "AgentOptions") -> "AgentOptions":
100101
return AgentOptions(**merged_dict)
101102

102103

104+
# Cache tempdir at module load time to avoid blocking I/O during async operations
105+
_DEFAULT_MODEL_DIR = tempfile.gettempdir()
106+
107+
103108
def default_agent_options():
104-
return AgentOptions(model_dir=tempfile.gettempdir())
109+
return AgentOptions(model_dir=_DEFAULT_MODEL_DIR)
105110

106111

107112
class Agent:
@@ -340,6 +345,7 @@ async def _handle_output_text_delta(event: LLMResponseChunkEvent):
340345

341346
async def _setup_speech_events(self):
342347
self.logger.info("_setup_speech_events")
348+
343349
@self.events.subscribe
344350
async def on_error(event: STTErrorEvent):
345351
self.logger.error("stt error event %s", event)
@@ -408,8 +414,8 @@ async def on_realtime_agent_speech_transcription(
408414

409415
@self.events.subscribe
410416
async def _on_tts_audio_write_to_output(event: TTSAudioEvent):
411-
if self._audio_track and event and event.audio_data is not None:
412-
await self._audio_track.write(event.audio_data)
417+
if self._audio_track and event and event.data is not None:
418+
await self._audio_track.write(event.data)
413419

414420
@self.events.subscribe
415421
async def on_stt_transcript_event_create_response(event: STTTranscriptEvent):
@@ -538,7 +544,6 @@ async def finish(self):
538544
)
539545
return
540546

541-
542547
with self.span("agent.finish"):
543548
# If connection is None or already closed, return immediately
544549
if not self._connection:
@@ -1105,18 +1110,22 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
11051110
except Exception as e:
11061111
self.logger.error(f"Error stopping TTS: {e}")
11071112
else:
1108-
participant_id = event.participant.user_id if event.participant else "unknown"
1113+
participant_id = (
1114+
event.participant.user_id if event.participant else "unknown"
1115+
)
11091116
self.logger.info(
11101117
f"👉 Turn started - participant speaking {participant_id} : {event.confidence}"
11111118
)
11121119
else:
11131120
# Agent itself started speaking - this is normal
1114-
participant_id = event.participant.user_id if event.participant else "unknown"
1115-
self.logger.debug(
1116-
f"👉 Turn started - agent speaking {participant_id}"
1121+
participant_id = (
1122+
event.participant.user_id if event.participant else "unknown"
11171123
)
1124+
self.logger.debug(f"👉 Turn started - agent speaking {participant_id}")
11181125
elif isinstance(event, TurnEndedEvent):
1119-
participant_id = event.participant.user_id if event.participant else "unknown"
1126+
participant_id = (
1127+
event.participant.user_id if event.participant else "unknown"
1128+
)
11201129
self.logger.info(
11211130
f"👉 Turn ended - participant {participant_id} finished (confidence: {event.confidence})"
11221131
)

agents-core/vision_agents/core/cli/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)