Commit 1dd7a5f
wip: realtime ga migration
1 parent 184757b

13 files changed: +525 −182 lines

examples/realtime/app/server.py

Lines changed: 65 additions & 10 deletions
```diff
@@ -36,20 +36,32 @@ def __init__(self):
         self.active_sessions: dict[str, RealtimeSession] = {}
         self.session_contexts: dict[str, Any] = {}
         self.websockets: dict[str, WebSocket] = {}
+        self._pending_audio: dict[str, bytearray] = {}
+        self._audio_flush_tasks: dict[str, asyncio.Task[Any]] = {}

     async def connect(self, websocket: WebSocket, session_id: str):
         await websocket.accept()
         self.websockets[session_id] = websocket

         agent = get_starting_agent()
         runner = RealtimeRunner(agent)
-        session_context = await runner.run()
+        # Disable server-side interrupt_response to avoid truncating assistant audio
+        session_context = await runner.run(
+            model_config={
+                "initial_model_settings": {
+                    "turn_detection": {"type": "semantic_vad", "interrupt_response": False}
+                }
+            }
+        )
         session = await session_context.__aenter__()
         self.active_sessions[session_id] = session
         self.session_contexts[session_id] = session_context

         # Start event processing task
         asyncio.create_task(self._process_events(session_id))
+        # Init audio buffer + steady flush task (~40ms)
+        self._pending_audio[session_id] = bytearray()
+        self._audio_flush_tasks[session_id] = asyncio.create_task(self._flush_audio_loop(session_id))

     async def disconnect(self, session_id: str):
         if session_id in self.session_contexts:
```
```diff
@@ -59,6 +71,11 @@ async def disconnect(self, session_id: str):
             del self.active_sessions[session_id]
         if session_id in self.websockets:
             del self.websockets[session_id]
+        if session_id in self._pending_audio:
+            del self._pending_audio[session_id]
+        if session_id in self._audio_flush_tasks:
+            self._audio_flush_tasks[session_id].cancel()
+            del self._audio_flush_tasks[session_id]

     async def send_audio(self, session_id: str, audio_bytes: bytes):
         if session_id in self.active_sessions:
```
```diff
@@ -70,12 +87,13 @@ async def _process_events(self, session_id: str):
             websocket = self.websockets[session_id]

             async for event in session:
-                event_data = await self._serialize_event(event)
-                await websocket.send_text(json.dumps(event_data))
+                event_data = await self._serialize_event(session_id, event)
+                if event_data is not None:
+                    await websocket.send_text(json.dumps(event_data))
         except Exception as e:
             logger.error(f"Error processing events for session {session_id}: {e}")

-    async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
+    async def _serialize_event(self, session_id: str, event: RealtimeSessionEvent) -> dict[str, Any] | None:
         base_event: dict[str, Any] = {
             "type": event.type,
         }
```
```diff
@@ -93,7 +111,9 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
             base_event["tool"] = event.tool.name
             base_event["output"] = str(event.output)
         elif event.type == "audio":
-            base_event["audio"] = base64.b64encode(event.audio.data).decode("utf-8")
+            # Coalesce raw PCM and flush on a steady timer for smoother playback.
+            self._pending_audio[session_id].extend(event.audio.data)
+            return None
         elif event.type == "audio_interrupted":
             pass
         elif event.type == "audio_end":
```
```diff
@@ -107,9 +127,20 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
                 {"name": result.guardrail.name} for result in event.guardrail_results
             ]
         elif event.type == "raw_model_event":
-            base_event["raw_model_event"] = {
-                "type": event.data.type,
-            }
+            # Surface useful raw events to the UI with details.
+            if getattr(event.data, "type", None) == "transcript_delta":
+                # Stream assistant transcript deltas to the UI.
+                base_event = {
+                    "type": "transcript_delta",
+                    "item_id": getattr(event.data, "item_id", ""),
+                    "response_id": getattr(event.data, "response_id", ""),
+                    "delta": getattr(event.data, "delta", ""),
+                }
+            else:
+                # Fallback to a minimal raw event descriptor.
+                base_event["raw_model_event"] = {
+                    "type": getattr(event.data, "type", "other"),
+                }
         elif event.type == "error":
             base_event["error"] = str(event.error) if hasattr(event, "error") else "Unknown error"
         elif event.type == "input_audio_timeout_triggered":
```
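
The `getattr` fallbacks make the serializer tolerant of raw model events that are missing a field, instead of raising `AttributeError` mid-stream. A minimal sketch of that behavior, using a hypothetical stand-in for the SDK's raw event payload:

```python
from dataclasses import dataclass


# Hypothetical stand-in for a raw model event payload; the real type comes
# from the agents SDK, but the probed field names match the serializer above.
@dataclass
class FakeTranscriptDelta:
    type: str = "transcript_delta"
    item_id: str = "item_123"
    delta: str = "Hello"  # deliberately has no response_id attribute


data = FakeTranscriptDelta()
serialized = {
    "type": "transcript_delta",
    "item_id": getattr(data, "item_id", ""),
    "response_id": getattr(data, "response_id", ""),  # absent -> ""
    "delta": getattr(data, "delta", ""),
}
print(serialized["response_id"])  # "" rather than an AttributeError
```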
```diff
@@ -119,6 +150,28 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:

         return base_event

+    async def _flush_audio_loop(self, session_id: str) -> None:
+        try:
+            while session_id in self.websockets:
+                await asyncio.sleep(0.04)  # ~40ms cadence
+                buf = self._pending_audio.get(session_id)
+                ws = self.websockets.get(session_id)
+                if not buf or ws is None:
+                    continue
+                b = bytes(buf)
+                self._pending_audio[session_id] = bytearray()
+                try:
+                    await ws.send_text(
+                        json.dumps({"type": "audio", "audio": base64.b64encode(b).decode("utf-8")})
+                    )
+                except Exception:
+                    logger.error("Failed sending coalesced audio", exc_info=True)
+                    break
+        except asyncio.CancelledError:
+            pass
+

 manager = RealtimeWebSocketManager()
```
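
At the session's 24 kHz mono PCM16 format, each 40 ms flush carries roughly 1920 bytes of audio, so the WebSocket sees about 25 messages per second regardless of how the model chunks its output. A quick sanity check of that arithmetic:

```python
# Back-of-envelope check of the flush cadence used by _flush_audio_loop.
SAMPLE_RATE_HZ = 24_000   # session audio format: 24 kHz mono
BYTES_PER_SAMPLE = 2      # PCM16
FLUSH_INTERVAL_S = 0.04   # matches asyncio.sleep(0.04)

bytes_per_flush = int(SAMPLE_RATE_HZ * FLUSH_INTERVAL_S) * BYTES_PER_SAMPLE
print(bytes_per_flush)       # 1920 bytes per message
print(1 / FLUSH_INTERVAL_S)  # 25.0 messages per second
```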

```diff
@@ -142,7 +195,8 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str):
             if message["type"] == "audio":
                 # Convert int16 array to bytes
                 int16_data = message["data"]
-                audio_bytes = struct.pack(f"{len(int16_data)}h", *int16_data)
+                # Send little-endian PCM16 to the model.
+                audio_bytes = struct.pack("<" + f"{len(int16_data)}h", *int16_data)
                 await manager.send_audio(session_id, audio_bytes)

     except WebSocketDisconnect:
```
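
Without a byte-order prefix, `struct.pack` uses native byte order and alignment, so the wire format would depend on the host. The `<` prefix pins it to little-endian PCM16. A small illustration:

```python
import struct

samples = [1, -2, 32767]
little = struct.pack(f"<{len(samples)}h", *samples)  # forced little-endian
print(little.hex())  # 0100feffff7f

# On a little-endian host the native form happens to match; on a big-endian
# host it would not, which is why the explicit "<" matters on the wire.
assert little == struct.pack("<3h", 1, -2, 32767)
```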
```diff
@@ -160,4 +214,5 @@ async def read_index():
 if __name__ == "__main__":
     import uvicorn

-    uvicorn.run(app, host="0.0.0.0", port=8000)
+    log_level = "info"
+    uvicorn.run(app, host="0.0.0.0", port=8000, log_level=log_level)
```

examples/realtime/app/static/app.js

Lines changed: 41 additions & 8 deletions
```diff
@@ -14,6 +14,10 @@ class RealtimeDemo {
         this.isPlayingAudio = false;
         this.playbackAudioContext = null;
         this.currentAudioSource = null;
+        this.nextPlaybackTime = 0;
+
+        // Live assistant transcript buffer
+        this.pendingAssistantText = '';

         this.initializeElements();
         this.setupEventListeners();
```
```diff
@@ -138,9 +142,9 @@ class RealtimeDemo {
         const source = this.audioContext.createMediaStreamSource(this.stream);

         // Create a script processor to capture audio data
-        this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);
+        this.processor = this.audioContext.createScriptProcessor(2048, 1, 1);
         source.connect(this.processor);
-        this.processor.connect(this.audioContext.destination);
+        // Do not connect to destination to avoid local echo.

         this.processor.onaudioprocess = (event) => {
             if (!this.isMuted && this.ws && this.ws.readyState === WebSocket.OPEN) {
```
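
Shrinking the ScriptProcessor buffer from 4096 to 2048 samples halves per-callback capture latency. A rough estimate, assuming the capture context runs at the 24 kHz used elsewhere in this demo (an assumption; browsers commonly pick 44.1 or 48 kHz for microphone contexts):

```python
# Rough per-callback mic latency for the two buffer sizes.
ASSUMED_RATE_HZ = 24_000  # assumption: actual rate is browser/device dependent
for buffer_samples in (4096, 2048):
    latency_ms = buffer_samples / ASSUMED_RATE_HZ * 1000
    print(f"{buffer_samples} samples -> {latency_ms:.1f} ms per callback")
# 4096 samples -> 170.7 ms per callback
# 2048 samples -> 85.3 ms per callback
```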
```diff
@@ -204,6 +208,9 @@ class RealtimeDemo {
             case 'audio':
                 this.playAudio(event.audio);
                 break;
+            case 'transcript_delta':
+                this.handleTranscriptDelta(event.delta || '');
+                break;
             case 'audio_interrupted':
                 this.stopAudioPlayback();
                 break;
```
```diff
@@ -260,7 +267,18 @@ class RealtimeDemo {
         } else {
             console.log('History is not an array or is null/undefined');
         }
-
+
+        // If we have a live assistant transcript, append it as a streaming bubble.
+        if (this.pendingAssistantText && this.pendingAssistantText.trim()) {
+            const messageDiv = document.createElement('div');
+            messageDiv.className = 'message assistant';
+            const bubbleDiv = document.createElement('div');
+            bubbleDiv.className = 'message-bubble';
+            bubbleDiv.textContent = this.pendingAssistantText;
+            messageDiv.appendChild(bubbleDiv);
+            this.messagesContent.appendChild(messageDiv);
+        }
+
         this.scrollToBottom();
     }
```

```diff
@@ -370,12 +388,13 @@ class RealtimeDemo {
         if (this.isPlayingAudio || this.audioQueue.length === 0) {
             return;
         }
-
+
         this.isPlayingAudio = true;
-
+
         // Initialize audio context if needed
         if (!this.playbackAudioContext) {
             this.playbackAudioContext = new AudioContext({ sampleRate: 24000 });
+            this.nextPlaybackTime = this.playbackAudioContext.currentTime;
         }

         while (this.audioQueue.length > 0) {
```
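
The `nextPlaybackTime` cursor implements gap-free scheduling: each chunk starts at `max(now, nextPlaybackTime)` and the cursor then advances by the chunk's duration, so consecutive chunks butt up against each other instead of each starting "now". A small Python simulation of the same rule:

```python
# Simulation of the gap-free scheduling rule used for playback in app.js.
def schedule_starts(durations: list[float], now: float, cursor: float) -> list[float]:
    starts = []
    for duration in durations:
        if cursor < now:
            cursor = now  # fell behind real time; start immediately
        starts.append(cursor)
        cursor += duration
    return starts

# Three 40 ms chunks arriving at t=1.00 s play back-to-back at
# 1.00, 1.04, 1.08 rather than all starting at "now" and overlapping.
print(schedule_starts([0.04, 0.04, 0.04], now=1.0, cursor=0.0))
# [1.0, 1.04, 1.08]
```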
```diff
@@ -425,14 +444,28 @@ class RealtimeDemo {
                     this.currentAudioSource = null;
                     resolve();
                 };
-                source.start();
-
+                // Schedule to minimize gaps between chunks
+                const now = this.playbackAudioContext.currentTime;
+                if (this.nextPlaybackTime < now) {
+                    this.nextPlaybackTime = now;
+                }
+                source.start(this.nextPlaybackTime);
+                this.nextPlaybackTime += audioBuffer.duration;
+
             } catch (error) {
                 console.error('Failed to play audio chunk:', error);
                 reject(error);
             }
         });
     }
+
+    handleTranscriptDelta(delta) {
+        if (!delta) return;
+        this.pendingAssistantText += delta;
+        // Update the live bubble if present; otherwise, append a temporary one.
+        // Reuse updateMessagesFromHistory to keep behavior consistent.
+        this.updateMessagesFromHistory([]);
+    }

     stopAudioPlayback() {
         console.log('Stopping audio playback due to interruption');
```
```diff
@@ -464,4 +497,4 @@ class RealtimeDemo {
 // Initialize the demo when the page loads
 document.addEventListener('DOMContentLoaded', () => {
     new RealtimeDemo();
-});
+});
```

examples/realtime/cli/demo.py

Lines changed: 58 additions & 15 deletions
```diff
@@ -8,10 +8,17 @@
 import sounddevice as sd

 from agents import function_tool
-from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSession, RealtimeSessionEvent
+from agents.realtime import (
+    RealtimeAgent,
+    RealtimePlaybackTracker,
+    RealtimeRunner,
+    RealtimeSession,
+    RealtimeSessionEvent,
+)
+from agents.realtime.model import RealtimeModelConfig

 # Audio configuration
-CHUNK_LENGTH_S = 0.05  # 50ms
+CHUNK_LENGTH_S = 0.04  # 40ms aligns with realtime defaults
 SAMPLE_RATE = 24000
 FORMAT = np.int16
 CHANNELS = 1
```
```diff
@@ -49,11 +56,16 @@ def __init__(self) -> None:
         self.audio_player: sd.OutputStream | None = None
         self.recording = False

+        # Playback tracker lets the model know our real playback progress
+        self.playback_tracker = RealtimePlaybackTracker()
+
         # Audio output state for callback system
-        self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10)  # Buffer more chunks
+        # Store tuples: (samples_np, item_id, content_index)
+        self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=100)
         self.interrupt_event = threading.Event()
-        self.current_audio_chunk: np.ndarray[Any, np.dtype[Any]] | None = None
+        self.current_audio_chunk: tuple[np.ndarray[Any, np.dtype[Any]], str, int] | None = None
         self.chunk_position = 0
+        self.bytes_per_sample = np.dtype(FORMAT).itemsize

     def _output_callback(self, outdata, frames: int, time, status) -> None:
         """Callback for audio output - handles continuous audio stream from server."""
```
```diff
@@ -92,20 +104,29 @@ def _output_callback(self, outdata, frames: int, time, status) -> None:

         # Copy data from current chunk to output buffer
         remaining_output = len(outdata) - samples_filled
-        remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
+        samples, item_id, content_index = self.current_audio_chunk
+        remaining_chunk = len(samples) - self.chunk_position
         samples_to_copy = min(remaining_output, remaining_chunk)

         if samples_to_copy > 0:
-            chunk_data = self.current_audio_chunk[
-                self.chunk_position : self.chunk_position + samples_to_copy
-            ]
+            chunk_data = samples[self.chunk_position : self.chunk_position + samples_to_copy]
             # More efficient: direct assignment for mono audio instead of reshape
             outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data
             samples_filled += samples_to_copy
             self.chunk_position += samples_to_copy

+            # Inform playback tracker about played bytes
+            try:
+                self.playback_tracker.on_play_bytes(
+                    item_id=item_id,
+                    item_content_index=content_index,
+                    bytes=chunk_data.tobytes(),
+                )
+            except Exception:
+                pass
+
         # If we've used up the entire chunk, reset for next iteration
-        if self.chunk_position >= len(self.current_audio_chunk):
+        if self.chunk_position >= len(samples):
             self.current_audio_chunk = None
             self.chunk_position = 0
```

```diff
@@ -125,7 +146,15 @@ async def run(self) -> None:

         try:
             runner = RealtimeRunner(agent)
-            async with await runner.run() as session:
+            # Attach playback tracker and disable server-side response interruption,
+            # which can truncate assistant audio when mic picks up speaker output.
+            model_config: RealtimeModelConfig = {
+                "playback_tracker": self.playback_tracker,
+                "initial_model_settings": {
+                    "turn_detection": {"type": "semantic_vad", "interrupt_response": False},
+                },
+            }
+            async with await runner.run(model_config=model_config) as session:
                 self.session = session
                 print("Connected. Starting audio recording...")
```

```diff
@@ -170,6 +199,14 @@ async def capture_audio(self) -> None:
         read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)

         try:
+            # Simple energy-based barge-in: if user speaks while audio is playing, interrupt.
+            def rms_energy(samples: np.ndarray[Any, np.dtype[Any]]) -> float:
+                if samples.size == 0:
+                    return 0.0
+                # Normalize int16 to [-1, 1]
+                x = samples.astype(np.float32) / 32768.0
+                return float(np.sqrt(np.mean(x * x)))
+
             while self.recording:
                 # Check if there's enough data to read
                 if self.audio_stream.read_available < read_size:
```
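
As a sanity check on `rms_energy`: after normalizing int16 to [-1, 1], a full-scale sine wave measures about 1/√2 ≈ 0.707 and silence measures 0.0, which brackets where a speech threshold for barge-in would sit:

```python
import numpy as np


def rms_energy(samples: np.ndarray) -> float:
    if samples.size == 0:
        return 0.0
    x = samples.astype(np.float32) / 32768.0
    return float(np.sqrt(np.mean(x * x)))


t = np.linspace(0, 1, 24_000, endpoint=False)
full_scale_sine = (np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16)

print(rms_energy(full_scale_sine))           # ~0.707, i.e. 1/sqrt(2)
print(rms_energy(np.zeros(960, np.int16)))   # 0.0
```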
```diff
@@ -182,8 +219,12 @@ async def capture_audio(self) -> None:
                 # Convert numpy array to bytes
                 audio_bytes = data.tobytes()

-                # Send audio to session
-                await self.session.send_audio(audio_bytes)
+                # Half-duplex gating: do not send mic while assistant audio is playing
+                assistant_playing = (
+                    self.current_audio_chunk is not None or not self.output_queue.empty()
+                )
+                if not assistant_playing:
+                    await self.session.send_audio(audio_bytes)

                 # Yield control back to event loop
                 await asyncio.sleep(0)
```
```diff
@@ -212,17 +253,19 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
         elif event.type == "audio_end":
             print("Audio ended")
         elif event.type == "audio":
-            # Enqueue audio for callback-based playback
+            # Enqueue audio for callback-based playback with metadata
             np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
             try:
-                self.output_queue.put_nowait(np_audio)
+                self.output_queue.put_nowait((np_audio, event.item_id, event.content_index))
             except queue.Full:
                 # Queue is full - only drop if we have significant backlog
                 # This prevents aggressive dropping that could cause choppiness
                 if self.output_queue.qsize() > 8:  # Keep some buffer
                     try:
                         self.output_queue.get_nowait()
-                        self.output_queue.put_nowait(np_audio)
+                        self.output_queue.put_nowait(
+                            (np_audio, event.item_id, event.content_index)
+                        )
                     except queue.Empty:
                         pass
                 # If queue isn't too full, just skip this chunk to avoid blocking
```