2 changes: 1 addition & 1 deletion .gitignore
@@ -3,7 +3,7 @@ __pycache__/
*.py[cod]
*$py.class
*.so

.cursor/*
# Distribution / packaging
.Python
build/
5 changes: 3 additions & 2 deletions DEVELOPMENT.md
@@ -310,16 +310,17 @@ You can now see the metrics at `http://localhost:9464/metrics` (make sure that y
- Audio generated by LLM: the LLM -> TTS pipeline can generate a lot of audio, which has to be stopped when an interrupt happens
- At what pace do Gemini & Google generate audio?

### Tasks
### Tasks & Async

- Short-running tasks should check whether the connection is closed before doing work
- Long-running tasks should be cancelled when calling agent.close() (see the sketch after this list)
- Examples can be run with --debug to enable blockbuster and asyncio debug mode
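
A minimal sketch of these two rules, assuming a connection object with a `closed` flag and an agent that tracks its long-running tasks; the names here are illustrative, not the framework's actual API:

```python
import asyncio


class SketchAgent:
    """Illustrative only: demonstrates the two task rules above."""

    def __init__(self, connection):
        self.connection = connection  # assumed to expose a `.closed` flag
        self._long_tasks: set[asyncio.Task] = set()

    async def short_task(self) -> None:
        # Short-running tasks check the connection before doing work.
        if self.connection.closed:
            return
        await asyncio.sleep(0)  # stand-in for the actual work

    def start_long_task(self, coro) -> asyncio.Task:
        # Track long-running tasks so close() can cancel them later.
        task = asyncio.create_task(coro)
        self._long_tasks.add(task)
        task.add_done_callback(self._long_tasks.discard)
        return task

    async def close(self) -> None:
        # Long-running tasks are cancelled when the agent closes.
        for task in list(self._long_tasks):
            task.cancel()
        await asyncio.gather(*self._long_tasks, return_exceptions=True)
```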

### Video Frames & Tracks

- Track.recv errors fail silently. The API contract is to return a frame; never return None. Instead, wait until the next frame is available (see the sketch after this list)
- When using frame.to_ndarray(format="rgb24"), specify the format explicitly. You typically want rgb24 when connecting/sending to YOLO and similar models

- QueuedVideoTrack is a writable/queued video track implementation that is useful when forwarding video
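
A minimal sketch of a queued track that follows these rules, assuming aiortc's `VideoStreamTrack` and PyAV frames; this is illustrative, not the actual QueuedVideoTrack implementation:

```python
import asyncio

from aiortc import VideoStreamTrack


class SketchQueuedVideoTrack(VideoStreamTrack):
    """Illustrative writable/queued track: recv() never returns None."""

    def __init__(self):
        super().__init__()
        self._queue: asyncio.Queue = asyncio.Queue()

    async def write(self, frame) -> None:
        # Forwarded frames are queued for delivery.
        await self._queue.put(frame)

    async def recv(self):
        # The contract is to return a frame: if none is ready, block
        # until the next frame arrives instead of returning None.
        frame = await self._queue.get()
        frame.pts, frame.time_base = await self.next_timestamp()
        return frame


async def consume(track: SketchQueuedVideoTrack):
    frame = await track.recv()
    # Pass the format explicitly; rgb24 is the usual choice for YOLO-style models.
    return frame.to_ndarray(format="rgb24")
```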

## Onboarding Plan for new contributors

20 changes: 20 additions & 0 deletions PRODUCTION.md
@@ -0,0 +1,20 @@
## WIP Guidelines on production deployment

### API

Wrap the launching of agents behind an API so that agents can be started on demand.
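
A minimal sketch, assuming FastAPI; the endpoint shape and the `create_my_agent` factory are illustrative assumptions, not a prescribed API:

```python
from fastapi import FastAPI

from vision_agents.core.agents import Agent, AgentLauncher

app = FastAPI()


async def create_my_agent(**kwargs) -> Agent:
    raise NotImplementedError  # hypothetical: build your Agent (LLM/TTS/STT) here


launcher = AgentLauncher(create_agent=create_my_agent)


@app.post("/agents")
async def launch_agent():
    # launch() warms up components once, then returns the agent.
    agent = await launcher.launch()
    return {"status": "launched", "agent": type(agent).__name__}
```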

### OpenAPI

OpenAPI is a convenient way to generate SDKs for your API for use in other languages.
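
For instance, if the API above is built with FastAPI, the spec is served at `/openapi.json` and an SDK can be generated from it; the generator and target language below are just one option:

```bash
openapi-generator-cli generate \
  -i http://localhost:8000/openapi.json \
  -g typescript-fetch \
  -o ./sdk
```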

### Hosting

Edge providers like Stream and AI companies like OpenAI tend to run in many regions.
A distributed approach where agents are available in many regions is ideal.

### Full guides

- Fly
- Digital Ocean
- AWS
3 changes: 2 additions & 1 deletion agents-core/pyproject.toml
@@ -21,7 +21,7 @@ classifiers = [

requires-python = ">=3.10"
dependencies = [
"getstream[webrtc,telemetry]>=2.5.7",
"getstream[webrtc,telemetry]>=2.5.8",
"python-dotenv>=1.1.1",
"pillow>=10.4.0", # Compatible with moondream SDK (<11.0.0)
"numpy>=1.24.0",
@@ -87,6 +87,7 @@ include = ["vision_agents"]
# { path = "./vision_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-linux_x86_64.whl", marker = "sys_platform == 'linux' and platform_machine == 'x86_64'" },
# { path = "./vision_agents/core/turn_detection/krisp/krisp_audio-1.4.0-cp313-cp313-win_amd64.whl", marker = "sys_platform == 'win32'" }
#]
# getstream = { git = "https://github.com/GetStream/stream-py.git", branch = "audio-more" }
# for local development
# getstream = { path = "../../stream-py/", editable = true }
# aiortc = { path = "../stream-py/", editable = true }
5 changes: 4 additions & 1 deletion agents-core/vision_agents/core/__init__.py
@@ -2,4 +2,7 @@

from vision_agents.core.agents import Agent

__all__ = ["Agent", "User"]
from vision_agents.core.cli.cli_runner import cli


__all__ = ["Agent", "User", "cli"]
2 changes: 2 additions & 0 deletions agents-core/vision_agents/core/agents/__init__.py
@@ -6,8 +6,10 @@

from .agents import Agent as Agent
from .conversation import Conversation as Conversation
from .agent_launcher import AgentLauncher as AgentLauncher

__all__ = [
    "Agent",
    "Conversation",
    "AgentLauncher",
]
118 changes: 118 additions & 0 deletions agents-core/vision_agents/core/agents/agent_launcher.py
@@ -0,0 +1,118 @@
"""Agent launcher with warmup support."""

import asyncio
import logging
from typing import Optional, TYPE_CHECKING, Callable, Awaitable, Union, cast

if TYPE_CHECKING:
from .agents import Agent

logger = logging.getLogger(__name__)

class AgentProcess:
    """
    Placeholder: describe the thread/process model here, enabling warmup to
    work well in a multiprocess environment.
    """

    pass


class AgentLauncher:
    """
    Agent launcher that handles warmup and lifecycle management.

    The launcher ensures all components (LLM, TTS, STT, turn detection)
    are warmed up before the agent is launched.
    """

    def __init__(
        self,
        create_agent: Callable[..., Union["Agent", Awaitable["Agent"]]],
        join_call: Optional[Callable[..., Union[None, Awaitable[None]]]] = None,
    ):
        """
        Initialize the agent launcher.

        Args:
            create_agent: A function that creates and returns an Agent instance
            join_call: Optional function that handles joining a call with the agent
        """
        self.create_agent = create_agent
        self.join_call = join_call
        self._agent: Optional["Agent"] = None
        self._warmed_up = False
        self._warmup_lock = asyncio.Lock()

    async def warmup(self, **kwargs) -> None:
        """
        Warm up all agent components.

        This method creates the agent and calls warmup on LLM, TTS, STT,
        and turn detection components if they exist. It ensures warmup is
        only called once.

        Args:
            **kwargs: Additional keyword arguments to pass to create_agent
        """
        async with self._warmup_lock:
            if self._warmed_up:
                logger.debug("Agent already warmed up, skipping")
                return

            logger.info("Creating agent...")

            # Create the agent
            result = self.create_agent(**kwargs)
            if asyncio.iscoroutine(result):
                agent: "Agent" = await result
            else:
                agent = cast("Agent", result)

            self._agent = agent

            logger.info("Warming up agent components...")

            # Warmup tasks to run in parallel
            warmup_tasks = []

            # Warmup LLM (including Realtime)
            if agent.llm and hasattr(agent.llm, "warmup"):
                logger.debug("Warming up LLM: %s", agent.llm.__class__.__name__)
                warmup_tasks.append(agent.llm.warmup())

            # Warmup TTS
            if agent.tts and hasattr(agent.tts, "warmup"):
                logger.debug("Warming up TTS: %s", agent.tts.__class__.__name__)
                warmup_tasks.append(agent.tts.warmup())

            # Warmup STT
            if agent.stt and hasattr(agent.stt, "warmup"):
                logger.debug("Warming up STT: %s", agent.stt.__class__.__name__)
                warmup_tasks.append(agent.stt.warmup())

            # Warmup turn detection
            if agent.turn_detection and hasattr(agent.turn_detection, "warmup"):
                logger.debug("Warming up turn detection: %s", agent.turn_detection.__class__.__name__)
                warmup_tasks.append(agent.turn_detection.warmup())

            # Run all warmups in parallel
            if warmup_tasks:
                await asyncio.gather(*warmup_tasks)

            self._warmed_up = True
            logger.info("Agent warmup completed")

    async def launch(self, **kwargs) -> "Agent":
        """
        Launch the agent with warmup.

        This ensures warmup is called before returning the agent.

        Args:
            **kwargs: Additional keyword arguments to pass to create_agent

        Returns:
            The warmed-up agent instance
        """
        await self.warmup(**kwargs)
        assert self._agent is not None, "Agent should be created during warmup"
        return self._agent
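
A hedged usage sketch for the launcher above; `create_my_agent` is a hypothetical factory and the agent construction is elided:

```python
import asyncio

from vision_agents.core.agents import Agent, AgentLauncher


async def create_my_agent(**kwargs) -> Agent:
    raise NotImplementedError  # hypothetical: construct the Agent here


async def main() -> None:
    launcher = AgentLauncher(create_agent=create_my_agent)
    await launcher.warmup()          # components warm up in parallel, exactly once
    agent = await launcher.launch()  # returns the already-warmed agent
    print(type(agent).__name__)


asyncio.run(main())
```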

27 changes: 18 additions & 9 deletions agents-core/vision_agents/core/agents/agents.py
@@ -85,6 +85,7 @@ def process(self, msg: str, kwargs):
return "[Agent: %s] | %s" % (self.extra["agent_id"], msg), kwargs
return super(_AgentLoggerAdapter, self).process(msg, kwargs)


# TODO: move me
@dataclass
class AgentOptions:
@@ -100,8 +101,12 @@ def update(self, other: "AgentOptions") -> "AgentOptions":
return AgentOptions(**merged_dict)


# Cache tempdir at module load time to avoid blocking I/O during async operations
_DEFAULT_MODEL_DIR = tempfile.gettempdir()


def default_agent_options():
return AgentOptions(model_dir=tempfile.gettempdir())
return AgentOptions(model_dir=_DEFAULT_MODEL_DIR)


class Agent:
@@ -340,6 +345,7 @@ async def _handle_output_text_delta(event: LLMResponseChunkEvent):

async def _setup_speech_events(self):
self.logger.info("_setup_speech_events")

@self.events.subscribe
async def on_error(event: STTErrorEvent):
self.logger.error("stt error event %s", event)
@@ -408,8 +414,8 @@ async def on_realtime_agent_speech_transcription(

@self.events.subscribe
async def _on_tts_audio_write_to_output(event: TTSAudioEvent):
if self._audio_track and event and event.audio_data is not None:
await self._audio_track.write(event.audio_data)
if self._audio_track and event and event.data is not None:
await self._audio_track.write(event.data)

@self.events.subscribe
async def on_stt_transcript_event_create_response(event: STTTranscriptEvent):
@@ -538,7 +544,6 @@ async def finish(self):
)
return


with self.span("agent.finish"):
# If connection is None or already closed, return immediately
if not self._connection:
@@ -1105,18 +1110,22 @@ async def _on_turn_event(self, event: TurnStartedEvent | TurnEndedEvent) -> None
except Exception as e:
self.logger.error(f"Error stopping TTS: {e}")
else:
participant_id = event.participant.user_id if event.participant else "unknown"
participant_id = (
event.participant.user_id if event.participant else "unknown"
)
self.logger.info(
f"👉 Turn started - participant speaking {participant_id} : {event.confidence}"
)
else:
# Agent itself started speaking - this is normal
participant_id = event.participant.user_id if event.participant else "unknown"
self.logger.debug(
f"👉 Turn started - agent speaking {participant_id}"
participant_id = (
event.participant.user_id if event.participant else "unknown"
)
self.logger.debug(f"👉 Turn started - agent speaking {participant_id}")
elif isinstance(event, TurnEndedEvent):
participant_id = event.participant.user_id if event.participant else "unknown"
participant_id = (
event.participant.user_id if event.participant else "unknown"
)
self.logger.info(
f"👉 Turn ended - participant {participant_id} finished (confidence: {event.confidence})"
)