diff --git a/agents-core/pyproject.toml b/agents-core/pyproject.toml index 3ad9f92b..77b19a5a 100644 --- a/agents-core/pyproject.toml +++ b/agents-core/pyproject.toml @@ -42,6 +42,7 @@ deepgram = ["vision-agents-plugins-deepgram"] elevenlabs = ["vision-agents-plugins-elevenlabs"] gemini = ["vision-agents-plugins-gemini"] getstream = ["vision-agents-plugins-getstream"] +heygen = ["vision-agents-plugins-heygen"] kokoro = ["vision-agents-plugins-kokoro"] krisp = ["vision-agents-plugins-krisp"] moonshine = ["vision-agents-plugins-moonshine"] @@ -57,6 +58,7 @@ all-plugins = [ "vision-agents-plugins-elevenlabs", "vision-agents-plugins-gemini", "vision-agents-plugins-getstream", + "vision-agents-plugins-heygen", "vision-agents-plugins-kokoro", "vision-agents-plugins-krisp", "vision-agents-plugins-moonshine", diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 39f902e1..30c17553 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -220,6 +220,11 @@ def __init__( self.llm._attach_agent(self) + # Attach processors that need agent reference + for processor in self.processors: + if hasattr(processor, '_attach_agent'): + processor._attach_agent(self) + self.events.subscribe(self._on_vad_audio) self.events.subscribe(self._on_agent_say) # Initialize state variables @@ -1176,10 +1181,13 @@ def publish_audio(self) -> bool: """Whether the agent should publish an outbound audio track. Returns: - True if TTS is configured or when in Realtime mode. + True if TTS is configured, when in Realtime mode, or if there are audio publishers. """ if self.tts is not None or self.realtime_mode: return True + # Also publish audio if there are audio publishers (e.g., HeyGen avatar) + if self.audio_publishers: + return True return False @property @@ -1305,6 +1313,11 @@ def _prepare_rtc(self): if self.realtime_mode and isinstance(self.llm, Realtime): self._audio_track = self.llm.output_track self.logger.info("🎵 Using Realtime provider output track for audio") + elif self.audio_publishers: + # Get the first audio publisher to create the track + audio_publisher = self.audio_publishers[0] + self._audio_track = audio_publisher.publish_audio_track() + self.logger.info("🎵 Audio track initialized from audio publisher") else: # Default to WebRTC-friendly format unless configured differently framerate = 48000 diff --git a/aiortc b/aiortc deleted file mode 160000 index f84800ce..00000000 --- a/aiortc +++ /dev/null @@ -1 +0,0 @@ -Subproject commit f84800ce052de7d81a62b07c6f6094504c19b65f diff --git a/plugins/aws/example/uv.lock b/plugins/aws/example/uv.lock index fad869b7..5c0123ac 100644 --- a/plugins/aws/example/uv.lock +++ b/plugins/aws/example/uv.lock @@ -2648,6 +2648,8 @@ requires-dist = [ { name = "vision-agents-plugins-gemini", marker = "extra == 'gemini'", editable = "../../gemini" }, { name = "vision-agents-plugins-getstream", marker = "extra == 'all-plugins'", editable = "../../getstream" }, { name = "vision-agents-plugins-getstream", marker = "extra == 'getstream'", editable = "../../getstream" }, + { name = "vision-agents-plugins-heygen", marker = "extra == 'all-plugins'", editable = "../../heygen" }, + { name = "vision-agents-plugins-heygen", marker = "extra == 'heygen'", editable = "../../heygen" }, { name = "vision-agents-plugins-kokoro", marker = "extra == 'all-plugins'", editable = "../../kokoro" }, { name = "vision-agents-plugins-kokoro", marker = "extra == 'kokoro'", editable = 
"../../kokoro" }, { name = "vision-agents-plugins-krisp", marker = "extra == 'all-plugins'", editable = "../../krisp" }, @@ -2665,7 +2667,7 @@ requires-dist = [ { name = "vision-agents-plugins-xai", marker = "extra == 'all-plugins'", editable = "../../xai" }, { name = "vision-agents-plugins-xai", marker = "extra == 'xai'", editable = "../../xai" }, ] -provides-extras = ["all-plugins", "anthropic", "cartesia", "deepgram", "dev", "elevenlabs", "gemini", "getstream", "kokoro", "krisp", "moonshine", "openai", "smart-turn", "ultralytics", "wizper", "xai"] +provides-extras = ["all-plugins", "anthropic", "cartesia", "deepgram", "dev", "elevenlabs", "gemini", "getstream", "heygen", "kokoro", "krisp", "moonshine", "openai", "smart-turn", "ultralytics", "wizper", "xai"] [[package]] name = "vision-agents-plugins-aws" diff --git a/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py b/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py index 3106e596..14d54713 100644 --- a/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py +++ b/plugins/gemini/vision_agents/plugins/gemini/gemini_realtime.py @@ -152,7 +152,7 @@ async def simple_audio_response( audio_bytes = pcm.resample( target_sample_rate=16000, target_channels=1 ).samples.tobytes() - mime = f"audio/pcm;rate=16000" + mime = "audio/pcm;rate=16000" blob = Blob(data=audio_bytes, mime_type=mime) await self._require_session().send_realtime_input(audio=blob) diff --git a/plugins/heygen/README.md b/plugins/heygen/README.md new file mode 100644 index 00000000..f8b53b77 --- /dev/null +++ b/plugins/heygen/README.md @@ -0,0 +1,189 @@ +# HeyGen Avatar Plugin for Vision Agents + +Add realistic avatar video to your AI agents using HeyGen's streaming avatar API. + +## Features + +- 🎭 **Realistic Avatars**: Use HeyGen's high-quality avatars with natural movements +- 🎤 **Automatic Lip-Sync**: Avatar automatically syncs with audio from any TTS provider +- 🚀 **WebRTC Streaming**: Low-latency real-time video streaming via WebRTC +- 🔌 **Easy Integration**: Works seamlessly with Vision Agents framework +- 🎨 **Customizable**: Configure avatar, quality, resolution, and more + +## Installation + +```bash +pip install vision-agents-plugins-heygen +``` + +Or with uv: + +```bash +uv pip install vision-agents-plugins-heygen +``` + +## Quick Start + +```python +import asyncio +from uuid import uuid4 +from dotenv import load_dotenv + +from vision_agents.core import User, Agent +from vision_agents.plugins import cartesia, deepgram, getstream, gemini, heygen +from vision_agents.plugins.heygen import VideoQuality + +load_dotenv() + +async def start_avatar_agent(): + agent = Agent( + edge=getstream.Edge(), + agent_user=User(name="AI Assistant with Avatar", id="agent"), + instructions="You're a friendly AI assistant.", + + llm=gemini.LLM("gemini-2.0-flash"), + tts=cartesia.TTS(), + stt=deepgram.STT(), + + # Add HeyGen avatar + processors=[ + heygen.AvatarPublisher( + avatar_id="default", + quality=VideoQuality.HIGH + ) + ] + ) + + call = agent.edge.client.video.call("default", str(uuid4())) + + with await agent.join(call): + await agent.edge.open_demo(call) + await agent.simple_response("Hello! 
I'm your AI assistant with an avatar.")
+        await agent.finish()
+
+if __name__ == "__main__":
+    asyncio.run(start_avatar_agent())
+```
+
+## Configuration
+
+### Environment Variables
+
+Set your HeyGen API key:
+
+```bash
+HEYGEN_API_KEY=your_heygen_api_key_here
+```
+
+### AvatarPublisher Options
+
+```python
+from vision_agents.plugins.heygen import VideoQuality
+
+heygen.AvatarPublisher(
+    avatar_id="default",        # HeyGen avatar ID
+    quality=VideoQuality.HIGH,  # Video quality: VideoQuality.LOW, VideoQuality.MEDIUM, or VideoQuality.HIGH
+    resolution=(1920, 1080),    # Output resolution (width, height)
+    api_key=None,               # Optional: override env var
+)
+```
+
+## Usage Examples
+
+### With Realtime LLM
+
+```python
+from vision_agents.plugins import gemini, heygen, getstream
+
+agent = Agent(
+    edge=getstream.Edge(),
+    agent_user=User(name="Realtime Avatar AI"),
+    instructions="Be conversational and responsive.",
+
+    llm=gemini.Realtime(fps=2),  # No separate TTS needed
+
+    processors=[
+        heygen.AvatarPublisher(avatar_id="professional_presenter")
+    ]
+)
+
+call = agent.edge.client.video.call("default", str(uuid4()))
+
+with await agent.join(call):
+    await agent.finish()
+```
+
+### With Multiple Processors
+
+```python
+from vision_agents.plugins import ultralytics, heygen
+
+agent = Agent(
+    edge=getstream.Edge(),
+    agent_user=User(name="Fitness Coach"),
+    instructions="Analyze user poses and provide feedback.",
+
+    llm=gemini.Realtime(fps=3),
+
+    processors=[
+        # Process incoming user video
+        ultralytics.YOLOPoseProcessor(model_path="yolo11n-pose.pt"),
+        # Publish avatar video
+        heygen.AvatarPublisher(avatar_id="fitness_trainer")
+    ]
+)
+```
+
+## How It Works
+
+1. **Connection**: Establishes a WebRTC connection to HeyGen's streaming API
+2. **Text Input**: Subscribes to your LLM's text output (or a Realtime LLM's speech transcription) and forwards it to HeyGen's task API
+3. **Avatar Generation**: HeyGen performs TTS server-side and generates avatar video with lip-sync
+4. **Video Streaming**: Streams the avatar video (and, for standard LLMs, HeyGen's audio) to call participants via GetStream Edge
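+
+Under the hood, these steps map onto four REST calls against HeyGen's streaming API. The sketch below is a simplified, plugin-free rendering of what the plugin's `HeyGenSession` does internally (same endpoints and payload shapes; the `sdp_answer` argument stands in for your own WebRTC negotiation):
+
+```python
+import os
+import aiohttp
+
+BASE_URL = "https://api.heygen.com/v1"
+HEADERS = {
+    "X-Api-Key": os.environ["HEYGEN_API_KEY"],
+    "Content-Type": "application/json",
+}
+
+
+async def session_lifecycle(sdp_answer: str) -> None:
+    async with aiohttp.ClientSession() as http:
+        # 1. Create a session; HeyGen returns session_id, ICE servers, and an SDP offer.
+        async with http.post(
+            f"{BASE_URL}/streaming.new",
+            json={"avatar_id": "default", "quality": "high"},
+            headers=HEADERS,
+        ) as resp:
+            resp.raise_for_status()
+            session_id = (await resp.json())["data"]["session_id"]
+
+        # 2. Start the session, sending back the SDP answer from your peer connection.
+        async with http.post(
+            f"{BASE_URL}/streaming.start",
+            json={"session_id": session_id, "sdp": {"type": "answer", "sdp": sdp_answer}},
+            headers=HEADERS,
+        ) as resp:
+            resp.raise_for_status()
+
+        # 3. Send text; HeyGen performs TTS and lip-sync server-side.
+        async with http.post(
+            f"{BASE_URL}/streaming.task",
+            json={"session_id": session_id, "text": "Hello!", "task_type": "repeat"},
+            headers=HEADERS,
+        ) as resp:
+            resp.raise_for_status()
+
+        # 4. Stop the session when done.
+        async with http.post(
+            f"{BASE_URL}/streaming.stop",
+            json={"session_id": session_id},
+            headers=HEADERS,
+        ) as resp:
+            resp.raise_for_status()
+```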
+
+## Requirements
+
+- Python 3.10+
+- HeyGen API key (get one at [heygen.com](https://heygen.com))
+- GetStream account for video calls
+- TTS provider (Cartesia, ElevenLabs, etc.) or Realtime LLM
+
+## Troubleshooting
+
+### Connection Issues
+
+If you experience connection problems:
+
+1. Check that your HeyGen API key is valid
+2. Ensure you have network access to HeyGen's servers
+3. Check firewall settings for WebRTC traffic
+
+### Video Quality
+
+To optimize video quality:
+
+- Use `quality=VideoQuality.HIGH` for best results
+- Increase resolution if bandwidth allows
+- Ensure a stable internet connection
+
+## API Reference
+
+### AvatarPublisher
+
+Main class for publishing HeyGen avatar video.
+
+**Methods:**
+- `publish_video_track()`: Returns the video track for streaming
+- `publish_audio_track()`: Returns the audio track carrying HeyGen's TTS audio (used with standard LLMs)
+- `state()`: Returns current state information
+- `close()`: Cleans up resources
+
+## License
+
+MIT
+
+## Links
+
+- [Documentation](https://visionagents.ai/)
+- [GitHub](https://github.com/GetStream/Vision-Agents)
+- [HeyGen API Docs](https://docs.heygen.com/docs/streaming-api)
+
diff --git a/plugins/heygen/example/README.md b/plugins/heygen/example/README.md
new file mode 100644
index 00000000..a9206171
--- /dev/null
+++ b/plugins/heygen/example/README.md
@@ -0,0 +1,192 @@
+# HeyGen Avatar Examples
+
+This directory contains examples of how to use the HeyGen plugin to add realistic avatar video to your AI agent.
+
+## Examples
+
+### 1. Standard Streaming LLM (`avatar_example.py`)
+
+Uses a standard streaming LLM (Gemini) with a separate STT component; HeyGen handles TTS and lip-sync server-side. Best for traditional text-based LLMs.
+
+### 2. Realtime LLM (`avatar_realtime_example.py`)
+
+Uses Gemini Realtime with native audio input/output. The avatar lip-syncs to the transcribed text while Gemini handles voice processing.
+
+## Setup
+
+1. **Install dependencies:**
+
+```bash
+cd plugins/heygen/example
+uv pip install -e .
+```
+
+2. **Configure environment variables:**
+
+Copy `.env.example` to `.env` and fill in your API keys:
+
+```bash
+cp .env.example .env
+```
+
+**For Standard Example** (`avatar_example.py`):
+- `HEYGEN_API_KEY` - Get from [HeyGen](https://heygen.com)
+- `STREAM_API_KEY` and `STREAM_SECRET` - Get from [GetStream](https://getstream.io)
+- `DEEPGRAM_API_KEY` - Get from [Deepgram](https://deepgram.com)
+- `GOOGLE_API_KEY` - Get from [Google AI Studio](https://makersuite.google.com/app/apikey)
+
+**For Realtime Example** (`avatar_realtime_example.py`):
+- `HEYGEN_API_KEY` - Get from [HeyGen](https://heygen.com)
+- `STREAM_API_KEY` and `STREAM_SECRET` - Get from [GetStream](https://getstream.io)
+- `GOOGLE_API_KEY` - Get from [Google AI Studio](https://makersuite.google.com/app/apikey)
+
+## Running the Examples
+
+From the project root:
+
+**Standard Streaming LLM:**
+```bash
+uv run plugins/heygen/example/avatar_example.py
+```
+
+**Realtime LLM:**
+```bash
+uv run plugins/heygen/example/avatar_realtime_example.py
+```
+
+Both will:
+1. Start an AI agent with a HeyGen avatar
+2. Open a demo UI in your browser
+3. Greet you through the avatar and wait for you to chat
+
+## How It Works
+
+### Standard Streaming LLM (`avatar_example.py`)
+
+1. **Agent Setup**: The agent is configured with:
+   - Gemini LLM for generating responses
+   - Deepgram STT for speech recognition
+   - HeyGen AvatarPublisher for avatar video and audio
+
+2. **Avatar Streaming**: When the agent speaks:
+   - Text is generated by Gemini LLM
+   - Text is sent to HeyGen for lip-sync
+   - HeyGen synthesizes speech and generates avatar video with lip-sync
+   - Avatar video and audio are streamed to the call
+
+3. **User Interaction**: When you speak:
+   - Audio is captured from your microphone
+   - Transcribed to text by Deepgram
+   - Sent to Gemini LLM for processing
+   - Response is generated and spoken through the avatar
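+
+Streaming LLMs emit text in small deltas, so the publisher buffers chunks and only sends complete sentences to HeyGen, skipping anything it has already spoken. A condensed sketch of that buffering logic (a simplified rendering of the plugin's chunk/completion handlers, not the exact implementation):
+
+```python
+import re
+
+
+class SentenceBuffer:
+    """Accumulates LLM text deltas; yields each complete sentence once."""
+
+    def __init__(self) -> None:
+        self.buffer = ""
+        self.sent: set[str] = set()
+
+    def add_delta(self, delta: str) -> None:
+        self.buffer += delta
+
+    def flush(self) -> list[str]:
+        # Split on sentence boundaries, keeping the punctuation attached.
+        parts = re.split(r"([.!?]+\s*)", self.buffer.strip())
+        self.buffer = ""
+        sentences = [
+            (parts[i] + (parts[i + 1] if i + 1 < len(parts) else "")).strip()
+            for i in range(0, len(parts), 2)
+            if parts[i].strip()
+        ]
+        # Deduplicate so the avatar never repeats a sentence it already spoke.
+        fresh = [s for s in sentences if s not in self.sent]
+        self.sent.update(fresh)
+        return fresh
+```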
+
+### Realtime LLM (`avatar_realtime_example.py`)
+
+1. **Agent Setup**: The agent is configured with:
+   - Gemini Realtime for native audio processing
+   - HeyGen AvatarPublisher for avatar video
+
+2. **Avatar Streaming**: When the agent speaks:
+   - Gemini Realtime generates audio directly (24kHz PCM)
+   - Text transcription is sent to HeyGen for lip-sync
+   - HeyGen generates avatar video with lip-sync
+   - Gemini's audio is used (HeyGen audio is not forwarded for Realtime LLMs)
+   - Avatar video and Gemini audio are streamed to the call
+
+3. **User Interaction**: When you speak:
+   - Audio is captured and sent directly to Gemini Realtime
+   - Gemini processes audio natively (no separate STT needed)
+   - Response is generated and spoken through the avatar
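+
+The wiring behind this path is a plain event subscription: the publisher listens on the LLM's event bus for agent speech transcriptions and forwards each utterance to HeyGen as a "repeat" task. A trimmed sketch of that hookup (names as they appear in the plugin source; `publisher` is an `AvatarPublisher`):
+
+```python
+from vision_agents.core.llm.events import RealtimeAgentSpeechTranscriptionEvent
+
+
+def hook_up_lipsync(publisher, llm) -> None:
+    """Forward the Realtime LLM's speech transcriptions to HeyGen."""
+
+    @llm.events.subscribe
+    async def on_agent_speech(event: RealtimeAgentSpeechTranscriptionEvent):
+        # Each transcribed utterance becomes a lip-sync task; the LLM's own
+        # audio is published to the call, so HeyGen's audio is not forwarded.
+        if event.text:
+            await publisher._send_text_to_heygen(event.text)
+```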
" + "Don't use special characters or formatting." + ), + + # Use regular streaming LLM (not Realtime) for lower latency + llm=gemini.LLM("gemini-2.0-flash-exp"), + + # Add STT for speech input + stt=deepgram.STT(), + + # Add HeyGen avatar as a video publisher + # Note: mute_llm_audio is not needed since streaming LLM doesn't produce audio + processors=[ + heygen.AvatarPublisher( + avatar_id="default", # Use your HeyGen avatar ID + quality=VideoQuality.HIGH, # Video quality: VideoQuality.LOW, VideoQuality.MEDIUM, or VideoQuality.HIGH + resolution=(1920, 1080), # Output resolution + mute_llm_audio=False, # Not needed for streaming LLM + ) + ] + ) + + # Create a call + call = agent.edge.client.video.call("default", str(uuid4())) + + # Join the call + with await agent.join(call): + # Open demo UI + await agent.edge.open_demo(call) + + # Keep the call running + await agent.finish() + + +if __name__ == "__main__": + asyncio.run(start_avatar_agent()) + diff --git a/plugins/heygen/example/avatar_realtime_example.py b/plugins/heygen/example/avatar_realtime_example.py new file mode 100644 index 00000000..a851064b --- /dev/null +++ b/plugins/heygen/example/avatar_realtime_example.py @@ -0,0 +1,66 @@ +import asyncio +import logging +from uuid import uuid4 + +from dotenv import load_dotenv + +from vision_agents.core import User, Agent +from vision_agents.plugins import getstream, gemini, heygen +from vision_agents.plugins.heygen import VideoQuality + +load_dotenv() + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + + +async def start_avatar_agent() -> None: + """Start a HeyGen avatar agent with Gemini Realtime LLM. + + This example demonstrates using a HeyGen avatar with a Realtime LLM. + HeyGen provides the lip-synced avatar video based on text transcriptions, + while Gemini Realtime provides the audio directly. + """ + + # Create agent with Gemini Realtime and HeyGen avatar + agent = Agent( + edge=getstream.Edge(), + agent_user=User(name="Avatar AI Assistant"), + instructions=( + "You are a helpful AI assistant with a virtual avatar. " + "Keep responses conversational and natural. " + "Be friendly and engaging." + ), + llm=gemini.Realtime( + model="gemini-2.5-flash-native-audio-preview-09-2025" + ), + processors=[ + heygen.AvatarPublisher( + avatar_id="default", + quality=VideoQuality.HIGH, + ) + ], + ) + + # Create a call + call = agent.edge.client.video.call("default", str(uuid4())) + + # Join call first + with await agent.join(call): + # Open demo UI after joining + await agent.edge.open_demo(call) + + # Start the conversation + await agent.llm.simple_response( + text="Hello! I'm your AI assistant. How can I help you today?" 
+ ) + + # Keep running until the call ends + await agent.finish() + + +if __name__ == "__main__": + asyncio.run(start_avatar_agent()) + diff --git a/plugins/heygen/example/pyproject.toml b/plugins/heygen/example/pyproject.toml new file mode 100644 index 00000000..ffdd3922 --- /dev/null +++ b/plugins/heygen/example/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "heygen-avatar-example" +version = "0.1.0" +description = "Example using HeyGen avatar with Vision Agents" +requires-python = ">=3.10" +dependencies = [ + "vision-agents", + "vision-agents-plugins-heygen", + "vision-agents-plugins-gemini", + "vision-agents-plugins-getstream", + "vision-agents-plugins-deepgram", + "python-dotenv", +] + +[tool.uv.sources] +vision-agents = { workspace = true } +vision-agents-plugins-heygen = { workspace = true } +vision-agents-plugins-gemini = { workspace = true } +vision-agents-plugins-getstream = { workspace = true } +vision-agents-plugins-deepgram = { workspace = true } + diff --git a/plugins/heygen/py.typed b/plugins/heygen/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/plugins/heygen/pyproject.toml b/plugins/heygen/pyproject.toml new file mode 100644 index 00000000..b152460d --- /dev/null +++ b/plugins/heygen/pyproject.toml @@ -0,0 +1,41 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "vision-agents-plugins-heygen" +version = "0.1.0" +description = "HeyGen avatar plugin for Vision Agents" +readme = "README.md" +requires-python = ">=3.10" +license = "MIT" +dependencies = [ + "vision-agents", + "aiortc>=1.9.0", + "aiohttp>=3.9.0", +] + +[project.urls] +Documentation = "https://visionagents.ai/" +Website = "https://visionagents.ai/" +Source = "https://github.com/GetStream/Vision-Agents" + +[tool.hatch.version] +source = "vcs" +raw-options = { root = "..", search_parent_directories = true, fallback_version = "0.0.0" } + +[tool.hatch.build.targets.wheel] +packages = ["vision_agents"] + +[tool.hatch.build.targets.sdist] +include = ["/vision_agents"] + +[tool.uv.sources] +vision-agents = { workspace = true } + +[dependency-groups] +dev = [ + "pytest>=8.4.1", + "pytest-asyncio>=1.0.0", +] + diff --git a/plugins/heygen/tests/__init__.py b/plugins/heygen/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/plugins/heygen/tests/test_heygen_plugin.py b/plugins/heygen/tests/test_heygen_plugin.py new file mode 100644 index 00000000..d3aa6c2c --- /dev/null +++ b/plugins/heygen/tests/test_heygen_plugin.py @@ -0,0 +1,123 @@ +import pytest +from unittest.mock import patch +from vision_agents.plugins.heygen import AvatarPublisher, VideoQuality +from vision_agents.plugins.heygen.heygen_video_track import HeyGenVideoTrack +from vision_agents.plugins.heygen.heygen_rtc_manager import HeyGenRTCManager +from vision_agents.plugins.heygen.heygen_session import HeyGenSession + + +class TestHeyGenSession: + """Tests for HeyGenSession.""" + + def test_init_with_api_key(self): + """Test initialization with explicit API key.""" + session = HeyGenSession( + avatar_id="test_avatar", + quality=VideoQuality.HIGH, + api_key="test_key", + ) + + assert session.avatar_id == "test_avatar" + assert session.quality == VideoQuality.HIGH + assert session.api_key == "test_key" + + def test_init_without_api_key_raises(self): + """Test initialization without API key raises error.""" + with patch.dict("os.environ", {}, clear=True): + with pytest.raises(ValueError, match="HeyGen API key required"): + HeyGenSession(avatar_id="test_avatar") + + 
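+# VideoQuality values are sent verbatim in the streaming.new payload, so they
+# must match the strings HeyGen's API expects. A small sanity check of the enum
+# (a sketch; the values mirror heygen_types.VideoQuality):
+class TestVideoQuality:
+    """Tests for the VideoQuality enum."""
+
+    def test_values_match_heygen_api_strings(self):
+        assert VideoQuality.LOW == "low"
+        assert VideoQuality.MEDIUM == "medium"
+        assert VideoQuality.HIGH == "high"
+
+    def test_serializes_as_str(self):
+        # str subclassing lets the enum drop straight into JSON payloads.
+        assert isinstance(VideoQuality.HIGH, str)
+
+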
+class TestHeyGenVideoTrack: + """Tests for HeyGenVideoTrack.""" + + def test_init(self): + """Test video track initialization.""" + track = HeyGenVideoTrack(width=1920, height=1080) + + assert track.width == 1920 + assert track.height == 1080 + assert not track._stopped + + def test_stop(self): + """Test stopping the video track.""" + track = HeyGenVideoTrack() + track.stop() + + assert track._stopped + + +class TestHeyGenRTCManager: + """Tests for HeyGenRTCManager.""" + + def test_init(self): + """Test RTC manager initialization.""" + with patch.object(HeyGenSession, "__init__", return_value=None): + manager = HeyGenRTCManager( + avatar_id="test_avatar", + quality=VideoQuality.MEDIUM, + api_key="test_key", + ) + + assert manager.pc is None + assert not manager._connected + + def test_is_connected_property(self): + """Test is_connected property.""" + with patch.object(HeyGenSession, "__init__", return_value=None): + manager = HeyGenRTCManager(api_key="test_key") + + assert not manager.is_connected + + manager._connected = True + assert manager.is_connected + + +class TestAvatarPublisher: + """Tests for AvatarPublisher.""" + + def test_init(self): + """Test avatar publisher initialization.""" + with patch.object(HeyGenRTCManager, "__init__", return_value=None): + publisher = AvatarPublisher( + avatar_id="test_avatar", + quality=VideoQuality.HIGH, + resolution=(1920, 1080), + api_key="test_key", + ) + + assert publisher.avatar_id == "test_avatar" + assert publisher.quality == VideoQuality.HIGH + assert publisher.resolution == (1920, 1080) + assert not publisher._connected + + def test_publish_video_track(self): + """Test publishing video track.""" + with patch.object(HeyGenRTCManager, "__init__", return_value=None): + publisher = AvatarPublisher(api_key="test_key") + # Set _connected to True to avoid creating async task + publisher._connected = True + publisher._connection_task = None + + track = publisher.publish_video_track() + + assert isinstance(track, HeyGenVideoTrack) + + def test_state(self): + """Test state method.""" + with patch.object(HeyGenRTCManager, "__init__", return_value=None): + publisher = AvatarPublisher( + avatar_id="test_avatar", + quality=VideoQuality.MEDIUM, + api_key="test_key", + ) + # Mock the _connected attribute on the RTC manager + publisher.rtc_manager._connected = False + + state = publisher.state() + + assert state["avatar_id"] == "test_avatar" + assert state["quality"] == VideoQuality.MEDIUM + assert "connected" in state + assert "rtc_connected" in state + diff --git a/plugins/heygen/vision_agents/plugins/heygen/__init__.py b/plugins/heygen/vision_agents/plugins/heygen/__init__.py new file mode 100644 index 00000000..e5dd68f6 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/__init__.py @@ -0,0 +1,14 @@ +"""HeyGen avatar plugin for Vision Agents. + +This plugin provides HeyGen's interactive avatar streaming capabilities, +allowing AI agents to have realistic avatar video output with lip-sync. 
+""" + +from .heygen_avatar_publisher import AvatarPublisher +from .heygen_types import VideoQuality + +__all__ = [ + "AvatarPublisher", + "VideoQuality", +] + diff --git a/plugins/heygen/vision_agents/plugins/heygen/heygen_avatar_publisher.py b/plugins/heygen/vision_agents/plugins/heygen/heygen_avatar_publisher.py new file mode 100644 index 00000000..7fa18c26 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/heygen_avatar_publisher.py @@ -0,0 +1,393 @@ +import asyncio +import logging +from typing import Optional, Any, Tuple + +from getstream.video.rtc import audio_track + +from vision_agents.core.processors.base_processor import ( + AudioVideoProcessor, + VideoPublisherMixin, + AudioPublisherMixin, +) + +from .heygen_rtc_manager import HeyGenRTCManager +from .heygen_types import VideoQuality +from .heygen_video_track import HeyGenVideoTrack + +logger = logging.getLogger(__name__) + + +class AvatarPublisher(AudioVideoProcessor, VideoPublisherMixin, AudioPublisherMixin): + """HeyGen avatar video and audio publisher. + + Publishes video of a HeyGen avatar that lip-syncs based on LLM text output. + + For standard LLMs: HeyGen provides both video and audio (with TTS). + For Realtime LLMs: HeyGen provides video only; LLM provides audio. + + Example: + agent = Agent( + edge=getstream.Edge(), + agent_user=User(name="Avatar AI"), + instructions="Be helpful and friendly", + llm=gemini.LLM("gemini-2.0-flash"), + stt=deepgram.STT(), + processors=[ + heygen.AvatarPublisher( + avatar_id="default", + quality=heygen.VideoQuality.HIGH + ) + ] + ) + """ + + def __init__( + self, + avatar_id: str = "default", + quality: VideoQuality = VideoQuality.HIGH, + resolution: Tuple[int, int] = (1920, 1080), + api_key: Optional[str] = None, + interval: int = 0, + **kwargs, + ): + """Initialize the HeyGen avatar publisher. + + Args: + avatar_id: HeyGen avatar ID to use for streaming. + quality: Video quality (VideoQuality.LOW, VideoQuality.MEDIUM, or VideoQuality.HIGH). + resolution: Output video resolution (width, height). + api_key: HeyGen API key. Uses HEYGEN_API_KEY env var if not provided. + interval: Processing interval (not used, kept for compatibility). + **kwargs: Additional arguments passed to parent class. 
+ """ + super().__init__( + interval=interval, + receive_audio=False, # We send text to HeyGen, not audio + receive_video=False, + **kwargs + ) + + self.avatar_id = avatar_id + self.quality = quality + self.resolution = resolution + self.api_key = api_key + + # WebRTC manager for HeyGen connection + self.rtc_manager = HeyGenRTCManager( + avatar_id=avatar_id, + quality=quality, + api_key=api_key, + ) + + # Video track for publishing avatar frames + self._video_track = HeyGenVideoTrack( + width=resolution[0], + height=resolution[1], + ) + + # Audio track for publishing HeyGen's audio + # Create it immediately so the agent can detect it during initialization + self._audio_track = audio_track.AudioStreamTrack( + sample_rate=48000, channels=2, format="s16" + ) + + # Connection state + self._connected = False + self._connection_task: Optional[asyncio.Task] = None + self._agent = None # Will be set by the agent + + # Text buffer for accumulating LLM response chunks before sending to HeyGen + self._text_buffer = "" + self._current_response_id: Optional[str] = None + self._all_sent_texts: set = set() # Track all sent texts to prevent duplicates + + logger.info( + f"HeyGen AvatarPublisher initialized " + f"(avatar: {avatar_id}, quality: {quality}, resolution: {resolution})" + ) + + def publish_audio_track(self): + """Return the audio track for publishing HeyGen's audio. + + This method is called by the Agent to get the audio track that will + be published to the call. HeyGen's audio will be forwarded to this track. + """ + return self._audio_track + + def _attach_agent(self, agent: Any) -> None: + """Attach the agent reference for event subscription. + + This is called automatically by the Agent during initialization. + + Args: + agent: The agent instance. + """ + self._agent = agent + logger.info("Agent reference set for HeyGen avatar publisher") + + # Subscribe to text events immediately when agent is set + self._subscribe_to_text_events() + + async def _connect_to_heygen(self) -> None: + """Establish connection to HeyGen and start receiving video and audio.""" + try: + # Set up video and audio callbacks before connecting + self.rtc_manager.set_video_callback(self._on_video_track) + self.rtc_manager.set_audio_callback(self._on_audio_track) + + # Connect to HeyGen + await self.rtc_manager.connect() + + self._connected = True + logger.info("Connected to HeyGen, avatar streaming active") + + except Exception as e: + logger.error(f"Failed to connect to HeyGen: {e}") + self._connected = False + raise + + def _subscribe_to_text_events(self) -> None: + """Subscribe to text output events from the LLM. + + HeyGen requires text input (not audio) for proper lip-sync. + We listen to the LLM's text output and send it to HeyGen's task API. 
+ """ + try: + # Import the event types + from vision_agents.core.llm.events import ( + LLMResponseChunkEvent, + LLMResponseCompletedEvent, + RealtimeAgentSpeechTranscriptionEvent, + ) + + # Get the LLM's event manager (events are emitted by the LLM, not the agent) + if hasattr(self, '_agent') and self._agent and hasattr(self._agent, 'llm'): + @self._agent.llm.events.subscribe + async def on_text_chunk(event: LLMResponseChunkEvent): + """Handle streaming text chunks from the LLM.""" + logger.debug(f"HeyGen received text chunk: delta='{event.delta}'") + if event.delta: + await self._on_text_chunk(event.delta, event.item_id) + + @self._agent.llm.events.subscribe + async def on_text_complete(event: LLMResponseCompletedEvent): + """Handle end of LLM response - split into sentences and send each once.""" + if not self._text_buffer.strip(): + return + + # Split the complete response into sentences + import re + text = self._text_buffer.strip() + # Split on sentence boundaries but keep the punctuation + sentences = re.split(r'([.!?]+\s*)', text) + # Recombine sentences with their punctuation + full_sentences = [] + for i in range(0, len(sentences)-1, 2): + if sentences[i].strip(): + sentence = (sentences[i] + sentences[i+1] if i+1 < len(sentences) else sentences[i]).strip() + full_sentences.append(sentence) + # Handle last part if no punctuation + if sentences and sentences[-1].strip() and not any(sentences[-1].strip().endswith(p) for p in ['.', '!', '?']): + full_sentences.append(sentences[-1].strip()) + + # Send each sentence once if not already sent + for sentence in full_sentences: + if sentence and len(sentence) > 5: + if sentence not in self._all_sent_texts: + await self._send_text_to_heygen(sentence) + self._all_sent_texts.add(sentence) + else: + logger.debug(f"Skipping duplicate: '{sentence[:30]}...'") + + # Reset for next response + self._text_buffer = "" + self._current_response_id = None + + @self._agent.llm.events.subscribe + async def on_agent_speech(event: RealtimeAgentSpeechTranscriptionEvent): + """Handle agent speech transcription from Realtime LLMs. + + This is the primary path for Gemini Realtime which transcribes + the agent's speech output as text. + """ + logger.debug(f"HeyGen received agent speech: text='{event.text}'") + if event.text: + # Send directly to HeyGen - this is the complete utterance + await self._send_text_to_heygen(event.text) + + logger.info("Subscribed to LLM text output events for HeyGen lip-sync") + else: + logger.warning("Cannot subscribe to text events - no agent or LLM attached yet") + except Exception as e: + logger.error(f"Failed to subscribe to text events: {e}", exc_info=True) + + async def _on_video_track(self, track: Any) -> None: + """Callback when video track is received from HeyGen. + + Args: + track: Incoming video track from HeyGen's WebRTC connection. + """ + logger.info("Received video track from HeyGen, starting frame forwarding") + await self._video_track.start_receiving(track) + + async def _on_audio_track(self, track: Any) -> None: + """Callback when audio track is received from HeyGen. + + HeyGen provides audio with lip-synced TTS. We forward this audio + to the agent's audio track so it gets published to the call. + + For Realtime LLMs: We DON'T forward HeyGen audio - the LLM generates its own audio. + HeyGen is only used for video lip-sync based on text transcriptions. + + Args: + track: Incoming audio track from HeyGen's WebRTC connection. 
+ """ + logger.info("Received audio track from HeyGen") + + # Check if we're using a Realtime LLM + using_realtime_llm = False + if hasattr(self, '_agent') and self._agent: + from vision_agents.core.llm.realtime import Realtime + if hasattr(self._agent, 'llm') and isinstance(self._agent.llm, Realtime): + using_realtime_llm = True + + if using_realtime_llm: + # For Realtime LLMs, don't forward HeyGen audio - use the LLM's native audio + # HeyGen is only used for lip-synced video based on text transcriptions + logger.info("Using Realtime LLM - skipping HeyGen audio forwarding (using LLM's native audio)") + return + + # For standard LLMs, forward HeyGen's audio to our audio track + logger.info("Forwarding HeyGen audio to audio track") + asyncio.create_task(self._forward_audio_frames(track, self._audio_track)) + + async def _forward_audio_frames(self, source_track: Any, dest_track: Any) -> None: + """Forward audio frames from HeyGen to agent's audio track. + + Args: + source_track: Audio track from HeyGen. + dest_track: Agent's audio track to write to. + """ + try: + logger.info("Starting HeyGen audio frame forwarding") + frame_count = 0 + while True: + try: + # Read audio frame from HeyGen + frame = await source_track.recv() + frame_count += 1 + + if hasattr(frame, 'to_ndarray'): + audio_array = frame.to_ndarray() + audio_bytes = audio_array.tobytes() + await dest_track.write(audio_bytes) + else: + logger.warning("Received frame without to_ndarray() method") + + except Exception as e: + if "ended" in str(e).lower() or "closed" in str(e).lower(): + logger.info(f"HeyGen audio track ended (forwarded {frame_count} frames)") + break + logger.error(f"Error forwarding audio frame: {e}", exc_info=True) + break + + except Exception as e: + logger.error(f"Error in audio forwarding loop: {e}", exc_info=True) + + async def _on_text_chunk(self, text_delta: str, item_id: Optional[str]) -> None: + """Handle text chunk from the LLM. + + Accumulates text chunks. Does NOT send immediately - waits for completion event + to avoid sending partial/duplicate sentences. + + Args: + text_delta: The text chunk/delta from the LLM. + item_id: The response item ID. + """ + # If this is a new response, reset the buffer and sent tracking + if item_id != self._current_response_id: + if self._text_buffer: + # Send any accumulated text from previous response + text_to_send = self._text_buffer.strip() + if text_to_send and text_to_send not in self._all_sent_texts: + await self._send_text_to_heygen(text_to_send) + self._all_sent_texts.add(text_to_send) + self._text_buffer = "" + self._current_response_id = item_id + + # Just accumulate text - don't send yet! + # Wait for completion event to avoid sending partial sentences + self._text_buffer += text_delta + + async def _send_text_to_heygen(self, text: str) -> None: + """Send text to HeyGen for the avatar to speak with lip-sync. + + Args: + text: The text for the avatar to speak. + """ + if not text: + return + + if not self._connected: + logger.warning("Cannot send text to HeyGen - not connected") + return + + try: + logger.info(f"Sending text to HeyGen: '{text[:50]}...'") + await self.rtc_manager.send_text(text, task_type="repeat") + except Exception as e: + logger.error(f"Failed to send text to HeyGen: {e}", exc_info=True) + + def publish_video_track(self): + """Publish the HeyGen avatar video track. + + This method is called by the Agent to get the video track + for publishing to the call. + + Returns: + HeyGenVideoTrack instance for streaming avatar video. 
+ """ + # Start connection if not already connected + if not self._connected and not self._connection_task: + self._connection_task = asyncio.create_task(self._connect_to_heygen()) + + logger.info("Publishing HeyGen avatar video track") + return self._video_track + + def state(self) -> dict: + """Get current state of the avatar publisher. + + Returns: + Dictionary containing current state information. + """ + return { + "avatar_id": self.avatar_id, + "quality": self.quality, + "resolution": self.resolution, + "connected": self._connected, + "rtc_connected": self.rtc_manager.is_connected, + } + + async def close(self) -> None: + """Clean up resources and close connections.""" + logger.info("Closing HeyGen avatar publisher") + + # Stop video track + if self._video_track: + self._video_track.stop() + + # Close RTC connection + if self.rtc_manager: + await self.rtc_manager.close() + + # Cancel connection task if running + if self._connection_task: + self._connection_task.cancel() + try: + await self._connection_task + except asyncio.CancelledError: + pass + + self._connected = False + logger.info("HeyGen avatar publisher closed") + diff --git a/plugins/heygen/vision_agents/plugins/heygen/heygen_rtc_manager.py b/plugins/heygen/vision_agents/plugins/heygen/heygen_rtc_manager.py new file mode 100644 index 00000000..e91ba6c8 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/heygen_rtc_manager.py @@ -0,0 +1,268 @@ +import asyncio +import logging +from typing import Optional, Callable, Any + +from aiortc import ( + RTCPeerConnection, + RTCSessionDescription, + RTCIceServer, + RTCConfiguration, + MediaStreamTrack, +) + +from .heygen_session import HeyGenSession +from .heygen_types import VideoQuality + +logger = logging.getLogger(__name__) + + +class HeyGenRTCManager: + """Manages WebRTC connection to HeyGen's Streaming Avatar API. + + Handles the low-level WebRTC peer connection, audio/video streaming, + and communication with HeyGen's servers. + """ + + def __init__( + self, + avatar_id: str = "default", + quality: Optional["VideoQuality"] = VideoQuality.HIGH, + api_key: Optional[str] = None, + ): + """Initialize the RTC manager. + + Args: + avatar_id: HeyGen avatar ID to use. + quality: Video quality setting (VideoQuality.LOW, VideoQuality.MEDIUM, or VideoQuality.HIGH). + api_key: HeyGen API key (uses HEYGEN_API_KEY env var if not provided). + """ + # Default to HIGH if not provided + if quality is None: + quality = VideoQuality.HIGH + + self.session_manager = HeyGenSession( + avatar_id=avatar_id, + quality=quality, + api_key=api_key, + ) + + self.pc: Optional[RTCPeerConnection] = None + + # Video track callback for receiving avatar video + self._video_callback: Optional[Callable[[MediaStreamTrack], Any]] = None + + # Audio track callback for receiving avatar audio + self._audio_callback: Optional[Callable[[MediaStreamTrack], Any]] = None + + self._connected = False + self._connection_ready = asyncio.Event() + + async def connect(self) -> None: + """Establish WebRTC connection to HeyGen's Streaming API. + + Sets up the peer connection, negotiates tracks, and establishes + the connection for real-time avatar streaming. + + HeyGen flow: + 1. Create session -> HeyGen provides SDP offer and ICE servers + 2. Set HeyGen's offer as remote description + 3. Create answer + 4. Send answer to HeyGen + 5. 
Start session + """ + try: + # Create HeyGen session - they provide the SDP offer + session_info = await self.session_manager.create_session() + + # Extract ICE servers and SDP offer from session info + ice_servers = self._parse_ice_servers(session_info) + + # HeyGen's sdp field - check the actual structure + sdp_data = session_info.get("sdp") + + if isinstance(sdp_data, dict): + # Standard WebRTC format: {'type': 'offer', 'sdp': 'v=0...'} + offer_sdp = sdp_data.get("sdp") + sdp_type = sdp_data.get("type") + logger.debug(f"Got SDP dict from HeyGen (type: {sdp_type})") + elif isinstance(sdp_data, str) and sdp_data.startswith("v=0"): + # Raw SDP string (less common) + offer_sdp = sdp_data + logger.debug("Got raw SDP string from HeyGen") + else: + offer_sdp = None + + if not offer_sdp: + logger.error(f"Unexpected SDP format. Type: {type(sdp_data)}") + if isinstance(sdp_data, dict): + logger.error(f"SDP dict keys: {list(sdp_data.keys())}") + logger.error(f"SDP data: {str(sdp_data)[:200] if sdp_data else 'None'}") + raise RuntimeError("No valid SDP offer received from HeyGen") + + # Create RTCPeerConnection with ICE servers + config = RTCConfiguration(iceServers=ice_servers) + self.pc = RTCPeerConnection(configuration=config) + + # Set up track handlers + @self.pc.on("track") + async def on_track(track: MediaStreamTrack): + await self._handle_track(track) + + @self.pc.on("connectionstatechange") + async def on_connection_state_change(): + if self.pc is None: + return + logger.info(f"HeyGen connection state: {self.pc.connectionState}") + if self.pc.connectionState == "connected": + self._connected = True + self._connection_ready.set() + elif self.pc.connectionState in ["failed", "closed"]: + self._connected = False + self._connection_ready.clear() + + # Set HeyGen's offer as remote description + offer = RTCSessionDescription(sdp=offer_sdp, type="offer") + await self.pc.setRemoteDescription(offer) + + # Log transceivers for debugging + logger.debug(f"Transceivers after setRemoteDescription: {len(self.pc.getTransceivers())}") + + # Create our answer + answer = await self.pc.createAnswer() + await self.pc.setLocalDescription(answer) + + # Start the session with our SDP answer + # HeyGen expects the answer in the start_session call + await self.session_manager.start_session(sdp_answer=self.pc.localDescription.sdp) + + # Wait for connection to be established + await asyncio.wait_for(self._connection_ready.wait(), timeout=10.0) + + logger.info("HeyGen WebRTC connection established") + + except Exception as e: + logger.error(f"Failed to connect to HeyGen: {e}") + raise + + def _parse_ice_servers(self, session_info: dict) -> list: + """Parse ICE servers from HeyGen session info. + + HeyGen may provide ice_servers, ice_servers2, or rely on LiveKit's embedded servers. + + Args: + session_info: Session information from HeyGen API. + + Returns: + List of RTCIceServer objects. 
+ """ + ice_servers = [] + + # Try ice_servers first, then ice_servers2 as backup + ice_server_configs = ( + session_info.get("ice_servers") or + session_info.get("ice_servers2") or + session_info.get("iceServers", []) + ) + + if ice_server_configs and not isinstance(ice_server_configs, list): + logger.warning(f"Unexpected ice_servers format: {type(ice_server_configs)}") + ice_server_configs = [] + + for server_config in ice_server_configs: + if not isinstance(server_config, dict): + continue + + urls = server_config.get("urls", []) + if isinstance(urls, str): + urls = [urls] # Convert single URL to list + + username = server_config.get("username") + credential = server_config.get("credential") + + if urls: + ice_servers.append( + RTCIceServer( + urls=urls, + username=username, + credential=credential, + ) + ) + logger.info(f"Added ICE server: {urls[0]}") + + # When using LiveKit, ICE servers may be embedded in SDP + # In that case, use public STUN as fallback + if not ice_servers: + logger.info("Using default STUN servers (LiveKit may provide its own via SDP)") + ice_servers.append( + RTCIceServer(urls=["stun:stun.l.google.com:19302"]) + ) + + return ice_servers + + async def _handle_track(self, track: MediaStreamTrack) -> None: + """Handle incoming media track from HeyGen. + + Args: + track: Incoming media track (audio or video). + """ + logger.info(f"Received track from HeyGen: {track.kind}") + + if track.kind == "video": + if self._video_callback: + await self._video_callback(track) + else: + logger.warning("Video track received but no callback registered") + elif track.kind == "audio": + # Audio track from HeyGen (avatar speech with lip-synced TTS) + logger.info("Audio track received from HeyGen") + if self._audio_callback: + await self._audio_callback(track) + else: + logger.warning("Audio track received but no callback registered") + + def set_video_callback(self, callback: Callable[[MediaStreamTrack], Any]) -> None: + """Set callback for handling incoming video track. + + Args: + callback: Async function to handle video track. + """ + self._video_callback = callback + + def set_audio_callback(self, callback: Callable[[MediaStreamTrack], Any]) -> None: + """Set callback for handling incoming audio track. + + Args: + callback: Async function to handle audio track. + """ + self._audio_callback = callback + + async def send_text(self, text: str, task_type: str = "repeat") -> None: + """Send text to HeyGen for the avatar to speak with lip-sync. + + This is the correct way to achieve lip-sync with HeyGen - they handle + TTS and lip-sync server-side based on the text input. + + Args: + text: The text for the avatar to speak. + task_type: Either "repeat" or "talk" (default: "repeat"). 
+ """ + await self.session_manager.send_task(text, task_type) + + @property + def is_connected(self) -> bool: + """Check if WebRTC connection is established.""" + return self._connected + + async def close(self) -> None: + """Close the WebRTC connection and clean up resources.""" + if self.pc: + await self.pc.close() + self.pc = None + + await self.session_manager.close() + + self._connected = False + self._connection_ready.clear() + + logger.info("HeyGen RTC connection closed") + diff --git a/plugins/heygen/vision_agents/plugins/heygen/heygen_session.py b/plugins/heygen/vision_agents/plugins/heygen/heygen_session.py new file mode 100644 index 00000000..c73c8648 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/heygen_session.py @@ -0,0 +1,235 @@ +import logging +from typing import Optional, Dict, Any +from os import getenv +import aiohttp + +from .heygen_types import VideoQuality + +logger = logging.getLogger(__name__) + + +class HeyGenSession: + """Manages HeyGen API session lifecycle and configuration. + + Handles authentication, session creation, and API communication + with HeyGen's Streaming API. + """ + + def __init__( + self, + avatar_id: str = "default", + quality: VideoQuality = VideoQuality.HIGH, + api_key: Optional[str] = None, + ): + """Initialize HeyGen session manager. + + Args: + avatar_id: HeyGen avatar ID to use for streaming. + quality: Video quality setting (VideoQuality.LOW, VideoQuality.MEDIUM, or VideoQuality.HIGH). + api_key: HeyGen API key. Uses HEYGEN_API_KEY env var if not provided. + """ + self.avatar_id = avatar_id + self.quality = quality + self.api_key: str = api_key or getenv("HEYGEN_API_KEY") or "" + + if not self.api_key: + raise ValueError( + "HeyGen API key required. Set HEYGEN_API_KEY environment variable " + "or pass api_key parameter." + ) + + self.base_url = "https://api.heygen.com/v1" + self.session_id: Optional[str] = None + self.session_info: Optional[Dict[str, Any]] = None + self._http_session: Optional[aiohttp.ClientSession] = None + + async def create_session(self) -> Dict[str, Any]: + """Create a new HeyGen streaming session. + + Returns: + Session information including session_id, ICE servers, and SDP offer. + """ + if not self._http_session: + self._http_session = aiohttp.ClientSession() + + headers: dict[str, str] = { + "X-Api-Key": self.api_key, + "Content-Type": "application/json", + } + + payload = { + "avatar_id": self.avatar_id, + "quality": self.quality, + } + + try: + async with self._http_session.post( + f"{self.base_url}/streaming.new", + json=payload, + headers=headers, + ) as response: + if response.status != 200: + error_text = await response.text() + raise RuntimeError( + f"Failed to create HeyGen session: {response.status} - {error_text}" + ) + + data = await response.json() + self.session_info = data.get("data", {}) + self.session_id = self.session_info.get("session_id") + + logger.info(f"HeyGen session created: {self.session_id}") + return self.session_info + + except Exception as e: + logger.error(f"Failed to create HeyGen session: {e}") + raise + + async def start_session(self, sdp_answer: Optional[str] = None) -> Dict[str, Any]: + """Start the HeyGen streaming session. + + Args: + sdp_answer: Optional SDP answer to include in the start request. + + Returns: + Start confirmation with session details. + """ + if not self.session_id: + raise RuntimeError("Session not created. 
Call create_session() first.") + + if not self._http_session: + self._http_session = aiohttp.ClientSession() + + headers: dict[str, str] = { + "X-Api-Key": self.api_key, + "Content-Type": "application/json", + } + + payload: Dict[str, Any] = { + "session_id": self.session_id, + } + + # Include SDP answer if provided + if sdp_answer: + payload["sdp"] = { + "type": "answer", + "sdp": sdp_answer + } + + try: + async with self._http_session.post( + f"{self.base_url}/streaming.start", + json=payload, + headers=headers, + ) as response: + if response.status != 200: + error_text = await response.text() + raise RuntimeError( + f"Failed to start HeyGen session: {response.status} - {error_text}" + ) + + data = await response.json() + logger.info(f"HeyGen session started: {self.session_id}") + return data + + except Exception as e: + logger.error(f"Failed to start HeyGen session: {e}") + raise + + async def send_task(self, text: str, task_type: str = "repeat") -> Dict[str, Any]: + """Send a text task to HeyGen for the avatar to speak. + + This is the proper way to achieve lip-sync with HeyGen - send text, + and HeyGen handles TTS and lip-sync server-side. + + Args: + text: The text for the avatar to speak. + task_type: Either "repeat" (avatar repeats text exactly) or + "talk" (processes through HeyGen's LLM first). + + Returns: + Task response from HeyGen. + """ + if not self.session_id: + raise RuntimeError("Session not created. Call create_session() first.") + + if not self._http_session: + self._http_session = aiohttp.ClientSession() + + headers: dict[str, str] = { + "X-Api-Key": self.api_key, + "Content-Type": "application/json", + } + + payload = { + "session_id": self.session_id, + "text": text, + "task_type": task_type, + } + + try: + async with self._http_session.post( + f"{self.base_url}/streaming.task", + json=payload, + headers=headers, + ) as response: + if response.status != 200: + error_text = await response.text() + logger.warning( + f"Failed to send task to HeyGen: {response.status} - {error_text}" + ) + return {} + + data = await response.json() + logger.debug(f"Sent text to HeyGen: '{text[:50]}...'") + return data + + except Exception as e: + logger.error(f"Error sending task to HeyGen: {e}") + return {} + + async def stop_session(self) -> None: + """Stop the HeyGen streaming session.""" + if not self.session_id: + logger.warning("No active session to stop") + return + + if not self._http_session: + return + + headers: dict[str, str] = { + "X-Api-Key": self.api_key, + "Content-Type": "application/json", + } + + payload = { + "session_id": self.session_id, + } + + try: + async with self._http_session.post( + f"{self.base_url}/streaming.stop", + json=payload, + headers=headers, + ) as response: + if response.status == 200: + logger.info(f"HeyGen session stopped: {self.session_id}") + else: + logger.warning( + f"Failed to stop HeyGen session: {response.status}" + ) + except Exception as e: + logger.error(f"Error stopping HeyGen session: {e}") + + async def close(self) -> None: + """Clean up session resources.""" + await self.stop_session() + + if self._http_session: + await self._http_session.close() + self._http_session = None + + self.session_id = None + self.session_info = None + logger.info("HeyGen session cleaned up") + diff --git a/plugins/heygen/vision_agents/plugins/heygen/heygen_types.py b/plugins/heygen/vision_agents/plugins/heygen/heygen_types.py new file mode 100644 index 00000000..f7981db9 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/heygen_types.py @@ 
-0,0 +1,12 @@ +"""Type definitions for HeyGen plugin.""" + +from enum import Enum + + +class VideoQuality(str, Enum): + """Video quality options for HeyGen avatar streaming.""" + + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + diff --git a/plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py b/plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py new file mode 100644 index 00000000..e74a4c23 --- /dev/null +++ b/plugins/heygen/vision_agents/plugins/heygen/heygen_video_track.py @@ -0,0 +1,188 @@ +import asyncio +import logging +from typing import Optional + +import av +from aiortc import MediaStreamTrack, VideoStreamTrack +from PIL import Image + +from vision_agents.core.utils.queue import LatestNQueue + +logger = logging.getLogger(__name__) + + +class HeyGenVideoTrack(VideoStreamTrack): + """Video track that forwards HeyGen avatar video frames. + + Receives video frames from HeyGen's WebRTC connection and provides + them through the standard VideoStreamTrack interface for publishing + to the call. + """ + + def __init__(self, width: int = 1920, height: int = 1080): + """Initialize the HeyGen video track. + + Args: + width: Video frame width. + height: Video frame height. + """ + super().__init__() + + self.width = width + self.height = height + + # Queue for incoming frames from HeyGen - keep minimal for low latency + self.frame_queue: LatestNQueue[av.VideoFrame] = LatestNQueue(maxlen=2) + + # Create placeholder frame for when no frames are available + placeholder = Image.new("RGB", (self.width, self.height), color=(30, 30, 40)) + self.placeholder_frame = av.VideoFrame.from_image(placeholder) + self.last_frame: av.VideoFrame = self.placeholder_frame + + self._stopped = False + self._receiving_task: Optional[asyncio.Task] = None + self._source_track: Optional[MediaStreamTrack] = None + + logger.info(f"HeyGenVideoTrack initialized ({width}x{height})") + + async def start_receiving(self, source_track: MediaStreamTrack) -> None: + """Start receiving frames from HeyGen's video track. + + Args: + source_track: The incoming video track from HeyGen's WebRTC connection. 
+ """ + if self._receiving_task: + logger.info("Restarting HeyGen video receiver with new source track") + self._receiving_task.cancel() + try: + await self._receiving_task + except asyncio.CancelledError: + pass + self._receiving_task = None + self._source_track = None + + self._source_track = source_track + self._receiving_task = asyncio.create_task(self._receive_frames()) + logger.info("Started receiving frames from HeyGen") + + async def _receive_frames(self) -> None: + """Continuously receive frames from HeyGen and add to queue.""" + if not self._source_track: + logger.error("No source track set") + return + + try: + while not self._stopped: + try: + # Receive frame from HeyGen + frame = await self._source_track.recv() + + # Type check: ensure we have a VideoFrame + if frame and isinstance(frame, av.VideoFrame): + # Resize if needed + if frame.width != self.width or frame.height != self.height: + frame = self._resize_frame(frame) + + # Add to queue (will replace oldest if full) + self.frame_queue.put_latest_nowait(frame) + + logger.debug( + f"Received frame from HeyGen: {frame.width}x{frame.height}" + ) + + except Exception as e: + if not self._stopped: + logger.warning(f"Error receiving frame from HeyGen: {e}") + await asyncio.sleep(0.01) + + except asyncio.CancelledError: + logger.info("Frame receiving task cancelled") + except Exception as e: + logger.error(f"Fatal error in frame receiving: {e}") + + def _resize_frame(self, frame: av.VideoFrame) -> av.VideoFrame: + """Resize a video frame to match the track dimensions while maintaining aspect ratio. + + Args: + frame: Input video frame. + + Returns: + Resized video frame with letterboxing if needed. + """ + try: + img = frame.to_image() + + # Calculate scaling to maintain aspect ratio + src_width, src_height = img.size + target_width, target_height = self.width, self.height + + # Calculate scale factor (fit within target dimensions) + scale = min(target_width / src_width, target_height / src_height) + new_width = int(src_width * scale) + new_height = int(src_height * scale) + + # Resize with aspect ratio maintained + resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS) + + # Create black background at target resolution + result = Image.new('RGB', (target_width, target_height), (0, 0, 0)) + + # Paste resized image centered + x_offset = (target_width - new_width) // 2 + y_offset = (target_height - new_height) // 2 + result.paste(resized, (x_offset, y_offset)) + + return av.VideoFrame.from_image(result) + + except Exception as e: + logger.error(f"Error resizing frame: {e}") + return frame + + async def recv(self) -> av.VideoFrame: + """Receive the next video frame. + + This is called by the WebRTC stack to get frames for transmission. + + Returns: + Video frame to transmit. 
+ """ + if self._stopped: + raise Exception("Track stopped") + + try: + # Try to get a new frame from queue with short timeout + frame = await asyncio.wait_for( + self.frame_queue.get(), + timeout=0.033 # ~30 FPS + ) + if frame: + self.last_frame = frame + + except asyncio.TimeoutError: + # No new frame, use last frame + pass + + except Exception as e: + logger.warning(f"Error getting frame from queue: {e}") + + # Get timestamp for the frame + pts, time_base = await self.next_timestamp() + + # Create a copy of the frame with updated timestamp + output_frame = self.last_frame + output_frame.pts = pts + output_frame.time_base = time_base + + return output_frame + + def stop(self) -> None: + """Stop the video track.""" + self._stopped = True + + if self._receiving_task: + self._receiving_task.cancel() + self._receiving_task = None + + super().stop() + logger.info("HeyGenVideoTrack stopped") + diff --git a/plugins/openai/vision_agents/plugins/openai/openai_realtime.py b/plugins/openai/vision_agents/plugins/openai/openai_realtime.py index dfa30f75..fb1efcb2 100644 --- a/plugins/openai/vision_agents/plugins/openai/openai_realtime.py +++ b/plugins/openai/vision_agents/plugins/openai/openai_realtime.py @@ -6,7 +6,7 @@ RealtimeSessionCreateRequestParam, ResponseAudioTranscriptDoneEvent, InputAudioBufferSpeechStartedEvent, - ConversationItemInputAudioTranscriptionCompletedEvent, SessionUpdatedEvent, ResponseCreatedEvent, ResponseDoneEvent, + ConversationItemInputAudioTranscriptionCompletedEvent, ResponseDoneEvent, ) from vision_agents.core.llm import realtime @@ -241,7 +241,6 @@ async def _handle_openai_event(self, event: dict) -> None: # Handle tool calls from OpenAI realtime await self._handle_tool_call_event(event) elif et == "response.created": - e = ResponseCreatedEvent(**event) pass elif et == "response.done": logger.info("OpenAI response done %s", event) diff --git a/pyproject.toml b/pyproject.toml index f994438f..abe5aa36 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,8 +21,9 @@ vision-agents-plugins-ultralytics = { workspace = true } vision-agents-plugins-krisp = { workspace = true } vision-agents-plugins-smart-turn = { workspace = true } vision-agents-plugins-wizper = { workspace = true } -vision-agents-plugins-vogent = { workspace = true } +vision-agents-plugins-heygen = { workspace = true } vision-agents-plugins-moondream = { workspace = true } +vision-agents-plugins-vogent = { workspace = true } [tool.uv] # Workspace-level override to resolve numpy version conflicts @@ -50,6 +51,7 @@ members = [ "plugins/krisp", "plugins/smart_turn", "plugins/wizper", + "plugins/heygen", "plugins/vogent", "plugins/moondream" ] diff --git a/uv.lock b/uv.lock index 219cabe9..bcc2460c 100644 --- a/uv.lock +++ b/uv.lock @@ -19,6 +19,7 @@ members = [ "vision-agents-plugins-fish", "vision-agents-plugins-gemini", "vision-agents-plugins-getstream", + "vision-agents-plugins-heygen", "vision-agents-plugins-kokoro", "vision-agents-plugins-krisp", "vision-agents-plugins-moondream", @@ -195,6 +196,24 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3b/58/af07dda649c22a1ae954ffb7aaaf4d4a57f1bf00ebdf62307affc0b8552f/aioice-0.10.1-py3-none-any.whl", hash = "sha256:f31ae2abc8608b1283ed5f21aebd7b6bd472b152ff9551e9b559b2d8efed79e9", size = 24872, upload-time = "2025-04-13T08:15:24.044Z" }, ] +[[package]] +name = "aiortc" +version = "1.14.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aioice" }, + { name = "av" }, + { name = "cryptography" }, + { name = 
"google-crc32c" }, + { name = "pyee" }, + { name = "pylibsrtp" }, + { name = "pyopenssl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864, upload-time = "2025-10-13T21:40:37.905Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183, upload-time = "2025-10-13T21:40:36.59Z" }, +] + [[package]] name = "aiortc-getstream" version = "1.13.0.post1" @@ -5302,6 +5321,7 @@ all-plugins = [ { name = "vision-agents-plugins-elevenlabs" }, { name = "vision-agents-plugins-gemini" }, { name = "vision-agents-plugins-getstream" }, + { name = "vision-agents-plugins-heygen" }, { name = "vision-agents-plugins-kokoro" }, { name = "vision-agents-plugins-krisp" }, { name = "vision-agents-plugins-moonshine" }, @@ -5335,6 +5355,9 @@ gemini = [ getstream = [ { name = "vision-agents-plugins-getstream" }, ] +heygen = [ + { name = "vision-agents-plugins-heygen" }, +] kokoro = [ { name = "vision-agents-plugins-kokoro" }, ] @@ -5384,6 +5407,8 @@ requires-dist = [ { name = "vision-agents-plugins-gemini", marker = "extra == 'gemini'", editable = "plugins/gemini" }, { name = "vision-agents-plugins-getstream", marker = "extra == 'all-plugins'", editable = "plugins/getstream" }, { name = "vision-agents-plugins-getstream", marker = "extra == 'getstream'", editable = "plugins/getstream" }, + { name = "vision-agents-plugins-heygen", marker = "extra == 'all-plugins'", editable = "plugins/heygen" }, + { name = "vision-agents-plugins-heygen", marker = "extra == 'heygen'", editable = "plugins/heygen" }, { name = "vision-agents-plugins-kokoro", marker = "extra == 'all-plugins'", editable = "plugins/kokoro" }, { name = "vision-agents-plugins-kokoro", marker = "extra == 'kokoro'", editable = "plugins/kokoro" }, { name = "vision-agents-plugins-krisp", marker = "extra == 'all-plugins'", editable = "plugins/krisp" }, @@ -5401,7 +5426,7 @@ requires-dist = [ { name = "vision-agents-plugins-xai", marker = "extra == 'all-plugins'", editable = "plugins/xai" }, { name = "vision-agents-plugins-xai", marker = "extra == 'xai'", editable = "plugins/xai" }, ] -provides-extras = ["all-plugins", "anthropic", "cartesia", "deepgram", "dev", "elevenlabs", "gemini", "getstream", "kokoro", "krisp", "moonshine", "openai", "smart-turn", "ultralytics", "wizper", "xai"] +provides-extras = ["all-plugins", "anthropic", "cartesia", "deepgram", "dev", "elevenlabs", "gemini", "getstream", "heygen", "kokoro", "krisp", "moonshine", "openai", "smart-turn", "ultralytics", "wizper", "xai"] [[package]] name = "vision-agents-plugins-anthropic" @@ -5625,6 +5650,35 @@ dev = [ { name = "pytest-asyncio", specifier = ">=1.0.0" }, ] +[[package]] +name = "vision-agents-plugins-heygen" +version = "0.1.0" +source = { editable = "plugins/heygen" } +dependencies = [ + { name = "aiohttp" }, + { name = "aiortc" }, + { name = "vision-agents" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-asyncio" }, +] + +[package.metadata] +requires-dist = [ + { name = "aiohttp", specifier = ">=3.9.0" }, + { name = "aiortc", specifier = ">=1.9.0" }, + { name = "vision-agents", editable = "agents-core" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = 
"pytest", specifier = ">=8.4.1" }, + { name = "pytest-asyncio", specifier = ">=1.0.0" }, +] + [[package]] name = "vision-agents-plugins-kokoro" source = { editable = "plugins/kokoro" }