Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,16 @@ EVA_MODEL__LLM=gpt-5.2
# GOOGLE_API_KEY=your_google_api_key_here

# ==============================================
# Optional: Realtime / Audio-LLM Configuration
# Optional: Speech-to-Speech / Audio-LLM Configuration
# ==============================================
# Only needed if benchmarking speech-to-speech or realtime models.
# Only needed if benchmarking speech-to-speech models.

# EVA_MODEL__REALTIME_MODEL=gpt-realtime-mini
# EVA_MODEL__REALTIME_MODEL_PARAMS='{"voice":"marin"}'
# EVA_MODEL__S2S=openai
# EVA_MODEL__S2S_PARAMS='{"model": "gpt-realtime-mini", "api_key": ""}'

# EVA_MODEL__AUDIO_LLM=
# EVA_MODEL__AUDIO_LLM_PARAMS='{"url": "", "api_key": ""}'

# Azure Realtime credentials (if using Azure realtime models)
# AZURE_OPENAI_REALTIME_API_KEY=
# AZURE_OPENAI_REALTIME_ENDPOINT=

# ==============================================
# Optional: Execution Settings
# ==============================================
Expand Down
55 changes: 24 additions & 31 deletions docs/assets/index-B7XxGtG-.js → docs/assets/index-DNHH3PHW.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/assets/index-DNsPq0CK.css

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion docs/assets/index-DcU8EScs.css

This file was deleted.

