Skip to content

Commit eb318a7

Browse files
Fixed Realtime Metrics Collector and Realtime Models
1 parent 702fa5d commit eb318a7

File tree

8 files changed

+117
-76
lines changed

8 files changed

+117
-76
lines changed

examples/test_realtime_pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async def entrypoint(ctx: JobContext):
2424
model="gpt-realtime-2025-08-28",
2525
config=OpenAIRealtimeConfig(
2626
voice="alloy", # alloy, ash, ballad, coral, echo, fable, onyx, nova, sage, shimmer, and verse
27-
modalities=["audio"],
27+
modalities=["audio", "text"],
2828
turn_detection=TurnDetection(
2929
type="server_vad",
3030
threshold=0.5,

videosdk-agents/videosdk/agents/metrics/models.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,8 @@ class RealtimeTurnData:
150150
def compute_latencies(self):
151151
if self.user_speech_end_time and self.agent_speech_start_time:
152152
self.ttfb = max(0, (self.agent_speech_start_time - self.user_speech_end_time) * 1000)
153-
if self.user_speech_end_time and self.agent_speech_start_time:
154-
self.thinking_delay = max(0, (self.agent_speech_start_time - self.user_speech_end_time) * 1000)
155-
if self.user_speech_start_time and self.agent_speech_end_time:
156-
self.e2e_latency = (self.agent_speech_end_time - self.user_speech_start_time) * 1000
153+
self.e2e_latency = self.ttfb
154+
self.thinking_delay = self.ttfb
157155
if self.agent_speech_start_time and self.agent_speech_end_time:
158156
self.agent_speech_duration = (self.agent_speech_end_time - self.agent_speech_start_time) * 1000
159157

videosdk-agents/videosdk/agents/metrics/realtime_metrics_collector.py

Lines changed: 47 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ def __init__(self) -> None:
3737
self.analytics_client = AnalyticsClient()
3838
self.traces_flow_manager: Optional[TracesFlowManager] = None
3939
self.playground: bool = False
40+
4041
def set_session_id(self, session_id: str):
4142
"""Set the session ID for metrics tracking"""
4243
self.analytics_client.set_session_id(session_id)
@@ -98,44 +99,56 @@ async def _start_new_interaction(self) -> None:
9899
**RealtimeMetricsCollector._agent_info
99100
)
100101
self.turns.append(self.current_turn)
102+
self.last_user_activity_time = None
103+
104+
def mark_user_activity(self, timestamp: Optional[float] = None) -> None:
105+
"""Mark the time of the last user activity (e.g. transcription received)"""
106+
self.last_user_activity_time = timestamp if timestamp is not None else time.perf_counter()
101107

102108
async def set_user_speech_start(self) -> None:
103-
if self.current_turn:
109+
if self.current_turn and self.current_turn.agent_speech_start_time is not None and self.current_turn.agent_speech_end_time is None:
110+
await self.set_interrupted()
111+
if self.current_turn and (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is not None):
104112
self._finalize_interaction_and_send()
105113

106114
await self._start_new_interaction()
107115
if self.current_turn and self.current_turn.user_speech_start_time is None:
108116
self.current_turn.user_speech_start_time = time.perf_counter()
109-
await self.start_timeline_event("user_speech")
110-
111-
async def set_user_speech_end(self) -> None:
112-
if self.current_turn and self.current_turn.user_speech_end_time is None:
113-
self.current_turn.user_speech_end_time = time.perf_counter()
114-
await self.end_timeline_event("user_speech")
117+
await self.start_timeline_event("user_speech", self.current_turn.user_speech_start_time)
118+
119+
async def set_user_speech_end(self, timestamp: Optional[float] = None) -> None:
120+
if self.current_turn and (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
121+
if timestamp is not None:
122+
self.current_turn.user_speech_end_time = timestamp
123+
elif self.last_user_activity_time is not None:
124+
self.current_turn.user_speech_end_time = self.last_user_activity_time
125+
else:
126+
self.current_turn.user_speech_end_time = time.perf_counter()
127+
await self.end_timeline_event("user_speech", self.current_turn.user_speech_end_time)
115128

116129
async def set_agent_speech_start(self) -> None:
117130
if not self.current_turn:
118131
await self._start_new_interaction()
119-
elif self.current_turn.user_speech_start_time is not None and self.current_turn.user_speech_end_time is None:
120-
self.current_turn.user_speech_end_time = time.perf_counter()
121-
122-
await self.end_timeline_event("user_speech")
132+
elif (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
133+
await self.set_user_speech_end()
123134

124135
if self.current_turn and self.current_turn.agent_speech_start_time is None:
125136
self.current_turn.agent_speech_start_time = time.perf_counter()
126-
await self.start_timeline_event("agent_speech")
137+
await self.start_timeline_event("agent_speech", self.current_turn.agent_speech_start_time)
127138
if self.agent_speech_end_timer:
128139
self.agent_speech_end_timer.cancel()
129140

130141
async def set_agent_speech_end(self, timeout: float = 1.0) -> None:
131142
if self.current_turn:
143+
if self.current_turn.agent_speech_start_time is None:
144+
return
132145
self.current_turn.agent_speech_end_time = time.perf_counter()
133146
if self.agent_speech_end_timer:
134147
self.agent_speech_end_timer.cancel()
135148

136149
loop = asyncio.get_event_loop()
137150
self.agent_speech_end_timer = loop.call_later(timeout, self._finalize_interaction_and_send)
138-
await self.end_timeline_event("agent_speech")
151+
await self.end_timeline_event("agent_speech", self.current_turn.agent_speech_end_time)
139152

140153
async def set_a2a_handoff(self) -> None:
141154
"""Set the A2A enabled and handoff occurred flags for the current turn in A2A scenarios."""
@@ -157,12 +170,15 @@ def _finalize_agent_speech(self) -> None:
157170
self.agent_speech_end_timer = None
158171

159172
def _finalize_interaction_and_send(self) -> None:
173+
if self.agent_speech_end_timer:
174+
self.agent_speech_end_timer.cancel()
175+
self.agent_speech_end_timer = None
160176
if not self.current_turn:
161177
return
162178

163179
self._finalize_agent_speech()
164180

165-
if self.current_turn.user_speech_start_time and not self.current_turn.user_speech_end_time:
181+
if (self.current_turn.user_speech_start_time is not None) and (self.current_turn.user_speech_end_time is None):
166182
self.current_turn.user_speech_end_time = time.perf_counter()
167183

168184
current_time = time.perf_counter()
@@ -216,19 +232,18 @@ async def add_timeline_event(self, event: TimelineEvent) -> None:
216232
if self.current_turn:
217233
self.current_turn.timeline.append(event)
218234

219-
async def start_timeline_event(self, event_type: str) -> None:
235+
async def start_timeline_event(self, event_type: str, start_time: float) -> None:
220236
"""Start a timeline event with a precise start time"""
221237
if self.current_turn:
222238
event = TimelineEvent(
223239
event_type=event_type,
224-
start_time=time.perf_counter()
240+
start_time=start_time
225241
)
226242
self.current_turn.timeline.append(event)
227243

228-
async def end_timeline_event(self, event_type: str) -> None:
244+
async def end_timeline_event(self, event_type: str, end_time: float) -> None:
229245
"""End a timeline event and calculate duration"""
230246
if self.current_turn:
231-
end_time = time.perf_counter()
232247
for event in reversed(self.current_turn.timeline):
233248
if event.event_type == event_type and event.end_time is None:
234249
event.end_time = end_time
@@ -253,10 +268,7 @@ async def set_user_transcript(self, text: str) -> None:
253268
if self.current_turn:
254269
if self.current_turn.user_speech_start_time is None:
255270
self.current_turn.user_speech_start_time = time.perf_counter()
256-
await self.start_timeline_event("user_speech")
257-
if self.current_turn.user_speech_end_time is None:
258-
self.current_turn.user_speech_end_time = time.perf_counter()
259-
await self.end_timeline_event("user_speech")
271+
await self.start_timeline_event("user_speech", self.current_turn.user_speech_start_time)
260272
logger.info(f"user input speech: {text}")
261273
await self.update_timeline_event_text("user_speech", text)
262274

@@ -265,7 +277,7 @@ async def set_agent_response(self, text: str) -> None:
265277
if self.current_turn:
266278
if self.current_turn.agent_speech_start_time is None:
267279
self.current_turn.agent_speech_start_time = time.perf_counter()
268-
await self.start_timeline_event("agent_speech")
280+
await self.start_timeline_event("agent_speech", self.current_turn.agent_speech_start_time)
269281
logger.info(f"agent output speech: {text}")
270282
await self.update_timeline_event_text("agent_speech", text)
271283

@@ -276,8 +288,19 @@ def set_realtime_model_error(self, error: Dict[str, Any]) -> None:
276288
self.current_turn.realtime_model_errors.append(error)
277289

278290
async def set_interrupted(self) -> None:
291+
"""
292+
Handle interruption by finalizing the current turn immediately.
293+
Only marks as interrupted if the agent was actually speaking.
294+
"""
279295
if self.current_turn:
280-
self.current_turn.interrupted = True
296+
if self.current_turn.agent_speech_start_time is not None:
297+
self.current_turn.interrupted = True
298+
if self.current_turn.agent_speech_end_time is None:
299+
self.current_turn.agent_speech_end_time = time.perf_counter()
300+
await self.end_timeline_event("agent_speech", self.current_turn.agent_speech_end_time)
301+
self._finalize_interaction_and_send()
302+
else:
303+
logger.debug("Interrupt signal received but agent hadn't started speaking - ignoring to preserve turn")
281304

282305
def finalize_session(self) -> None:
283306
asyncio.run_coroutine_threadsafe(self._start_new_interaction(), asyncio.get_event_loop())

videosdk-agents/videosdk/agents/realtime_pipeline.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,13 @@ def _configure_components(self) -> None:
8686
if self.avatar:
8787
self.model.audio_track = getattr(job_context.room, 'agent_audio_track', None) or job_context.room.audio_track
8888
elif self.audio_track:
89-
self.model.audio_track = self.audio_track
89+
self.model.audio_track = self.audio_track
90+
91+
if self.model.audio_track and hasattr(self.model.audio_track, "on_last_audio_byte"):
92+
async def on_last_audio_byte() -> None:
93+
logger.info("[RealTimePipeline] Audio playback finished — setting agent_speech_end_time")
94+
await realtime_metrics_collector.set_agent_speech_end()
95+
self.model.audio_track.on_last_audio_byte(on_last_audio_byte)
9096

9197
async def start(self, **kwargs: Any) -> None:
9298
"""
@@ -129,7 +135,6 @@ def _on_agent_speech_ended(self, data: dict) -> None:
129135
"""
130136
Handle agent speech ended event and mark utterance as done, forwarding to agent if handler exists.
131137
"""
132-
asyncio.create_task(realtime_metrics_collector.set_agent_speech_end())
133138
if self._current_utterance_handle and not self._current_utterance_handle.done():
134139
self._current_utterance_handle._mark_done()
135140
self.model.current_utterance = None
@@ -160,7 +165,6 @@ def on_user_speech_started(self, data: dict) -> None:
160165
"""
161166
Handle user speech started event
162167
"""
163-
asyncio.create_task(realtime_metrics_collector.set_user_speech_start())
164168
self._notify_speech_started()
165169
# self.interrupt() # Not sure yet whether this affects utterance handling.
166170
if self.agent.session:

videosdk-plugins/videosdk-plugins-aws/videosdk/plugins/aws/aws_nova_sonic_api.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ def __init__(
129129
self.is_active = False
130130
self.response_task = None
131131
self._agent_speaking = False
132+
self._user_speaking = False
133+
self._user_transcript_received = False
132134
self._initialize_bedrock_client()
133135
self.input_sample_rate = 48000
134136
self.target_sample_rate = 16000
@@ -319,6 +321,8 @@ async def handle_audio_input(self, audio_data: bytes) -> None:
319321
try:
320322
audio_array = np.frombuffer(audio_data, dtype=np.int16)
321323

324+
if audio_array.size == 0:
325+
return
322326
if len(audio_array) % 2 == 0:
323327
audio_array = audio_array.reshape(-1, 2)
324328
audio_array = np.mean(audio_array, axis=1).astype(np.int16)
@@ -394,12 +398,19 @@ async def _process_responses(self):
394398
role = text_output.get(
395399
"role", "UNKNOWN")
396400
if role == "USER":
397-
await realtime_metrics_collector.set_user_speech_start()
398-
await realtime_metrics_collector.set_user_transcript(
399-
transcript
400-
)
401-
await realtime_metrics_collector.set_user_speech_end()
402-
await self.emit("user_speech_ended", {})
401+
if transcript and isinstance(transcript, str) and transcript.strip():
402+
realtime_metrics_collector.mark_user_activity()
403+
if not self._user_speaking:
404+
await realtime_metrics_collector.set_user_speech_start()
405+
self._user_speaking = True
406+
self._user_transcript_received = False
407+
if not self._user_transcript_received:
408+
await realtime_metrics_collector.set_user_speech_end()
409+
self._user_speaking = False
410+
self._user_transcript_received = True
411+
await realtime_metrics_collector.set_user_transcript(
412+
transcript
413+
)
403414
try:
404415
await self.emit(
405416
"realtime_model_transcription",
@@ -452,8 +463,8 @@ async def _process_responses(self):
452463
audio_bytes = base64.b64decode(
453464
audio_content)
454465
if not self._agent_speaking:
455-
await self.emit("agent_speech_started", {})
456466
await realtime_metrics_collector.set_agent_speech_start()
467+
await self.emit("agent_speech_started", {})
457468
self._agent_speaking = True
458469

459470
if (
@@ -479,11 +490,8 @@ async def _process_responses(self):
479490
"stopReason", "") == "END_TURN"
480491
and self._agent_speaking
481492
):
482-
await realtime_metrics_collector.set_agent_speech_end(
483-
timeout=1.0
484-
)
485-
self._agent_speaking = False
486493
await self.emit("agent_speech_ended", {})
494+
self._agent_speaking = False
487495

488496
elif "usageEvent" in json_data["event"]:
489497
pass
@@ -503,9 +511,7 @@ async def _process_responses(self):
503511
print(
504512
f"Nova completionEnd received: {json.dumps(completion_end, indent=2)}"
505513
)
506-
await realtime_metrics_collector.set_agent_speech_end(
507-
timeout=1.0
508-
)
514+
await self.emit("agent_speech_ended", {})
509515
self._agent_speaking = False
510516

511517
else:
@@ -592,13 +598,12 @@ async def interrupt(self) -> None:
592598
if self.audio_track:
593599
self.audio_track.interrupt()
594600
print("Interrupting user speech, calling set_agent_speech_end")
595-
await self.emit("user_speech_ended", {})
596-
await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
597601
await realtime_metrics_collector.set_interrupted()
598602
if self._agent_speaking:
599603
print("Interrupting agent speech, calling set_agent_speech_end")
600-
await realtime_metrics_collector.set_agent_speech_end(timeout=1.0)
604+
await self.emit("agent_speech_ended", {})
601605
self._agent_speaking = False
606+
self._user_transcript_received = False
602607

603608
content_end_payload = {
604609
"event": {

videosdk-plugins/videosdk-plugins-google/videosdk/plugins/google/live_api.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ async def _receive_loop(self, session: GeminiSession) -> None:
482482
if self.current_utterance and not self.current_utterance.is_interruptible:
483483
logger.info("Interruption is disabled for the current utterance. Ignoring server interrupt signal.")
484484
continue
485-
485+
await realtime_metrics_collector.set_interrupted()
486486
if active_response_id:
487487
active_response_id = None
488488
accumulated_text = ""
@@ -584,9 +584,6 @@ async def _receive_loop(self, session: GeminiSession) -> None:
584584
accumulated_text = ""
585585
final_transcription = ""
586586
self.emit("agent_speech_ended", {})
587-
await realtime_metrics_collector.set_agent_speech_end(
588-
timeout=1.0
589-
)
590587
self._agent_speaking = False
591588

592589
except Exception as e:
@@ -680,7 +677,6 @@ async def handle_audio_input(self, audio_data: bytes) -> None:
680677
"""Handle incoming audio data from the user"""
681678
if not self._session or self._closing:
682679
return
683-
684680
if self.current_utterance and not self.current_utterance.is_interruptible:
685681
logger.info("Interruption is disabled for the current utterance. Not processing audio input.")
686682
return

0 commit comments

Comments (0)