+
+
+
+
+
+
+
+ Disconnected
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/realtime/cli/demo.py b/examples/realtime/cli/demo.py
new file mode 100644
index 000000000..be610b43e
--- /dev/null
+++ b/examples/realtime/cli/demo.py
@@ -0,0 +1,253 @@
+import asyncio
+import queue
+import sys
+import threading
+from typing import Any
+
+import numpy as np
+import sounddevice as sd
+
+from agents import function_tool
+from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSession, RealtimeSessionEvent
+
+# Audio configuration
+CHUNK_LENGTH_S = 0.05 # 50ms
+SAMPLE_RATE = 24000
+FORMAT = np.int16
+CHANNELS = 1
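+# At 24 kHz mono int16, each 50 ms chunk is 1200 samples (24000 * 0.05), i.e. 2400 bytes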
+
+# Set up logging for the OpenAI Agents SDK - uncomment to enable:
+# import logging
+# logging.basicConfig(
+#     level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+# )
+# logging.getLogger("openai.agents").setLevel(logging.ERROR)
+
+
+@function_tool
+def get_weather(city: str) -> str:
+ """Get the weather in a city."""
+ return f"The weather in {city} is sunny."
+
+
+agent = RealtimeAgent(
+ name="Assistant",
+ instructions="You always greet the user with 'Top of the morning to you'.",
+ tools=[get_weather],
+)
+
+
+def _truncate_str(s: str, max_length: int) -> str:
+ if len(s) > max_length:
+ return s[:max_length] + "..."
+ return s
+
+
+class NoUIDemo:
+ def __init__(self) -> None:
+ self.session: RealtimeSession | None = None
+ self.audio_stream: sd.InputStream | None = None
+ self.audio_player: sd.OutputStream | None = None
+        self.recording = False
+        self._capture_task: asyncio.Task[None] | None = None
+
+ # Audio output state for callback system
+ self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10) # Buffer more chunks
+ self.interrupt_event = threading.Event()
+ self.current_audio_chunk: np.ndarray | None = None # type: ignore
+ self.chunk_position = 0
+
+ def _output_callback(self, outdata, frames: int, time, status) -> None:
+ """Callback for audio output - handles continuous audio stream from server."""
+ if status:
+ print(f"Output callback status: {status}")
+
+ # Check if we should clear the queue due to interrupt
+ if self.interrupt_event.is_set():
+ # Clear the queue and current chunk state
+ while not self.output_queue.empty():
+ try:
+ self.output_queue.get_nowait()
+ except queue.Empty:
+ break
+ self.current_audio_chunk = None
+ self.chunk_position = 0
+ self.interrupt_event.clear()
+ outdata.fill(0)
+ return
+
+ # Fill output buffer from queue and current chunk
+ outdata.fill(0) # Start with silence
+ samples_filled = 0
+
+ while samples_filled < len(outdata):
+ # If we don't have a current chunk, try to get one from queue
+ if self.current_audio_chunk is None:
+ try:
+ self.current_audio_chunk = self.output_queue.get_nowait()
+ self.chunk_position = 0
+ except queue.Empty:
+ # No more audio data available - this causes choppiness
+ # Uncomment next line to debug underruns:
+ # print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")
+ break
+
+ # Copy data from current chunk to output buffer
+ remaining_output = len(outdata) - samples_filled
+ remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
+ samples_to_copy = min(remaining_output, remaining_chunk)
+
+ if samples_to_copy > 0:
+ chunk_data = self.current_audio_chunk[
+ self.chunk_position : self.chunk_position + samples_to_copy
+ ]
+ # More efficient: direct assignment for mono audio instead of reshape
+ outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data
+ samples_filled += samples_to_copy
+ self.chunk_position += samples_to_copy
+
+ # If we've used up the entire chunk, reset for next iteration
+ if self.chunk_position >= len(self.current_audio_chunk):
+ self.current_audio_chunk = None
+ self.chunk_position = 0
+
+ async def run(self) -> None:
+ print("Connecting, may take a few seconds...")
+
+ # Initialize audio player with callback
+ chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
+ self.audio_player = sd.OutputStream(
+ channels=CHANNELS,
+ samplerate=SAMPLE_RATE,
+ dtype=FORMAT,
+ callback=self._output_callback,
+ blocksize=chunk_size, # Match our chunk timing for better alignment
+ )
+ self.audio_player.start()
+
+ try:
+ runner = RealtimeRunner(agent)
+ async with await runner.run() as session:
+ self.session = session
+ print("Connected. Starting audio recording...")
+
+ # Start audio recording
+ await self.start_audio_recording()
+ print("Audio recording started. You can start speaking - expect lots of logs!")
+
+ # Process session events
+ async for event in session:
+ await self._on_event(event)
+
+ finally:
+ # Clean up audio player
+ if self.audio_player and self.audio_player.active:
+ self.audio_player.stop()
+ if self.audio_player:
+ self.audio_player.close()
+
+ print("Session ended")
+
+ async def start_audio_recording(self) -> None:
+ """Start recording audio from the microphone."""
+ # Set up audio input stream
+ self.audio_stream = sd.InputStream(
+ channels=CHANNELS,
+ samplerate=SAMPLE_RATE,
+ dtype=FORMAT,
+ )
+
+ self.audio_stream.start()
+ self.recording = True
+
+        # Start the audio capture task, keeping a reference so it isn't garbage-collected
+        self._capture_task = asyncio.create_task(self.capture_audio())
+
+ async def capture_audio(self) -> None:
+ """Capture audio from the microphone and send to the session."""
+ if not self.audio_stream or not self.session:
+ return
+
+ # Buffer size in samples
+ read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
+
+ try:
+ while self.recording:
+ # Check if there's enough data to read
+ if self.audio_stream.read_available < read_size:
+ await asyncio.sleep(0.01)
+ continue
+
+ # Read audio data
+ data, _ = self.audio_stream.read(read_size)
+
+ # Convert numpy array to bytes
+ audio_bytes = data.tobytes()
+
+ # Send audio to session
+ await self.session.send_audio(audio_bytes)
+
+ # Yield control back to event loop
+ await asyncio.sleep(0)
+
+ except Exception as e:
+ print(f"Audio capture error: {e}")
+ finally:
+ if self.audio_stream and self.audio_stream.active:
+ self.audio_stream.stop()
+ if self.audio_stream:
+ self.audio_stream.close()
+
+ async def _on_event(self, event: RealtimeSessionEvent) -> None:
+ """Handle session events."""
+ try:
+ if event.type == "agent_start":
+ print(f"Agent started: {event.agent.name}")
+ elif event.type == "agent_end":
+ print(f"Agent ended: {event.agent.name}")
+ elif event.type == "handoff":
+ print(f"Handoff from {event.from_agent.name} to {event.to_agent.name}")
+ elif event.type == "tool_start":
+ print(f"Tool started: {event.tool.name}")
+ elif event.type == "tool_end":
+ print(f"Tool ended: {event.tool.name}; output: {event.output}")
+ elif event.type == "audio_end":
+ print("Audio ended")
+ elif event.type == "audio":
+ # Enqueue audio for callback-based playback
+ np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
+ try:
+ self.output_queue.put_nowait(np_audio)
+ except queue.Full:
+                    # Queue is full - drop the oldest chunk to make room for the
+                    # new one. The qsize check guards against a race in which the
+                    # playback callback drained the queue between the Full
+                    # exception and this point.
+                    if self.output_queue.qsize() > 8:
+ try:
+ self.output_queue.get_nowait()
+ self.output_queue.put_nowait(np_audio)
+ except queue.Empty:
+ pass
+                    # Otherwise the callback just drained the queue; drop this
+                    # chunk and let playback catch up rather than block
+ elif event.type == "audio_interrupted":
+ print("Audio interrupted")
+ # Signal the output callback to clear its queue and state
+ self.interrupt_event.set()
+ elif event.type == "error":
+ print(f"Error: {event.error}")
+ elif event.type == "history_updated":
+ pass # Skip these frequent events
+ elif event.type == "history_added":
+ pass # Skip these frequent events
+ elif event.type == "raw_model_event":
+ print(f"Raw model event: {_truncate_str(str(event.data), 50)}")
+ else:
+ print(f"Unknown event type: {event.type}")
+ except Exception as e:
+ print(f"Error processing event: {_truncate_str(str(e), 50)}")
+
+
+if __name__ == "__main__":
+ demo = NoUIDemo()
+ try:
+ asyncio.run(demo.run())
+ except KeyboardInterrupt:
+ print("\nExiting...")
+ sys.exit(0)
diff --git a/examples/realtime/cli/ui.py b/examples/realtime/cli/ui.py
new file mode 100644
index 000000000..51a1fed41
--- /dev/null
+++ b/examples/realtime/cli/ui.py
@@ -0,0 +1,268 @@
+from __future__ import annotations
+
+import asyncio
+from collections.abc import Callable, Coroutine
+from typing import Any
+
+import numpy as np
+import numpy.typing as npt
+import sounddevice as sd
+from textual import events
+from textual.app import App, ComposeResult
+from textual.containers import Container, Horizontal
+from textual.reactive import reactive
+from textual.widgets import RichLog, Static
+from typing_extensions import override
+
+CHUNK_LENGTH_S = 0.05 # 50ms
+SAMPLE_RATE = 24000
+FORMAT = np.int16
+CHANNELS = 1
+
+
+class Header(Static):
+ """A header widget."""
+
+ @override
+ def render(self) -> str:
+ return "Realtime Demo"
+
+
+class AudioStatusIndicator(Static):
+ """A widget that shows the current audio recording status."""
+
+ is_recording = reactive(False)
+
+ @override
+ def render(self) -> str:
+ status = (
+ "🔴 Conversation started."
+ if self.is_recording
+ else "⚪ Press SPACE to start the conversation (q to quit)"
+ )
+ return status
+
+
+class AppUI(App[None]):
+ CSS = """
+ Screen {
+ background: #1a1b26; /* Dark blue-grey background */
+ }
+
+ Container {
+ border: double rgb(91, 164, 91);
+ }
+
+ #input-container {
+ height: 5; /* Explicit height for input container */
+ margin: 1 1;
+ padding: 1 2;
+ }
+
+ #bottom-pane {
+ width: 100%;
+ height: 82%; /* Reduced to make room for session display */
+ border: round rgb(205, 133, 63);
+ }
+
+ #status-indicator {
+ height: 3;
+ content-align: center middle;
+ background: #2a2b36;
+ border: solid rgb(91, 164, 91);
+ margin: 1 1;
+ }
+
+ #session-display {
+ height: 3;
+ content-align: center middle;
+ background: #2a2b36;
+ border: solid rgb(91, 164, 91);
+ margin: 1 1;
+ }
+
+ #transcripts {
+ width: 50%;
+ height: 100%;
+ border-right: solid rgb(91, 164, 91);
+ }
+
+ #transcripts-header {
+ height: 2;
+ background: #2a2b36;
+ content-align: center middle;
+ border-bottom: solid rgb(91, 164, 91);
+ }
+
+ #transcripts-content {
+ height: 100%;
+ }
+
+ #event-log {
+ width: 50%;
+ height: 100%;
+ }
+
+ #event-log-header {
+ height: 2;
+ background: #2a2b36;
+ content-align: center middle;
+ border-bottom: solid rgb(91, 164, 91);
+ }
+
+ #event-log-content {
+ height: 100%;
+ }
+
+ Static {
+ color: white;
+ }
+ """
+
+ should_send_audio: asyncio.Event
+ connected: asyncio.Event
+ last_audio_item_id: str | None
+ audio_callback: Callable[[bytes], Coroutine[Any, Any, None]] | None
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.audio_player = sd.OutputStream(
+ samplerate=SAMPLE_RATE,
+ channels=CHANNELS,
+ dtype=FORMAT,
+ )
+ self.should_send_audio = asyncio.Event()
+ self.connected = asyncio.Event()
+ self.audio_callback = None
+
+ @override
+ def compose(self) -> ComposeResult:
+ """Create child widgets for the app."""
+ with Container():
+ yield Header(id="session-display")
+ yield AudioStatusIndicator(id="status-indicator")
+ with Container(id="bottom-pane"):
+ with Horizontal():
+ with Container(id="transcripts"):
+ yield Static("Conversation transcript", id="transcripts-header")
+ yield RichLog(
+ id="transcripts-content", wrap=True, highlight=True, markup=True
+ )
+ with Container(id="event-log"):
+ yield Static("Raw event log", id="event-log-header")
+ yield RichLog(
+ id="event-log-content", wrap=True, highlight=True, markup=True
+ )
+
+    def set_is_connected(self, is_connected: bool) -> None:
+        if is_connected:
+            self.connected.set()
+        else:
+            self.connected.clear()
+
+ def set_audio_callback(self, callback: Callable[[bytes], Coroutine[Any, Any, None]]) -> None:
+ """Set a callback function to be called when audio is recorded."""
+ self.audio_callback = callback
+
+ # High-level methods for UI operations
+ def set_header_text(self, text: str) -> None:
+ """Update the header text."""
+ header = self.query_one("#session-display", Header)
+ header.update(text)
+
+ def set_recording_status(self, is_recording: bool) -> None:
+ """Set the recording status indicator."""
+ status_indicator = self.query_one(AudioStatusIndicator)
+ status_indicator.is_recording = is_recording
+
+ def log_message(self, message: str) -> None:
+ """Add a message to the event log."""
+ try:
+ log_pane = self.query_one("#event-log-content", RichLog)
+ log_pane.write(message)
+ except Exception:
+ # Handle the case where the widget might not be available
+ pass
+
+ def add_transcript(self, message: str) -> None:
+ """Add a transcript message to the transcripts panel."""
+ try:
+ transcript_pane = self.query_one("#transcripts-content", RichLog)
+ transcript_pane.write(message)
+ except Exception:
+ # Handle the case where the widget might not be available
+ pass
+
+ def play_audio(self, audio_data: npt.NDArray[np.int16]) -> None:
+ """Play audio data through the audio player."""
+ try:
+ self.audio_player.write(audio_data)
+ except Exception as e:
+ self.log_message(f"Audio play error: {e}")
+
+ async def on_mount(self) -> None:
+ """Set up audio player and start the audio capture worker."""
+ self.audio_player.start()
+ self.run_worker(self.capture_audio())
+
+ async def capture_audio(self) -> None:
+ """Capture audio from the microphone and send to the session."""
+ # Wait for connection to be established
+ await self.connected.wait()
+
+ # Set up audio input stream
+ stream = sd.InputStream(
+ channels=CHANNELS,
+ samplerate=SAMPLE_RATE,
+ dtype=FORMAT,
+ )
+
+ try:
+ # Wait for user to press spacebar to start
+ await self.should_send_audio.wait()
+
+ stream.start()
+ self.set_recording_status(True)
+ self.log_message("Recording started - speak to the agent")
+
+ # Buffer size in samples
+ read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
+
+ while True:
+ # Check if there's enough data to read
+ if stream.read_available < read_size:
+ await asyncio.sleep(0.01) # Small sleep to avoid CPU hogging
+ continue
+
+ # Read audio data
+ data, _ = stream.read(read_size)
+
+ # Convert numpy array to bytes
+ audio_bytes = data.tobytes()
+
+ # Call audio callback if set
+ if self.audio_callback:
+ await self.audio_callback(audio_bytes)
+
+ # Yield control back to event loop
+ await asyncio.sleep(0)
+
+ except Exception as e:
+ self.log_message(f"Audio capture error: {e}")
+ finally:
+ if stream.active:
+ stream.stop()
+ stream.close()
+
+ async def on_key(self, event: events.Key) -> None:
+ """Handle key press events."""
+        # Add the keypress to the event log
+ self.log_message(f"Key pressed: {event.key}")
+
+ if event.key == "q":
+ self.audio_player.stop()
+ self.audio_player.close()
+ self.exit()
+ return
+
+ if event.key == "space": # Spacebar
+ if not self.should_send_audio.is_set():
+ self.should_send_audio.set()
+ self.set_recording_status(True)
diff --git a/examples/realtime/twilio/README.md b/examples/realtime/twilio/README.md
new file mode 100644
index 000000000..e92f0681a
--- /dev/null
+++ b/examples/realtime/twilio/README.md
@@ -0,0 +1,86 @@
+# Realtime Twilio Integration
+
+This example demonstrates how to connect the OpenAI Realtime API to a phone call using Twilio's Media Streams. The server handles incoming phone calls and streams audio between Twilio and the OpenAI Realtime API, enabling real-time voice conversations with an AI agent over the phone.
+
+## Prerequisites
+
+- Python 3.9+
+- OpenAI API key with [Realtime API](https://platform.openai.com/docs/guides/realtime) access
+- [Twilio](https://www.twilio.com/docs/voice) account with a phone number
+- A tunneling service like [ngrok](https://ngrok.com/) to expose your local server
+
+## Setup
+
+1. **Start the server:**
+
+ ```bash
+ uv run server.py
+ ```
+
+ The server will start on port 8000 by default.
+
+2. **Expose the server publicly, e.g. via ngrok:**
+
+ ```bash
+ ngrok http 8000
+ ```
+
+   Note the public URL (e.g., `https://abc123.ngrok.io`).
+
+3. **Configure your Twilio phone number** (or use the Twilio CLI, sketched below):
+ - Log into your Twilio Console
+ - Select your phone number
+ - Set the webhook URL for incoming calls to: `https://your-ngrok-url.ngrok.io/incoming-call`
+ - Set the HTTP method to POST
+
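+   Alternatively, a rough Twilio CLI equivalent (assuming the CLI is installed and
+   authenticated; the phone number is a placeholder):
+
+   ```bash
+   twilio phone-numbers:update "+15551234567" \
+     --voice-url "https://your-ngrok-url.ngrok.io/incoming-call" \
+     --voice-method POST
+   ```
+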
+## Usage
+
+1. Call your Twilio phone number
+2. You'll hear: "Hello! You're now connected to an AI assistant. You can start talking!"
+3. Start speaking - the AI will respond in real-time
+4. The assistant has access to tools like weather information and current time
+
+## How It Works
+
+1. **Incoming Call**: When someone calls your Twilio number, Twilio makes a request to `/incoming-call`
+2. **TwiML Response**: The server returns TwiML that:
+ - Plays a greeting message
+ - Connects the call to a WebSocket stream at `/media-stream`
+3. **WebSocket Connection**: Twilio establishes a WebSocket connection for bidirectional audio streaming
+4. **Transport Layer**: The `TwilioRealtimeTransportLayer` class owns the WebSocket message handling:
+ - Takes ownership of the Twilio WebSocket after initial handshake
+ - Runs its own message loop to process all Twilio messages
+ - Handles protocol differences between Twilio and OpenAI
+ - Automatically sets G.711 μ-law audio format for Twilio compatibility
+ - Manages audio chunk tracking for interruption support
+ - Wraps the OpenAI realtime model instead of subclassing it
+5. **Audio Processing** (see the sketch after this list):
+ - Audio from the caller is base64 decoded and sent to OpenAI Realtime API
+ - Audio responses from OpenAI are base64 encoded and sent back to Twilio
+ - Twilio plays the audio to the caller
+
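+As a minimal sketch of the framing described in step 5 (based on Twilio's documented
+Media Streams messages, not this example's actual handler), assuming a FastAPI
+WebSocket and the SDK's `send_audio` method; the helper names are illustrative:
+
+```python
+import base64
+import json
+
+
+async def pump_caller_audio(twilio_ws, session):
+    # Each Twilio "media" frame carries base64-encoded G.711 mu-law caller audio
+    async for raw in twilio_ws.iter_text():
+        msg = json.loads(raw)
+        if msg["event"] == "media":
+            await session.send_audio(base64.b64decode(msg["media"]["payload"]))
+
+
+def to_twilio_media(stream_sid: str, audio: bytes) -> str:
+    # Model audio is base64-encoded and wrapped in a "media" frame for Twilio
+    return json.dumps({
+        "event": "media",
+        "streamSid": stream_sid,
+        "media": {"payload": base64.b64encode(audio).decode("ascii")},
+    })
+```
+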
+## Configuration
+
+- **Port**: Set `PORT` environment variable (default: 8000)
+- **OpenAI API Key**: Set `OPENAI_API_KEY` environment variable
+- **Agent Instructions**: Modify the `RealtimeAgent` configuration in `server.py`
+- **Tools**: Add or modify function tools in `server.py`
+
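+For example (the key below is a placeholder):
+
+```bash
+export OPENAI_API_KEY="sk-..."
+PORT=8080 uv run server.py  # override the default port of 8000
+```
+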
+## Troubleshooting
+
+- **WebSocket connection issues**: Ensure your ngrok URL is correct and publicly accessible
+- **Audio quality**: Twilio streams G.711 μ-law audio at 8 kHz, a narrowband telephony format, so some loss of fidelity is expected
+- **Latency**: Network latency between Twilio, your server, and OpenAI affects response time
+- **Logs**: Check the console output for detailed connection and error logs
+
+## Architecture
+
+```
+Phone Call → Twilio → WebSocket → TwilioRealtimeTransportLayer → OpenAI Realtime API
+ ↓
+ RealtimeAgent with Tools
+ ↓
+ Audio Response → Twilio → Phone Call
+```
+
+The `TwilioRealtimeTransportLayer` acts as a bridge between Twilio's Media Streams and OpenAI's Realtime API, handling the protocol differences and audio format conversions. It wraps the OpenAI realtime model to provide a clean interface for Twilio integration.
diff --git a/examples/realtime/twilio/__init__.py b/examples/realtime/twilio/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/examples/realtime/twilio/requirements.txt b/examples/realtime/twilio/requirements.txt
new file mode 100644
index 000000000..3fcc0b0fe
--- /dev/null
+++ b/examples/realtime/twilio/requirements.txt
@@ -0,0 +1,5 @@
+openai-agents
+fastapi
+uvicorn[standard]
+websockets
+python-dotenv
\ No newline at end of file
diff --git a/examples/realtime/twilio/server.py b/examples/realtime/twilio/server.py
new file mode 100644
index 000000000..8a753f789
--- /dev/null
+++ b/examples/realtime/twilio/server.py
@@ -0,0 +1,80 @@
+import os
+from typing import TYPE_CHECKING
+
+from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
+from fastapi.responses import PlainTextResponse
+
+# Import TwilioHandler class - handle both module and package use cases
+if TYPE_CHECKING:
+ # For type checking, use the relative import
+ from .twilio_handler import TwilioHandler
+else:
+ # At runtime, try both import styles
+ try:
+ # Try relative import first (when used as a package)
+ from .twilio_handler import TwilioHandler
+ except ImportError:
+ # Fall back to direct import (when run as a script)
+ from twilio_handler import TwilioHandler
+
+
+class TwilioWebSocketManager:
+ def __init__(self):
+ self.active_handlers: dict[str, TwilioHandler] = {}
+
+ async def new_session(self, websocket: WebSocket) -> TwilioHandler:
+ """Create and configure a new session."""
+ print("Creating twilio handler")
+
+ handler = TwilioHandler(websocket)
+ return handler
+
+    # In a real app, you'd track each handler in self.active_handlers and
+    # close it when the call ends; this example skips that cleanup
+
+
+manager = TwilioWebSocketManager()
+app = FastAPI()
+
+
+@app.get("/")
+async def root():
+ return {"message": "Twilio Media Stream Server is running!"}
+
+
+@app.post("/incoming-call")
+@app.get("/incoming-call")
+async def incoming_call(request: Request):
+ """Handle incoming Twilio phone calls"""
+ host = request.headers.get("Host")
+
+ twiml_response = f"""
+