import asyncio
import os
import sys
from typing import TYPE_CHECKING

import numpy as np

# Make sibling modules importable when this file is run as a plain script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from agents import function_tool
from agents.realtime import RealtimeAgent, RealtimeSession, RealtimeSessionEvent

if TYPE_CHECKING:
    from .ui import AppUI
else:
    # Package-relative import when possible, plain import when run directly.
    try:
        from .ui import AppUI
    except ImportError:
        from ui import AppUI


@function_tool
def get_weather(city: str) -> str:
    """Get the weather in a city."""
    return f"The weather in {city} is sunny."


agent = RealtimeAgent(
    name="Assistant",
    instructions="You always greet the user with 'Top of the morning to you'.",
    tools=[get_weather],
)


class Example:
    """Wires a RealtimeSession to the textual UI and relays events both ways."""

    def __init__(self) -> None:
        self.session = RealtimeSession(agent)
        self.ui = AppUI()
        self.ui.connected = asyncio.Event()
        self.ui.last_audio_item_id = None
        # Microphone chunks recorded by the UI get forwarded to the session.
        self.ui.set_audio_callback(self.on_audio_recorded)

    async def run(self) -> None:
        """Connect the session, then hand control to the UI's event loop."""
        self.session.add_listener(self.on_event)
        await self.session.connect()
        self.ui.set_is_connected(True)
        await self.ui.run_async()

    async def on_audio_recorded(self, audio_bytes: bytes) -> None:
        """Called when audio is recorded by the UI."""
        try:
            await self.session.send_audio(audio_bytes)
        except Exception as e:
            self.ui.log_message(f"Error sending audio: {e}")

    async def on_event(self, event: RealtimeSessionEvent) -> None:
        """Render a session event into the transcript pane or the raw log."""
        try:
            kind = event.type
            if kind == "audio":
                # Payload is raw PCM16; decode and hand it to the audio player.
                self.ui.play_audio(np.frombuffer(event.audio.data, dtype=np.int16))
            elif kind in ("history_updated", "history_added"):
                # History bookkeeping is not surfaced in this demo.
                pass
            elif kind == "raw_transport_event":
                self.ui.log_message(f"Raw transport event: {event.data}")
            elif kind == "agent_start":
                self.ui.add_transcript(f"Agent started: {event.agent.name}")
            elif kind == "agent_end":
                self.ui.add_transcript(f"Agent ended: {event.agent.name}")
            elif kind == "handoff":
                self.ui.add_transcript(
                    f"Handoff from {event.from_agent.name} to {event.to_agent.name}"
                )
            elif kind == "tool_start":
                self.ui.add_transcript(f"Tool started: {event.tool.name}")
            elif kind == "tool_end":
                self.ui.add_transcript(f"Tool ended: {event.tool.name}; output: {event.output}")
            elif kind == "audio_end":
                self.ui.add_transcript("Audio ended")
            elif kind == "audio_interrupted":
                self.ui.add_transcript("Audio interrupted")
            elif kind == "error":
                self.ui.add_transcript(f"Error: {event.error}")
            else:
                self.ui.log_message(f"Unknown event type: {event.type}")
        except Exception as e:
            # The UI may already have shut down while events are still arriving.
            self.ui.log_message(f"Event handling error: {str(e)}")


if __name__ == "__main__":
    example = Example()
    asyncio.run(example.run())
class AudioStatusIndicator(Static):
    """A widget that shows the current audio recording status."""

    # Reactive flag flipped by AppUI.set_recording_status; textual re-renders on change.
    is_recording = reactive(False)

    @override
    def render(self) -> str:
        # Red dot while the conversation is live, otherwise a prompt for the hotkeys.
        status = (
            "🔴 Conversation started."
            if self.is_recording
            else "⚪ Press SPACE to start the conversation (q to quit)"
        )
        return status


