diff --git a/examples/realtime/app/README.md b/examples/realtime/app/README.md
new file mode 100644
index 000000000..3a7176707
--- /dev/null
+++ b/examples/realtime/app/README.md
@@ -0,0 +1,40 @@
+# Realtime Demo App
+
+A web-based realtime voice assistant demo with a FastAPI backend and HTML/JS frontend.
+
+## Installation
+
+Install the required dependencies:
+
+```bash
+uv add fastapi uvicorn websockets
+```
+
+## Usage
+
+Start the application with a single command:
+
+```bash
+cd examples/realtime/app && uv run python server.py
+```
+
+Then open your browser to: http://localhost:8000
+
+## How to Use
+
+1. Click **Connect** to establish a realtime session
+2. Audio capture starts automatically; just speak naturally
+3. Click the **Mic On/Off** button to mute/unmute your microphone
+4. Watch the conversation unfold in the left pane
+5. Monitor raw events in the right pane (click to expand/collapse)
+6. Click **Disconnect** when done
+
+## Architecture
+
+- **Backend**: FastAPI server with WebSocket connections for real-time communication
+- **Session Management**: Each connection gets a unique session with the OpenAI Realtime API
+- **Audio Processing**: 24kHz mono audio capture and playback
+- **Event Handling**: Full event stream processing with transcript generation
+- **Frontend**: Vanilla JavaScript with clean, responsive CSS
+
+The demo showcases the core patterns for building realtime voice applications with the OpenAI Agents SDK.
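+
+## Protocol Notes
+
+All frontend/backend traffic is JSON over a single WebSocket. As a rough
+sketch of the upstream message shape (the sample values below are
+illustrative), microphone audio is sent as arrays of int16 PCM samples:
+
+```json
+{ "type": "audio", "data": [0, -12, 511, 327] }
+```
+
+The server packs `data` into raw bytes and forwards them to the realtime
+session; downstream frames are serialized session events such as `audio`,
+`history_updated`, and `tool_start`.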
diff --git a/examples/realtime/app/server.py b/examples/realtime/app/server.py
new file mode 100644
index 000000000..db2cd7bda
--- /dev/null
+++ b/examples/realtime/app/server.py
@@ -0,0 +1,172 @@
+import asyncio
+import base64
+import json
+import logging
+import struct
+from contextlib import asynccontextmanager
+from typing import Any
+
+from fastapi import FastAPI, WebSocket, WebSocketDisconnect
+from fastapi.responses import FileResponse
+from fastapi.staticfiles import StaticFiles
+from typing_extensions import assert_never  # typing.assert_never requires Python 3.11+
+
+from agents import function_tool
+from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSession, RealtimeSessionEvent
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+@function_tool
+def get_weather(city: str) -> str:
+    """Get the weather in a city."""
+    return f"The weather in {city} is sunny."
+
+
+@function_tool
+def get_secret_number() -> int:
+    """Returns the secret number, if the user asks for it."""
+    return 71
+
+
+haiku_agent = RealtimeAgent(
+    name="Haiku Agent",
+    instructions="You are a haiku poet. You must respond ONLY in traditional haiku format (5-7-5 syllables). Every response should be a proper haiku about the topic. Do not break character.",
+    tools=[],
+)
+
+agent = RealtimeAgent(
+    name="Assistant",
+    instructions="If the user wants poetry or haikus, you can hand them off to the haiku agent via the transfer_to_haiku_agent tool.",
+    tools=[get_weather, get_secret_number],
+    handoffs=[haiku_agent],
+)
+
+
+class RealtimeWebSocketManager:
+    def __init__(self):
+        self.active_sessions: dict[str, RealtimeSession] = {}
+        self.session_contexts: dict[str, Any] = {}
+        self.websockets: dict[str, WebSocket] = {}
+        # Strong references to event-processing tasks; asyncio keeps only weak
+        # references, so an untracked task could be garbage-collected mid-run.
+        self.event_tasks: dict[str, asyncio.Task] = {}
+
+    async def connect(self, websocket: WebSocket, session_id: str):
+        await websocket.accept()
+        self.websockets[session_id] = websocket
+
+        runner = RealtimeRunner(agent)
+        session_context = await runner.run()
+        session = await session_context.__aenter__()
+        self.active_sessions[session_id] = session
+        self.session_contexts[session_id] = session_context
+
+        # Start the event processing task and keep a reference to it
+        self.event_tasks[session_id] = asyncio.create_task(self._process_events(session_id))
+
+    async def disconnect(self, session_id: str):
+        if session_id in self.event_tasks:
+            self.event_tasks[session_id].cancel()
+            del self.event_tasks[session_id]
+        if session_id in self.session_contexts:
+            await self.session_contexts[session_id].__aexit__(None, None, None)
+            del self.session_contexts[session_id]
+        if session_id in self.active_sessions:
+            del self.active_sessions[session_id]
+        if session_id in self.websockets:
+            del self.websockets[session_id]
+
+    async def send_audio(self, session_id: str, audio_bytes: bytes):
+        if session_id in self.active_sessions:
+            await self.active_sessions[session_id].send_audio(audio_bytes)
+
+    async def _process_events(self, session_id: str):
+        try:
+            session = self.active_sessions[session_id]
+            websocket = self.websockets[session_id]
+
+            async for event in session:
+                event_data = await self._serialize_event(event)
+                await websocket.send_text(json.dumps(event_data))
+        except Exception as e:
+            logger.error(f"Error processing events for session {session_id}: {e}")
+
+    async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
+        base_event: dict[str, Any] = {
+            "type": event.type,
+        }
+
+        if event.type == "agent_start":
+            base_event["agent"] = event.agent.name
+        elif event.type == "agent_end":
+            base_event["agent"] = event.agent.name
+        elif event.type == "handoff":
+            base_event["from"] = event.from_agent.name
+            base_event["to"] = event.to_agent.name
+        elif event.type == "tool_start":
+            base_event["tool"] = event.tool.name
+        elif event.type == "tool_end":
+            base_event["tool"] = event.tool.name
+            base_event["output"] = str(event.output)
+        elif event.type == "audio":
+            base_event["audio"] = base64.b64encode(event.audio.data).decode("utf-8")
+        elif event.type == "audio_interrupted":
+            pass
+        elif event.type == "audio_end":
+            pass
+        elif event.type == "history_updated":
+            base_event["history"] = [item.model_dump(mode="json") for item in event.history]
+        elif event.type == "history_added":
+            pass
+        elif event.type == "guardrail_tripped":
+            base_event["guardrail_results"] = [
+                {"name": result.guardrail.name} for result in event.guardrail_results
+            ]
+        elif event.type == "raw_model_event":
+            base_event["raw_model_event"] = {
+                "type": event.data.type,
+            }
+        elif event.type == "error":
+            base_event["error"] = str(event.error) if hasattr(event, "error") else "Unknown error"
+        else:
+            assert_never(event)
+
+        return base_event
+
+
+manager = RealtimeWebSocketManager()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    yield
+
+
+app = FastAPI(lifespan=lifespan)
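+
+
+# Each client connection speaks JSON text frames over this WebSocket; audio
+# frames carry int16 PCM samples that are packed into raw bytes before being
+# forwarded to the realtime session.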
+@app.websocket("/ws/{session_id}")
+async def websocket_endpoint(websocket: WebSocket, session_id: str):
+    await manager.connect(websocket, session_id)
+    try:
+        while True:
+            data = await websocket.receive_text()
+            message = json.loads(data)
+
+            if message["type"] == "audio":
+                # Convert int16 array to bytes
+                int16_data = message["data"]
+                audio_bytes = struct.pack(f"{len(int16_data)}h", *int16_data)
+                await manager.send_audio(session_id, audio_bytes)
+
+    except WebSocketDisconnect:
+        await manager.disconnect(session_id)
+
+
+@app.get("/")
+async def read_index():
+    return FileResponse("static/index.html")
+
+
+# Mounted after the explicit index route; a "/" mount registered first would
+# shadow every route declared after it.
+app.mount("/", StaticFiles(directory="static", html=True), name="static")
+
+
+if __name__ == "__main__":
+    import uvicorn
+
+    uvicorn.run(app, host="0.0.0.0", port=8000)
diff --git a/examples/realtime/app/static/app.js b/examples/realtime/app/static/app.js
new file mode 100644
index 000000000..3ec8fcc99
--- /dev/null
+++ b/examples/realtime/app/static/app.js
@@ -0,0 +1,467 @@
+class RealtimeDemo {
+    constructor() {
+        this.ws = null;
+        this.isConnected = false;
+        this.isMuted = false;
+        this.isCapturing = false;
+        this.audioContext = null;
+        this.processor = null;
+        this.stream = null;
+        this.sessionId = this.generateSessionId();
+
+        // Audio playback queue
+        this.audioQueue = [];
+        this.isPlayingAudio = false;
+        this.playbackAudioContext = null;
+        this.currentAudioSource = null;
+
+        this.initializeElements();
+        this.setupEventListeners();
+    }
+
+    initializeElements() {
+        this.connectBtn = document.getElementById('connectBtn');
+        this.muteBtn = document.getElementById('muteBtn');
+        this.status = document.getElementById('status');
+        this.messagesContent = document.getElementById('messagesContent');
+        this.eventsContent = document.getElementById('eventsContent');
+        this.toolsContent = document.getElementById('toolsContent');
+    }
+
+    setupEventListeners() {
+        this.connectBtn.addEventListener('click', () => {
+            if (this.isConnected) {
+                this.disconnect();
+            } else {
+                this.connect();
+            }
+        });
+
+        this.muteBtn.addEventListener('click', () => {
+            this.toggleMute();
+        });
+    }
+
+    generateSessionId() {
+        return 'session_' + Math.random().toString(36).slice(2, 11);
+    }
+
+    async connect() {
+        try {
+            // Derive the WebSocket URL from the page location so the demo also
+            // works when served from a host or port other than localhost:8000
+            const wsScheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
+            this.ws = new WebSocket(`${wsScheme}://${window.location.host}/ws/${this.sessionId}`);
+
+            this.ws.onopen = () => {
+                this.isConnected = true;
+                this.updateConnectionUI();
+                this.startContinuousCapture();
+            };
+
+            this.ws.onmessage = (event) => {
+                const data = JSON.parse(event.data);
+                this.handleRealtimeEvent(data);
+            };
+
+            this.ws.onclose = () => {
+                this.isConnected = false;
+                this.updateConnectionUI();
+            };
+
+            this.ws.onerror = (error) => {
+                console.error('WebSocket error:', error);
+            };
+
+        } catch (error) {
+            console.error('Failed to connect:', error);
+        }
+    }
+
+    disconnect() {
+        if (this.ws) {
+            this.ws.close();
+        }
+        this.stopContinuousCapture();
+    }
+
+    updateConnectionUI() {
+        if (this.isConnected) {
+            this.connectBtn.textContent = 'Disconnect';
+            this.connectBtn.className = 'connect-btn connected';
+            this.status.textContent = 'Connected';
+            this.status.className = 'status connected';
+            this.muteBtn.disabled = false;
+        } else {
+            this.connectBtn.textContent = 'Connect';
+            this.connectBtn.className = 'connect-btn disconnected';
+            this.status.textContent = 'Disconnected';
+            this.status.className = 'status disconnected';
+            this.muteBtn.disabled = true;
+        }
+    }
+
+    toggleMute() {
+        this.isMuted = !this.isMuted;
+        this.updateMuteUI();
+    }
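+
+    // Muting is client-side only: capture keeps running, and onaudioprocess
+    // simply skips sending frames while isMuted is true.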
+    updateMuteUI() {
+        if (this.isMuted) {
+            this.muteBtn.textContent = '🔇 Mic Off';
+            this.muteBtn.className = 'mute-btn muted';
+        } else {
+            this.muteBtn.textContent = '🎤 Mic On';
+            this.muteBtn.className = 'mute-btn unmuted';
+            if (this.isCapturing) {
+                this.muteBtn.classList.add('active');
+            }
+        }
+    }
+
+    async startContinuousCapture() {
+        if (!this.isConnected || this.isCapturing) return;
+
+        try {
+            // getUserMedia requires a secure context (HTTPS or localhost);
+            // checking inside try so the failure is logged, not an unhandled rejection
+            if (!navigator.mediaDevices || !navigator.mediaDevices.getUserMedia) {
+                throw new Error('getUserMedia not available. Please use HTTPS or localhost.');
+            }
+
+            this.stream = await navigator.mediaDevices.getUserMedia({
+                audio: {
+                    sampleRate: 24000,
+                    channelCount: 1,
+                    echoCancellation: true,
+                    noiseSuppression: true
+                }
+            });
+
+            this.audioContext = new AudioContext({ sampleRate: 24000 });
+            const source = this.audioContext.createMediaStreamSource(this.stream);
+
+            // ScriptProcessorNode is deprecated but still widely supported; it
+            // must stay connected to the destination for onaudioprocess to fire
+            this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);
+            source.connect(this.processor);
+            this.processor.connect(this.audioContext.destination);
+
+            this.processor.onaudioprocess = (event) => {
+                if (!this.isMuted && this.ws && this.ws.readyState === WebSocket.OPEN) {
+                    const inputBuffer = event.inputBuffer.getChannelData(0);
+                    const int16Buffer = new Int16Array(inputBuffer.length);
+
+                    // Convert float32 [-1, 1] to int16 PCM
+                    for (let i = 0; i < inputBuffer.length; i++) {
+                        int16Buffer[i] = Math.max(-32768, Math.min(32767, inputBuffer[i] * 32768));
+                    }
+
+                    this.ws.send(JSON.stringify({
+                        type: 'audio',
+                        data: Array.from(int16Buffer)
+                    }));
+                }
+            };
+
+            this.isCapturing = true;
+            this.updateMuteUI();
+
+        } catch (error) {
+            console.error('Failed to start audio capture:', error);
+        }
+    }
+
+    stopContinuousCapture() {
+        if (!this.isCapturing) return;
+
+        this.isCapturing = false;
+
+        if (this.processor) {
+            this.processor.disconnect();
+            this.processor = null;
+        }
+
+        if (this.audioContext) {
+            this.audioContext.close();
+            this.audioContext = null;
+        }
+
+        if (this.stream) {
+            this.stream.getTracks().forEach(track => track.stop());
+            this.stream = null;
+        }
+
+        this.updateMuteUI();
+    }
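+
+    // Dispatch server events: 'audio' chunks are queued for playback,
+    // 'audio_interrupted' flushes the queue, and 'history_updated' re-renders
+    // the transcript pane from the session history.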
+    handleRealtimeEvent(event) {
+        // Add to raw events pane
+        this.addRawEvent(event);
+
+        // Add to tools panel if it's a tool or handoff event
+        if (event.type === 'tool_start' || event.type === 'tool_end' || event.type === 'handoff') {
+            this.addToolEvent(event);
+        }
+
+        // Handle specific event types
+        switch (event.type) {
+            case 'audio':
+                this.playAudio(event.audio);
+                break;
+            case 'audio_interrupted':
+                this.stopAudioPlayback();
+                break;
+            case 'history_updated':
+                this.updateMessagesFromHistory(event.history);
+                break;
+        }
+    }
+
+    updateMessagesFromHistory(history) {
+        console.log('updateMessagesFromHistory called with:', history);
+
+        // Clear all existing messages
+        this.messagesContent.innerHTML = '';
+
+        // Add messages from history
+        if (history && Array.isArray(history)) {
+            console.log('Processing history array with', history.length, 'items');
+            history.forEach((item, index) => {
+                console.log(`History item ${index}:`, item);
+                if (item.type === 'message') {
+                    const role = item.role;
+                    let content = '';
+
+                    console.log(`Message item - role: ${role}, content:`, item.content);
+
+                    if (item.content && Array.isArray(item.content)) {
+                        // Extract text from content array
+                        item.content.forEach(contentPart => {
+                            console.log('Content part:', contentPart);
+                            if (contentPart.type === 'text' && contentPart.text) {
+                                content += contentPart.text;
+                            } else if (contentPart.type === 'input_text' && contentPart.text) {
+                                content += contentPart.text;
+                            } else if (contentPart.type === 'input_audio' && contentPart.transcript) {
+                                content += contentPart.transcript;
+                            } else if (contentPart.type === 'audio' && contentPart.transcript) {
+                                content += contentPart.transcript;
+                            }
+                        });
+                    }
+
+                    console.log(`Final content for ${role}:`, content);
+
+                    if (content.trim()) {
+                        this.addMessage(role, content.trim());
+                        console.log(`Added message: ${role} - ${content.trim()}`);
+                    }
+                } else {
+                    console.log(`Skipping non-message item of type: ${item.type}`);
+                }
+            });
+        } else {
+            console.log('History is not an array or is null/undefined');
+        }
+
+        this.scrollToBottom();
+    }
+
+    addMessage(type, content) {
+        const messageDiv = document.createElement('div');
+        messageDiv.className = `message ${type}`;
+
+        const bubbleDiv = document.createElement('div');
+        bubbleDiv.className = 'message-bubble';
+        bubbleDiv.textContent = content;
+
+        messageDiv.appendChild(bubbleDiv);
+        this.messagesContent.appendChild(messageDiv);
+        this.scrollToBottom();
+
+        return messageDiv;
+    }
+
+    addRawEvent(event) {
+        const eventDiv = document.createElement('div');
+        eventDiv.className = 'event';
+
+        const headerDiv = document.createElement('div');
+        headerDiv.className = 'event-header';
+        headerDiv.innerHTML = `
+            <span class="event-type">${event.type}</span>
+            <span class="event-arrow">▼</span>
+        `;
+
+        const contentDiv = document.createElement('div');
+        contentDiv.className = 'event-content collapsed';
+        contentDiv.textContent = JSON.stringify(event, null, 2);
+
+        headerDiv.addEventListener('click', () => {
+            const isCollapsed = contentDiv.classList.contains('collapsed');
+            contentDiv.classList.toggle('collapsed');
+            headerDiv.querySelector('span:last-child').textContent = isCollapsed ? '▲' : '▼';
+        });
+
+        eventDiv.appendChild(headerDiv);
+        eventDiv.appendChild(contentDiv);
+        this.eventsContent.appendChild(eventDiv);
+
+        // Auto-scroll events pane
+        this.eventsContent.scrollTop = this.eventsContent.scrollHeight;
+    }
+
+    addToolEvent(event) {
+        const eventDiv = document.createElement('div');
+        eventDiv.className = 'event';
+
+        let title = '';
+        let description = '';
+        let eventClass = '';
+
+        if (event.type === 'handoff') {
+            title = `🔄 Handoff`;
+            description = `From ${event.from} to ${event.to}`;
+            eventClass = 'handoff';
+        } else if (event.type === 'tool_start') {
+            title = `🔧 Tool Started`;
+            description = `Running ${event.tool}`;
+            eventClass = 'tool';
+        } else if (event.type === 'tool_end') {
+            title = `✅ Tool Completed`;
+            description = `${event.tool}: ${event.output || 'No output'}`;
+            eventClass = 'tool';
+        }
+
+        eventDiv.innerHTML = `
+            <div class="event-header" style="cursor: default;">
+                <span class="event-type ${eventClass}">${title}</span>
+                <span class="event-time">${new Date().toLocaleTimeString()}</span>
+            </div>
+            <div class="event-content" style="display: block;">
+                ${description}
+            </div>
+ `; + + this.toolsContent.appendChild(eventDiv); + + // Auto-scroll tools pane + this.toolsContent.scrollTop = this.toolsContent.scrollHeight; + } + + async playAudio(audioBase64) { + try { + if (!audioBase64 || audioBase64.length === 0) { + console.warn('Received empty audio data, skipping playback'); + return; + } + + // Add to queue + this.audioQueue.push(audioBase64); + + // Start processing queue if not already playing + if (!this.isPlayingAudio) { + this.processAudioQueue(); + } + + } catch (error) { + console.error('Failed to play audio:', error); + } + } + + async processAudioQueue() { + if (this.isPlayingAudio || this.audioQueue.length === 0) { + return; + } + + this.isPlayingAudio = true; + + // Initialize audio context if needed + if (!this.playbackAudioContext) { + this.playbackAudioContext = new AudioContext({ sampleRate: 24000 }); + } + + while (this.audioQueue.length > 0) { + const audioBase64 = this.audioQueue.shift(); + await this.playAudioChunk(audioBase64); + } + + this.isPlayingAudio = false; + } + + async playAudioChunk(audioBase64) { + return new Promise((resolve, reject) => { + try { + // Decode base64 to ArrayBuffer + const binaryString = atob(audioBase64); + const bytes = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + bytes[i] = binaryString.charCodeAt(i); + } + + const int16Array = new Int16Array(bytes.buffer); + + if (int16Array.length === 0) { + console.warn('Audio chunk has no samples, skipping'); + resolve(); + return; + } + + const float32Array = new Float32Array(int16Array.length); + + // Convert int16 to float32 + for (let i = 0; i < int16Array.length; i++) { + float32Array[i] = int16Array[i] / 32768.0; + } + + const audioBuffer = this.playbackAudioContext.createBuffer(1, float32Array.length, 24000); + audioBuffer.getChannelData(0).set(float32Array); + + const source = this.playbackAudioContext.createBufferSource(); + source.buffer = audioBuffer; + source.connect(this.playbackAudioContext.destination); + + // Store reference to current source + this.currentAudioSource = source; + + source.onended = () => { + this.currentAudioSource = null; + resolve(); + }; + source.start(); + + } catch (error) { + console.error('Failed to play audio chunk:', error); + reject(error); + } + }); + } + + stopAudioPlayback() { + console.log('Stopping audio playback due to interruption'); + + // Stop current audio source if playing + if (this.currentAudioSource) { + try { + this.currentAudioSource.stop(); + this.currentAudioSource = null; + } catch (error) { + console.error('Error stopping audio source:', error); + } + } + + // Clear the audio queue + this.audioQueue = []; + + // Reset playback state + this.isPlayingAudio = false; + + console.log('Audio playback stopped and queue cleared'); + } + + scrollToBottom() { + this.messagesContent.scrollTop = this.messagesContent.scrollHeight; + } +} + +// Initialize the demo when the page loads +document.addEventListener('DOMContentLoaded', () => { + new RealtimeDemo(); +}); \ No newline at end of file diff --git a/examples/realtime/app/static/index.html b/examples/realtime/app/static/index.html new file mode 100644 index 000000000..fbd0de46d --- /dev/null +++ b/examples/realtime/app/static/index.html @@ -0,0 +1,295 @@ + + + + + + Realtime Demo + + + +
+<body>
+    <div class="container">
+        <header class="header">
+            <h1>Realtime Demo</h1>
+        </header>
+
+        <main class="panes">
+            <section class="pane">
+                <div class="pane-header">
+                    <h2>Conversation</h2>
+                </div>
+                <div class="pane-content" id="messagesContent"></div>
+                <div class="controls">
+                    <button id="connectBtn" class="connect-btn disconnected">Connect</button>
+                    <button id="muteBtn" class="mute-btn unmuted" disabled>🎤 Mic On</button>
+                    <span id="status" class="status disconnected">Disconnected</span>
+                </div>
+            </section>
+
+            <section class="pane">
+                <div class="pane-header">
+                    <h2>Event stream</h2>
+                </div>
+                <div class="pane-content" id="eventsContent"></div>
+            </section>
+
+            <section class="pane">
+                <div class="pane-header">
+                    <h2>Tools &amp; Handoffs</h2>
+                </div>
+                <div class="pane-content" id="toolsContent"></div>
+            </section>
+        </main>
+    </div>
+
+    <script src="app.js"></script>
+</body>
+</html>
+ + + + \ No newline at end of file diff --git a/examples/realtime/no_ui_demo.py b/examples/realtime/cli/demo.py similarity index 100% rename from examples/realtime/no_ui_demo.py rename to examples/realtime/cli/demo.py diff --git a/examples/realtime/ui.py b/examples/realtime/cli/ui.py similarity index 100% rename from examples/realtime/ui.py rename to examples/realtime/cli/ui.py diff --git a/examples/realtime/demo.py b/examples/realtime/demo.py deleted file mode 100644 index 65ca63f27..000000000 --- a/examples/realtime/demo.py +++ /dev/null @@ -1,117 +0,0 @@ -import asyncio -import os -import sys -from typing import TYPE_CHECKING - -import numpy as np - -from agents.realtime import RealtimeSession - -# Add the current directory to path so we can import ui -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -from agents import function_tool -from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSessionEvent - -if TYPE_CHECKING: - from .ui import AppUI -else: - # Try both import styles - try: - # Try relative import first (when used as a package) - from .ui import AppUI - except ImportError: - # Fall back to direct import (when run as a script) - from ui import AppUI - - -@function_tool -def get_weather(city: str) -> str: - """Get the weather in a city.""" - return f"The weather in {city} is sunny." - - -agent = RealtimeAgent( - name="Assistant", - instructions="You always greet the user with 'Top of the morning to you'.", - tools=[get_weather], -) - - -def _truncate_str(s: str, max_length: int) -> str: - if len(s) > max_length: - return s[:max_length] + "..." - return s - - -class Example: - def __init__(self) -> None: - self.ui = AppUI() - self.ui.connected = asyncio.Event() - self.ui.last_audio_item_id = None - # Set the audio callback - self.ui.set_audio_callback(self.on_audio_recorded) - - self.session: RealtimeSession | None = None - - async def run(self) -> None: - # Start UI in a separate task instead of waiting for it to complete - ui_task = asyncio.create_task(self.ui.run_async()) - - # Set up session immediately without waiting for UI to finish - runner = RealtimeRunner(agent) - async with await runner.run() as session: - self.session = session - self.ui.set_is_connected(True) - async for event in session: - await self._on_event(event) - print("done") - - # Wait for UI task to complete when session ends - await ui_task - - async def on_audio_recorded(self, audio_bytes: bytes) -> None: - # Send the audio to the session - assert self.session is not None - await self.session.send_audio(audio_bytes) - - async def _on_event(self, event: RealtimeSessionEvent) -> None: - try: - if event.type == "agent_start": - self.ui.add_transcript(f"Agent started: {event.agent.name}") - elif event.type == "agent_end": - self.ui.add_transcript(f"Agent ended: {event.agent.name}") - elif event.type == "handoff": - self.ui.add_transcript( - f"Handoff from {event.from_agent.name} to {event.to_agent.name}" - ) - elif event.type == "tool_start": - self.ui.add_transcript(f"Tool started: {event.tool.name}") - elif event.type == "tool_end": - self.ui.add_transcript(f"Tool ended: {event.tool.name}; output: {event.output}") - elif event.type == "audio_end": - self.ui.add_transcript("Audio ended") - elif event.type == "audio": - np_audio = np.frombuffer(event.audio.data, dtype=np.int16) - # Play audio in a separate thread to avoid blocking the event loop - await asyncio.to_thread(self.ui.play_audio, np_audio) - elif event.type == "audio_interrupted": - self.ui.add_transcript("Audio 
interrupted") - elif event.type == "error": - pass - elif event.type == "history_updated": - pass - elif event.type == "history_added": - pass - elif event.type == "raw_model_event": - if event.data.type != "error" and event.data.type != "exception": - self.ui.log_message(f"Raw model event: {event.data}") - else: - self.ui.log_message(f"Unknown event type: {event.type}") - except Exception as e: - self.ui.log_message(f"Error processing event: {_truncate_str(str(e), 50)}") - - -if __name__ == "__main__": - example = Example() - asyncio.run(example.run()) diff --git a/tests/realtime/test_conversion_helpers.py b/tests/realtime/test_conversion_helpers.py index 2264407c9..859813edd 100644 --- a/tests/realtime/test_conversion_helpers.py +++ b/tests/realtime/test_conversion_helpers.py @@ -178,7 +178,7 @@ def test_convert_user_input_to_conversation_item_dict(self): "content": [ {"type": "input_text", "text": "Hello"}, {"type": "input_text", "text": "World"}, - ] + ], } event = RealtimeModelSendUserInput(user_input=user_input_dict) @@ -199,7 +199,7 @@ def test_convert_user_input_to_conversation_item_dict_empty_content(self): user_input_dict: RealtimeModelUserInputMessage = { "type": "message", "role": "user", - "content": [] + "content": [], } event = RealtimeModelSendUserInput(user_input=user_input_dict)