Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions examples/realtime/app/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Realtime Demo App

A web-based realtime voice assistant demo with a FastAPI backend and HTML/JS frontend.

## Installation

Install the required dependencies:

```bash
uv add fastapi uvicorn websockets
```

## Usage

Start the application with a single command:

```bash
cd examples/realtime/app && uv run python server.py
```

Then open your browser to: http://localhost:8000

## How to Use

1. Click **Connect** to establish a realtime session
2. Audio capture starts automatically - just speak naturally
3. Click the **Mic On/Off** button to mute/unmute your microphone
4. Watch the conversation unfold in the left pane
5. Monitor raw events in the right pane (click to expand/collapse)
6. Click **Disconnect** when done

## Architecture

- **Backend**: FastAPI server with WebSocket connections for real-time communication
- **Session Management**: Each connection gets a unique session with the OpenAI Realtime API
- **Audio Processing**: 24 kHz mono audio capture and playback
- **Event Handling**: Full event stream processing with transcript generation
- **Frontend**: Vanilla JavaScript with clean, responsive CSS

The demo showcases the core patterns for building realtime voice applications with the OpenAI Agents SDK.
172 changes: 172 additions & 0 deletions examples/realtime/app/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import asyncio
import base64
import json
import logging
import struct
from contextlib import asynccontextmanager
from typing import Any, assert_never

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from agents import function_tool
from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSession, RealtimeSessionEvent

# Configure logging at INFO level for the whole process, then create the
# module-level logger used by the rest of this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@function_tool
def get_weather(city: str) -> str:
    """Get the weather in a city."""
    # NOTE(review): the docstring is deliberately left untouched in case
    # function_tool surfaces it as the tool description -- confirm.
    # Demo stub: no lookup is performed; every city gets the same canned text.
    forecast = f"The weather in {city} is sunny."
    return forecast


@function_tool
def get_secret_number() -> int:
    """Returns the secret number, if the user asks for it."""
    # NOTE(review): the docstring is deliberately left untouched in case
    # function_tool surfaces it as the tool description -- confirm.
    # Demo stub: the value is a fixed constant.
    secret_number = 71
    return secret_number


# Agent that answers only in haiku form. It is given no tools and is listed
# as a handoff target of the main `agent` defined below.
haiku_agent = RealtimeAgent(
    name="Haiku Agent",
    instructions="You are a haiku poet. You must respond ONLY in traditional haiku format (5-7-5 syllables). Every response should be a proper haiku about the topic. Do not break character.",
    tools=[],
)

# Main agent handed to RealtimeRunner for every new connection. It exposes
# the two demo tools and may hand off to `haiku_agent`.
# NOTE(review): the instructions refer to a `transfer_to_haiku_agent` tool
# that is not defined in this file; presumably the SDK derives it from
# `handoffs` -- confirm.
agent = RealtimeAgent(
    name="Assistant",
    instructions="If the user wants poetry or haikus, you can hand them off to the haiku agent via the transfer_to_haiku_agent tool.",
    tools=[get_weather, get_secret_number],
    handoffs=[haiku_agent],
)


class RealtimeWebSocketManager:
def __init__(self):
self.active_sessions: dict[str, RealtimeSession] = {}
self.session_contexts: dict[str, Any] = {}
self.websockets: dict[str, WebSocket] = {}

async def connect(self, websocket: WebSocket, session_id: str):
await websocket.accept()
self.websockets[session_id] = websocket

runner = RealtimeRunner(agent)
session_context = await runner.run()
session = await session_context.__aenter__()
self.active_sessions[session_id] = session
self.session_contexts[session_id] = session_context

# Start event processing task
asyncio.create_task(self._process_events(session_id))

async def disconnect(self, session_id: str):
if session_id in self.session_contexts:
await self.session_contexts[session_id].__aexit__(None, None, None)
del self.session_contexts[session_id]
if session_id in self.active_sessions:
del self.active_sessions[session_id]
if session_id in self.websockets:
del self.websockets[session_id]

async def send_audio(self, session_id: str, audio_bytes: bytes):
if session_id in self.active_sessions:
await self.active_sessions[session_id].send_audio(audio_bytes)

