Commit d9f1d5f

Fixes realtime example app stability issues (openai#1905)
1 parent 1a54ce7 commit d9f1d5f

4 files changed: +326 additions, -132 deletions

examples/realtime/app/server.py

Lines changed: 34 additions & 3 deletions
@@ -13,6 +13,8 @@

 from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
 from agents.realtime.config import RealtimeUserInputMessage
+from agents.realtime.items import RealtimeItem
+from agents.realtime.model import RealtimeModelConfig
 from agents.realtime.model_inputs import RealtimeModelSendRawMessage

 # Import TwilioHandler class - handle both module and package use cases
@@ -45,7 +47,18 @@ async def connect(self, websocket: WebSocket, session_id: str):

        agent = get_starting_agent()
        runner = RealtimeRunner(agent)
-        session_context = await runner.run()
+        model_config: RealtimeModelConfig = {
+            "initial_model_settings": {
+                "turn_detection": {
+                    "type": "server_vad",
+                    "prefix_padding_ms": 300,
+                    "silence_duration_ms": 500,
+                    "interrupt_response": True,
+                    "create_response": True,
+                },
+            },
+        }
+        session_context = await runner.run(model_config=model_config)
        session = await session_context.__aenter__()
        self.active_sessions[session_id] = session
        self.session_contexts[session_id] = session_context
@@ -103,8 +116,26 @@ async def _process_events(self, session_id: str):
                event_data = await self._serialize_event(event)
                await websocket.send_text(json.dumps(event_data))
        except Exception as e:
+            print(e)
            logger.error(f"Error processing events for session {session_id}: {e}")

+    def _sanitize_history_item(self, item: RealtimeItem) -> dict[str, Any]:
+        """Remove large binary payloads from history items while keeping transcripts."""
+        item_dict = item.model_dump()
+        content = item_dict.get("content")
+        if isinstance(content, list):
+            sanitized_content: list[Any] = []
+            for part in content:
+                if isinstance(part, dict):
+                    sanitized_part = part.copy()
+                    if sanitized_part.get("type") in {"audio", "input_audio"}:
+                        sanitized_part.pop("audio", None)
+                    sanitized_content.append(sanitized_part)
+                else:
+                    sanitized_content.append(part)
+            item_dict["content"] = sanitized_content
+        return item_dict
+
    async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
        base_event: dict[str, Any] = {
            "type": event.type,
@@ -129,11 +160,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
        elif event.type == "audio_end":
            pass
        elif event.type == "history_updated":
-            base_event["history"] = [item.model_dump(mode="json") for item in event.history]
+            base_event["history"] = [self._sanitize_history_item(item) for item in event.history]
        elif event.type == "history_added":
            # Provide the added item so the UI can render incrementally.
            try:
-                base_event["item"] = event.item.model_dump(mode="json")
+                base_event["item"] = self._sanitize_history_item(event.item)
            except Exception:
                base_event["item"] = None
        elif event.type == "guardrail_tripped":

examples/realtime/app/static/app.js

Lines changed: 116 additions & 129 deletions
@@ -5,16 +5,16 @@ class RealtimeDemo {
        this.isMuted = false;
        this.isCapturing = false;
        this.audioContext = null;
-        this.processor = null;
+        this.captureSource = null;
+        this.captureNode = null;
        this.stream = null;
        this.sessionId = this.generateSessionId();

-        // Audio playback queue
-        this.audioQueue = [];
        this.isPlayingAudio = false;
        this.playbackAudioContext = null;
-        this.currentAudioSource = null;
-        this.currentAudioGain = null; // per-chunk gain for smooth fades
+        this.playbackNode = null;
+        this.playbackInitPromise = null;
+        this.pendingPlaybackChunks = [];
        this.playbackFadeSec = 0.02; // ~20ms fade to reduce clicks
        this.messageNodes = new Map(); // item_id -> DOM node
        this.seenItemIds = new Set(); // item_id set for append-only syncing
@@ -227,30 +227,35 @@ class RealtimeDemo {
        });

        this.audioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
-        const source = this.audioContext.createMediaStreamSource(this.stream);
+        if (this.audioContext.state === 'suspended') {
+            try { await this.audioContext.resume(); } catch {}
+        }

-        // Create a script processor to capture audio data
-        this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);
-        source.connect(this.processor);
-        this.processor.connect(this.audioContext.destination);
+        if (!this.audioContext.audioWorklet) {
+            throw new Error('AudioWorklet API not supported in this browser.');
+        }

-        this.processor.onaudioprocess = (event) => {
-            if (!this.isMuted && this.ws && this.ws.readyState === WebSocket.OPEN) {
-                const inputBuffer = event.inputBuffer.getChannelData(0);
-                const int16Buffer = new Int16Array(inputBuffer.length);
+        await this.audioContext.audioWorklet.addModule('audio-recorder.worklet.js');

-                // Convert float32 to int16
-                for (let i = 0; i < inputBuffer.length; i++) {
-                    int16Buffer[i] = Math.max(-32768, Math.min(32767, inputBuffer[i] * 32768));
-                }
+        this.captureSource = this.audioContext.createMediaStreamSource(this.stream);
+        this.captureNode = new AudioWorkletNode(this.audioContext, 'pcm-recorder');

-                this.ws.send(JSON.stringify({
-                    type: 'audio',
-                    data: Array.from(int16Buffer)
-                }));
-            }
+        this.captureNode.port.onmessage = (event) => {
+            if (this.isMuted) return;
+            if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
+
+            const chunk = event.data instanceof ArrayBuffer ? new Int16Array(event.data) : event.data;
+            if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) return;
+
+            this.ws.send(JSON.stringify({
+                type: 'audio',
+                data: Array.from(chunk)
+            }));
        };

+        this.captureSource.connect(this.captureNode);
+        this.captureNode.connect(this.audioContext.destination);
+
        this.isCapturing = true;
        this.updateMuteUI();
@@ -264,9 +269,15 @@

        this.isCapturing = false;

-        if (this.processor) {
-            this.processor.disconnect();
-            this.processor = null;
+        if (this.captureSource) {
+            try { this.captureSource.disconnect(); } catch {}
+            this.captureSource = null;
+        }
+
+        if (this.captureNode) {
+            this.captureNode.port.onmessage = null;
+            try { this.captureNode.disconnect(); } catch {}
+            this.captureNode = null;
        }

        if (this.audioContext) {
@@ -544,141 +555,117 @@ class RealtimeDemo {
                return;
            }

-            // Add to queue
-            this.audioQueue.push(audioBase64);
-
-            // Start processing queue if not already playing
-            if (!this.isPlayingAudio) {
-                this.processAudioQueue();
+            const int16Array = this.decodeBase64ToInt16(audioBase64);
+            if (!int16Array || int16Array.length === 0) {
+                console.warn('Audio chunk has no samples, skipping');
+                return;
            }

+            this.pendingPlaybackChunks.push(int16Array);
+            await this.ensurePlaybackNode();
+            this.flushPendingPlaybackChunks();
+
        } catch (error) {
            console.error('Failed to play audio:', error);
+            this.pendingPlaybackChunks = [];
        }
    }

-    async processAudioQueue() {
-        if (this.isPlayingAudio || this.audioQueue.length === 0) {
+    async ensurePlaybackNode() {
+        if (this.playbackNode) {
            return;
        }

-        this.isPlayingAudio = true;
-
-        // Initialize audio context if needed
-        if (!this.playbackAudioContext) {
-            this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
-        }
+        if (!this.playbackInitPromise) {
+            this.playbackInitPromise = (async () => {
+                if (!this.playbackAudioContext) {
+                    this.playbackAudioContext = new AudioContext({ sampleRate: 24000, latencyHint: 'interactive' });
+                }

-        // Ensure context is running (autoplay policies can suspend it)
-        if (this.playbackAudioContext.state === 'suspended') {
-            try { await this.playbackAudioContext.resume(); } catch {}
-        }
+                if (this.playbackAudioContext.state === 'suspended') {
+                    try { await this.playbackAudioContext.resume(); } catch {}
+                }

-        while (this.audioQueue.length > 0) {
-            const audioBase64 = this.audioQueue.shift();
-            await this.playAudioChunk(audioBase64);
-        }
+                if (!this.playbackAudioContext.audioWorklet) {
+                    throw new Error('AudioWorklet API not supported in this browser.');
+                }

-        this.isPlayingAudio = false;
-    }
+                await this.playbackAudioContext.audioWorklet.addModule('audio-playback.worklet.js');

-    async playAudioChunk(audioBase64) {
-        return new Promise((resolve, reject) => {
-            try {
-                // Decode base64 to ArrayBuffer
-                const binaryString = atob(audioBase64);
-                const bytes = new Uint8Array(binaryString.length);
-                for (let i = 0; i < binaryString.length; i++) {
-                    bytes[i] = binaryString.charCodeAt(i);
-                }
+                this.playbackNode = new AudioWorkletNode(this.playbackAudioContext, 'pcm-playback', { outputChannelCount: [1] });
+                this.playbackNode.port.onmessage = (event) => {
+                    const message = event.data;
+                    if (!message || typeof message !== 'object') return;
+                    if (message.type === 'drained') {
+                        this.isPlayingAudio = false;
+                    }
+                };

-                const int16Array = new Int16Array(bytes.buffer);
+                // Provide initial configuration for fades.
+                const fadeSamples = Math.floor(this.playbackAudioContext.sampleRate * this.playbackFadeSec);
+                this.playbackNode.port.postMessage({ type: 'config', fadeSamples });

-                if (int16Array.length === 0) {
-                    console.warn('Audio chunk has no samples, skipping');
-                    resolve();
-                    return;
-                }
+                this.playbackNode.connect(this.playbackAudioContext.destination);
+            })().catch((error) => {
+                this.playbackInitPromise = null;
+                throw error;
+            });
+        }

-                const float32Array = new Float32Array(int16Array.length);
+        await this.playbackInitPromise;
+    }

-                // Convert int16 to float32
-                for (let i = 0; i < int16Array.length; i++) {
-                    float32Array[i] = int16Array[i] / 32768.0;
-                }
+    flushPendingPlaybackChunks() {
+        if (!this.playbackNode) {
+            return;
+        }

-                const audioBuffer = this.playbackAudioContext.createBuffer(1, float32Array.length, 24000);
-                audioBuffer.getChannelData(0).set(float32Array);
-
-                const source = this.playbackAudioContext.createBufferSource();
-                source.buffer = audioBuffer;
-
-                // Per-chunk gain with short fade-in/out to avoid clicks
-                const gainNode = this.playbackAudioContext.createGain();
-                const now = this.playbackAudioContext.currentTime;
-                const fade = Math.min(this.playbackFadeSec, Math.max(0.005, audioBuffer.duration / 8));
-                try {
-                    gainNode.gain.cancelScheduledValues(now);
-                    gainNode.gain.setValueAtTime(0.0, now);
-                    gainNode.gain.linearRampToValueAtTime(1.0, now + fade);
-                    const endTime = now + audioBuffer.duration;
-                    gainNode.gain.setValueAtTime(1.0, Math.max(now + fade, endTime - fade));
-                    gainNode.gain.linearRampToValueAtTime(0.0001, endTime);
-                } catch {}
-
-                source.connect(gainNode);
-                gainNode.connect(this.playbackAudioContext.destination);
-
-                // Store references to allow smooth stop on interruption
-                this.currentAudioSource = source;
-                this.currentAudioGain = gainNode;
-
-                source.onended = () => {
-                    this.currentAudioSource = null;
-                    this.currentAudioGain = null;
-                    resolve();
-                };
-                source.start();
+        while (this.pendingPlaybackChunks.length > 0) {
+            const chunk = this.pendingPlaybackChunks.shift();
+            if (!chunk || !(chunk instanceof Int16Array) || chunk.length === 0) {
+                continue;
+            }

+            try {
+                this.playbackNode.port.postMessage(
+                    { type: 'chunk', payload: chunk.buffer },
+                    [chunk.buffer]
+                );
+                this.isPlayingAudio = true;
            } catch (error) {
-                console.error('Failed to play audio chunk:', error);
-                reject(error);
+                console.error('Failed to enqueue audio chunk to worklet:', error);
            }
-        });
+        }
+    }
+
+    decodeBase64ToInt16(audioBase64) {
+        try {
+            const binaryString = atob(audioBase64);
+            const length = binaryString.length;
+            const bytes = new Uint8Array(length);
+            for (let i = 0; i < length; i++) {
+                bytes[i] = binaryString.charCodeAt(i);
+            }
+            return new Int16Array(bytes.buffer);
+        } catch (error) {
+            console.error('Failed to decode audio chunk:', error);
+            return null;
+        }
    }

    stopAudioPlayback() {
        console.log('Stopping audio playback due to interruption');

-        // Smoothly ramp down before stopping to avoid clicks
-        if (this.currentAudioSource && this.playbackAudioContext) {
+        this.pendingPlaybackChunks = [];
+
+        if (this.playbackNode) {
            try {
-                const now = this.playbackAudioContext.currentTime;
-                const fade = Math.max(0.01, this.playbackFadeSec);
-                if (this.currentAudioGain) {
-                    try {
-                        this.currentAudioGain.gain.cancelScheduledValues(now);
-                        // Capture current value to ramp from it
-                        const current = this.currentAudioGain.gain.value ?? 1.0;
-                        this.currentAudioGain.gain.setValueAtTime(current, now);
-                        this.currentAudioGain.gain.linearRampToValueAtTime(0.0001, now + fade);
-                    } catch {}
-                }
-                // Stop after the fade completes
-                setTimeout(() => {
-                    try { this.currentAudioSource && this.currentAudioSource.stop(); } catch {}
-                    this.currentAudioSource = null;
-                    this.currentAudioGain = null;
-                }, Math.ceil(fade * 1000));
+                this.playbackNode.port.postMessage({ type: 'stop' });
            } catch (error) {
-                console.error('Error stopping audio source:', error);
+                console.error('Failed to notify playback worklet to stop:', error);
            }
        }

-        // Clear the audio queue
-        this.audioQueue = [];
-
-        // Reset playback state
        this.isPlayingAudio = false;

        console.log('Audio playback stopped and queue cleared');
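Note: the playback side talks to a node registered as 'pcm-playback' with three message types ({ type: 'config', fadeSamples }, { type: 'chunk', payload } with the buffer transferred, and { type: 'stop' }) and listens for { type: 'drained' } when the worklet's queue empties. The committed audio-playback.worklet.js is not shown in this excerpt; the sketch below is a hypothetical processor consistent with that message protocol, with the fade handling reduced to a stored fadeSamples value so the queueing logic stays short.

// Hypothetical sketch of audio-playback.worklet.js (not the committed file).
// Queues int16 chunks from the main thread, streams them out as float32,
// and reports 'drained' once the queue is empty.
class PcmPlaybackProcessor extends AudioWorkletProcessor {
    constructor() {
        super();
        this.queue = [];      // pending Int16Array chunks
        this.offset = 0;      // read position within queue[0]
        this.fadeSamples = 0; // set via 'config'; fade logic omitted in this sketch
        this.port.onmessage = (event) => {
            const msg = event.data;
            if (!msg || typeof msg !== 'object') return;
            if (msg.type === 'config') {
                this.fadeSamples = msg.fadeSamples || 0;
            } else if (msg.type === 'chunk') {
                this.queue.push(new Int16Array(msg.payload));
            } else if (msg.type === 'stop') {
                this.queue = [];
                this.offset = 0;
            }
        };
    }

    process(inputs, outputs) {
        const out = outputs[0][0];
        let i = 0;
        while (i < out.length && this.queue.length > 0) {
            const chunk = this.queue[0];
            out[i++] = chunk[this.offset++] / 32768; // int16 -> float32
            if (this.offset >= chunk.length) {
                this.queue.shift();
                this.offset = 0;
                if (this.queue.length === 0) {
                    this.port.postMessage({ type: 'drained' });
                }
            }
        }
        while (i < out.length) out[i++] = 0; // underrun: emit silence
        return true;
    }
}

registerProcessor('pcm-playback', PcmPlaybackProcessor);

Because the worklet pulls samples inside process(), consecutive chunks are stitched together on the audio thread, which is the kind of per-chunk gap the old createBufferSource() path had to smooth over with gain fades.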