class AppUI(App[None]):
    """Textual app for the realtime demo: transcript pane, raw event log,
    recording-status indicator, plus microphone capture and audio playback.

    The owner (see demo.py) supplies an async ``audio_callback`` that receives
    recorded PCM16 chunks; incoming agent audio is played via ``play_audio``.
    """

    CSS = """
    Screen {
        background: #1a1b26; /* Dark blue-grey background */
    }

    Container {
        border: double rgb(91, 164, 91);
    }

    #input-container {
        height: 5; /* Explicit height for input container */
        margin: 1 1;
        padding: 1 2;
    }

    #bottom-pane {
        width: 100%;
        height: 82%; /* Reduced to make room for session display */
        border: round rgb(205, 133, 63);
    }

    #status-indicator {
        height: 3;
        content-align: center middle;
        background: #2a2b36;
        border: solid rgb(91, 164, 91);
        margin: 1 1;
    }

    #session-display {
        height: 3;
        content-align: center middle;
        background: #2a2b36;
        border: solid rgb(91, 164, 91);
        margin: 1 1;
    }

    #transcripts {
        width: 50%;
        height: 100%;
        border-right: solid rgb(91, 164, 91);
    }

    #transcripts-header {
        height: 2;
        background: #2a2b36;
        content-align: center middle;
        border-bottom: solid rgb(91, 164, 91);
    }

    #transcripts-content {
        height: 100%;
    }

    #event-log {
        width: 50%;
        height: 100%;
    }

    #event-log-header {
        height: 2;
        background: #2a2b36;
        content-align: center middle;
        border-bottom: solid rgb(91, 164, 91);
    }

    #event-log-content {
        height: 100%;
    }

    Static {
        color: white;
    }
    """

    # Set once the user presses SPACE; gates the capture loop.
    should_send_audio: asyncio.Event
    # Set by set_is_connected(True); capture_audio waits on it before opening the mic.
    connected: asyncio.Event
    # NOTE(review): declared but never assigned in this class — the demo script
    # initializes it externally; confirm before relying on it here.
    last_audio_item_id: str | None
    # Async consumer for recorded PCM16 chunks; None until set_audio_callback is called.
    audio_callback: Callable[[bytes], Coroutine[Any, Any, None]] | None

    def __init__(self) -> None:
        super().__init__()
        # Output stream for agent audio. SAMPLE_RATE is 24000 Hz mono int16 —
        # presumably matching the realtime transport's PCM format; confirm against the session.
        self.audio_player = sd.OutputStream(
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=FORMAT,
        )
        self.should_send_audio = asyncio.Event()
        self.connected = asyncio.Event()
        self.audio_callback = None

    @override
    def compose(self) -> ComposeResult:
        """Create child widgets for the app."""
        with Container():
            yield Header(id="session-display")
            yield AudioStatusIndicator(id="status-indicator")
            with Container(id="bottom-pane"):
                with Horizontal():
                    with Container(id="transcripts"):
                        yield Static("Conversation transcript", id="transcripts-header")
                        yield RichLog(
                            id="transcripts-content", wrap=True, highlight=True, markup=True
                        )
                    with Container(id="event-log"):
                        yield Static("Raw event log", id="event-log-header")
                        yield RichLog(
                            id="event-log-content", wrap=True, highlight=True, markup=True
                        )

    def set_is_connected(self, is_connected: bool) -> None:
        # Conditional expression used purely for its side effect: set or clear the event.
        self.connected.set() if is_connected else self.connected.clear()

    def set_audio_callback(self, callback: Callable[[bytes], Coroutine[Any, Any, None]]) -> None:
        """Set a callback function to be called when audio is recorded."""
        self.audio_callback = callback

    # High-level methods for UI operations
    def set_header_text(self, text: str) -> None:
        """Update the header text."""
        header = self.query_one("#session-display", Header)
        header.update(text)

    def set_recording_status(self, is_recording: bool) -> None:
        """Set the recording status indicator."""
        status_indicator = self.query_one(AudioStatusIndicator)
        status_indicator.is_recording = is_recording

    def log_message(self, message: str) -> None:
        """Add a message to the event log."""
        try:
            log_pane = self.query_one("#event-log-content", RichLog)
            log_pane.write(message)
        except Exception:
            # Handle the case where the widget might not be available
            pass

    def add_transcript(self, message: str) -> None:
        """Add a transcript message to the transcripts panel."""
        try:
            transcript_pane = self.query_one("#transcripts-content", RichLog)
            transcript_pane.write(message)
        except Exception:
            # Handle the case where the widget might not be available
            pass

    def play_audio(self, audio_data: npt.NDArray[np.int16]) -> None:
        """Play audio data through the audio player."""
        try:
            self.audio_player.write(audio_data)
        except Exception as e:
            # Playback failures are logged, not raised — the UI should keep running.
            self.log_message(f"Audio play error: {e}")

    async def on_mount(self) -> None:
        """Set up audio player and start the audio capture worker."""
        self.audio_player.start()
        # run_worker ties the capture coroutine's lifetime to the app's.
        self.run_worker(self.capture_audio())

    async def capture_audio(self) -> None:
        """Capture audio from the microphone and send to the session.

        Blocks on ``connected`` then ``should_send_audio`` before opening the
        mic; thereafter reads fixed-size chunks and forwards them to
        ``audio_callback`` until cancelled. The stream is always stopped and
        closed in the ``finally`` block.
        """
        # Wait for connection to be established
        await self.connected.wait()

        # Set up audio input stream
        stream = sd.InputStream(
            channels=CHANNELS,
            samplerate=SAMPLE_RATE,
            dtype=FORMAT,
        )

        try:
            # Wait for user to press spacebar to start
            await self.should_send_audio.wait()

            stream.start()
            self.set_recording_status(True)
            self.log_message("Recording started - speak to the agent")

            # Buffer size in samples (50 ms at 24 kHz = 1200 samples)
            read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)

            while True:
                # Check if there's enough data to read
                if stream.read_available < read_size:
                    await asyncio.sleep(0.01)  # Small sleep to avoid CPU hogging
                    continue

                # Read audio data
                data, _ = stream.read(read_size)

                # Convert numpy array to bytes
                audio_bytes = data.tobytes()

                # Call audio callback if set
                if self.audio_callback:
                    try:
                        await self.audio_callback(audio_bytes)
                    except Exception as e:
                        # A failing consumer must not kill the capture loop.
                        self.log_message(f"Audio callback error: {e}")

                # Yield control back to event loop
                await asyncio.sleep(0)

        except Exception as e:
            self.log_message(f"Audio capture error: {e}")
        finally:
            if stream.active:
                stream.stop()
            stream.close()

    async def on_key(self, event: events.Key) -> None:
        """Handle key press events: 'q' quits, SPACE starts the conversation."""
        # add the keypress to the log
        self.log_message(f"Key pressed: {event.key}")

        if event.key == "q":
            # Tear down playback before exiting the app.
            self.audio_player.stop()
            self.audio_player.close()
            self.exit()
            return

        if event.key == "space":  # Spacebar
            # One-shot: starting is idempotent and there is no stop toggle.
            if not self.should_send_audio.is_set():
                self.should_send_audio.set()
                self.set_recording_status(True)