4 changes: 2 additions & 2 deletions docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<link rel="preconnect" href="https://fonts.googleapis.com" />
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin />
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700;800&family=JetBrains+Mono:wght@400;500&display=swap" rel="stylesheet" />
<script type="module" crossorigin src="/eva/assets/index-B7XxGtG-.js"></script>
<link rel="stylesheet" crossorigin href="/eva/assets/index-DcU8EScs.css">
<script type="module" crossorigin src="/eva/assets/index-DNHH3PHW.js"></script>
<link rel="stylesheet" crossorigin href="/eva/assets/index-DNsPq0CK.css">
</head>
<body>
<div id="root"></div>
Expand Down
3 changes: 1 addition & 2 deletions src/eva/assistant/pipeline/audio_llm_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import asyncio
import base64
import io
import os
import time
import wave
from collections.abc import Awaitable
Expand Down Expand Up @@ -418,7 +417,7 @@ def __init__(
super().__init__(**kwargs)
self._audio_collector = audio_collector
params = params or {}
self._api_key = params.get("api_key") or os.getenv("OPENAI_API_KEY")
self._api_key = params.get["api_key"]
self._model = model
self._system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
self._sample_rate = sample_rate
Expand Down
3 changes: 2 additions & 1 deletion src/eva/assistant/pipeline/observers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pipecat.observers.turn_tracking_observer import TurnTrackingObserver
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.llm_service import LLMService
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.services.stt_service import STTService
from pipecat.services.tts_service import TTSService

Expand All @@ -31,7 +32,7 @@
logger = get_logger(__name__)


_TRANSCRIPTION_SERVICES = (STTService, AzureRealtimeLLMService)
_TRANSCRIPTION_SERVICES = (STTService, AzureRealtimeLLMService, OpenAIRealtimeLLMService)


class WallClock(SystemClock):
Expand Down
94 changes: 89 additions & 5 deletions src/eva/assistant/pipeline/realtime_llm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Instrumented realtime LLM service for correct audit log ordering and timestamps.

Subclasses AzureRealtimeLLMService to intercept raw OpenAI Realtime API events
Subclasses OpenAIRealtimeLLMService to intercept raw OpenAI Realtime API events
(speech_started, speech_stopped, transcription.completed, response.done) which
have a guaranteed ordering and carry item_id for correlation.

Expand All @@ -11,17 +11,24 @@
Writing user entries on #3 and assistant entries on #5 guarantees correct order.
"""

import struct
import time
from dataclasses import dataclass
from typing import Any, Optional

from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.frames.frames import Frame, InputAudioRawFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService

from eva.assistant.agentic.audit_log import AuditLog
from eva.utils.logging import get_logger

logger = get_logger(__name__)

# Audio threshold for detecting speech vs silence
# RMS values below this are considered silence
SILENCE_RMS_THRESHOLD = 10


@dataclass
class _UserTurnRecord:
Expand All @@ -39,8 +46,20 @@ def _wall_ms() -> str:
return str(int(round(time.time() * 1000)))


class InstrumentedRealtimeLLMService(AzureRealtimeLLMService):
"""AzureRealtimeLLMService subclass that writes audit log entries with correct ordering and wall-clock timestamps derived from Realtime API events.
def _calculate_rms(audio_bytes: bytes) -> float:
"""Calculate RMS (root mean square) energy of 16-bit PCM audio."""
if len(audio_bytes) < 2:
return 0.0
num_samples = len(audio_bytes) // 2
samples = struct.unpack(f"<{num_samples}h", audio_bytes[: num_samples * 2])
if not samples:
return 0.0
sum_squares = sum(s * s for s in samples)
return (sum_squares / len(samples)) ** 0.5


class InstrumentedRealtimeLLMService(OpenAIRealtimeLLMService):
"""OpenAIRealtimeLLMService subclass that writes audit log entries with correct ordering and wall-clock timestamps derived from Realtime API events.

All overridden methods call ``super()`` first so that the parent's frame
processing (audio playback, interruption handling, metrics, etc.) is fully
Expand All @@ -61,12 +80,35 @@ def __init__(self, *, audit_log: AuditLog, **kwargs: Any) -> None:
# Track whether we're mid-assistant-response (for interruption flushing)
self._assistant_responding: bool = False

# Track audio frame timing for VAD delay calculation
self._last_audio_frame_time: Optional[float] = None
self._vad_delay_ms: Optional[int] = None

async def process_frame(self, frame: Frame, direction: FrameDirection) -> None:
    """Record the arrival time of speech-bearing audio, then delegate to the parent.

    The timestamp is refreshed only for input audio frames whose RMS energy
    exceeds ``SILENCE_RMS_THRESHOLD``, so the subsequent VAD-delay
    calculation reflects when the user actually stopped speaking rather
    than when trailing silence arrived.
    """
    frame_is_speech = (
        isinstance(frame, InputAudioRawFrame)
        and _calculate_rms(frame.audio) > SILENCE_RMS_THRESHOLD
    )
    if frame_is_speech:
        self._last_audio_frame_time = time.time()

    await super().process_frame(frame, direction)

async def _handle_evt_speech_started(self, evt: Any) -> None:
"""Fires when user starts speaking (input_audio_buffer.speech_started).

Captures wall-clock start time. Also flushes any in-progress
interrupted assistant response before recording the new user turn.
"""
# Reset VAD tracking for new turn
self._vad_delay_ms = None

# Broadcast VAD user started speaking frame because realtime VAD does not broadcast it itself
await self.broadcast_frame(VADUserStartedSpeakingFrame)

# Flush interrupted assistant response if one is in progress
if self._assistant_responding and self._current_assistant_transcript_parts:
partial_text = "".join(self._current_assistant_transcript_parts) + " [interrupted]"
Expand All @@ -92,8 +134,21 @@ async def _handle_evt_speech_started(self, evt: Any) -> None:
async def _handle_evt_speech_stopped(self, evt: Any) -> None:
"""Fires when user stops speaking (input_audio_buffer.speech_stopped).

Captures wall-clock end time for the user turn.
Captures wall-clock end time for the user turn and calculates VAD delay.
"""
speech_stopped_time = time.time()

# Calculate VAD delay: time between last audio frame and speech_stopped event
if self._last_audio_frame_time is not None:
self._vad_delay_ms = int((speech_stopped_time - self._last_audio_frame_time) * 1000)
else:
logger.warning("speech_stopped fired but no audio frames were tracked")
self._vad_delay_ms = None

# Reset audio tracking for next turn
self._last_audio_frame_time = None

await self.broadcast_frame(VADUserStoppedSpeakingFrame)
await super()._handle_evt_speech_stopped(evt)

item_id = getattr(evt, "item_id", None) or ""
Expand Down Expand Up @@ -145,13 +200,32 @@ async def _handle_evt_audio_delta(self, evt: Any) -> None:
"""Fires for each audio chunk of the assistant response.

Captures wall-clock of the *first* delta as assistant response start.
Also logs the full user-perceived response latency including VAD delay.
"""
await super()._handle_evt_audio_delta(evt)

if self._assistant_response_start_wall_ms is None:
self._assistant_response_start_wall_ms = _wall_ms()
self._assistant_responding = True

# Log full user-perceived latency (includes VAD delay)
if self._vad_delay_ms is not None:
# Find the most recent user turn to get speech_stopped time
recent_record = None
for record in self._user_turns.values():
if record.speech_stopped_wall_ms:
recent_record = record

if recent_record and recent_record.speech_stopped_wall_ms:
speech_stopped_ms = int(recent_record.speech_stopped_wall_ms)
response_start_ms = int(self._assistant_response_start_wall_ms)
vad_to_response_ms = response_start_ms - speech_stopped_ms
full_latency_ms = vad_to_response_ms + self._vad_delay_ms
logger.debug(
f"Full response latency: {full_latency_ms}ms "
f"(VAD delay: {self._vad_delay_ms}ms + response: {vad_to_response_ms}ms)"
)

async def _handle_evt_audio_transcript_delta(self, evt: Any) -> None:
"""Fires for incremental assistant transcript text.

Expand Down Expand Up @@ -220,6 +294,16 @@ def _reset_assistant_state(self) -> None:
self._assistant_response_start_wall_ms = None
self._assistant_responding = False

@property
def last_vad_delay_ms(self) -> Optional[int]:
    """Most recent VAD delay in milliseconds, or ``None`` if none was measured.

    The delay is the gap between the last tracked audio frame and the
    moment the realtime VAD reported end of speech; callers can add it to
    measured response latency to approximate user-perceived latency.
    """
    return self._vad_delay_ms

@staticmethod
def _response_has_function_calls(evt: Any) -> bool:
"""Return True if the response.done event contains any function_call outputs."""
Expand Down
Loading
Loading