async def _process_events(self, session_id: str):
try:
session = self.active_sessions[session_id]
websocket = self.websockets[session_id]

async for event in session:
event_data = await self._serialize_event(event)
await websocket.send_text(json.dumps(event_data))
except Exception as e:
logger.error(f"Error processing events for session {session_id}: {e}")

async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
base_event: dict[str, Any] = {
"type": event.type,
}

if event.type == "agent_start":
base_event["agent"] = event.agent.name
elif event.type == "agent_end":
base_event["agent"] = event.agent.name
elif event.type == "handoff":
base_event["from"] = event.from_agent.name
base_event["to"] = event.to_agent.name
elif event.type == "tool_start":
base_event["tool"] = event.tool.name
elif event.type == "tool_end":
base_event["tool"] = event.tool.name
base_event["output"] = str(event.output)
elif event.type == "audio":
base_event["audio"] = base64.b64encode(event.audio.data).decode("utf-8")
elif event.type == "audio_interrupted":
pass
elif event.type == "audio_end":
pass
elif event.type == "history_updated":
base_event["history"] = [item.model_dump(mode="json") for item in event.history]
elif event.type == "history_added":
pass
elif event.type == "guardrail_tripped":
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
]
elif event.type == "raw_model_event":
base_event["raw_model_event"] = {
"type": event.data.type,
}
elif event.type == "error":
base_event["error"] = str(event.error) if hasattr(event, "error") else "Unknown error"
else:
assert_never(event)

return base_event


# Single module-level manager instance; the WebSocket endpoint below uses it
# for every connection.
manager = RealtimeWebSocketManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook passed to ``FastAPI(lifespan=...)``.

    Currently a placeholder: it performs no work before or after the
    ``yield``.
    """
    yield


# The FastAPI application object; routes and the static mount are attached
# to it below.
app = FastAPI(lifespan=lifespan)


def _pcm16_bytes(samples: list[int]) -> bytes:
    """Pack signed 16-bit sample values into raw PCM bytes.

    The byte order is pinned to little-endian instead of the host's native
    order, so the output does not depend on the machine running the server.
    NOTE(review): assumes the realtime API expects little-endian PCM16 --
    confirm against the API's audio format documentation.

    Raises:
        TypeError: if ``samples`` is not a sized iterable.
        struct.error: if an item is not an integer in the int16 range.
    """
    return struct.pack(f"<{len(samples)}h", *samples)


@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
    """Bridge one browser WebSocket to a realtime session.

    The client sends JSON text messages. Messages of the form
    ``{"type": "audio", "data": [<int16>, ...]}`` are converted to PCM bytes
    and forwarded to the session; any other message is ignored. Malformed
    messages are logged and skipped rather than ending the connection.

    The session is always torn down when this handler exits, whether the
    client disconnected or an unexpected error occurred.
    """
    try:
        await manager.connect(websocket, session_id)
        while True:
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                logger.warning("Ignoring non-JSON message on session %s", session_id)
                continue

            if not isinstance(message, dict) or message.get("type") != "audio":
                continue

            try:
                # Convert int16 array to bytes
                audio_bytes = _pcm16_bytes(message["data"])
            except (KeyError, TypeError, struct.error):
                logger.warning("Ignoring malformed audio message on session %s", session_id)
                continue

            await manager.send_audio(session_id, audio_bytes)
    except WebSocketDisconnect:
        # Normal termination: the client closed the connection.
        logger.info("Client disconnected from session %s", session_id)
    finally:
        # Runs for every exit path so a failing handler cannot leak the
        # session; disconnect() ignores ids that were never registered.
        await manager.disconnect(session_id)


# Serve the frontend files from the "static" directory at the site root. The
# path is relative, so it resolves against the process's working directory.
# NOTE(review): this mount is registered before the "/" route defined below,
# so it presumably handles "/" first -- confirm the intended precedence.
app.mount("/", StaticFiles(directory="static", html=True), name="static")


@app.get("/")
async def read_index():
    """Respond to ``GET /`` with the frontend's index page."""
    # The path is relative to the working directory of the server process.
    index_page = FileResponse("static/index.html")
    return index_page


if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Binds to all network interfaces (0.0.0.0) on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
Loading