diff --git a/python/chatbot/elevenlabs-chatbot-protect/.env.example b/python/chatbot/elevenlabs-chatbot-protect/.env.example new file mode 100644 index 00000000..49ce4bbc --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/.env.example @@ -0,0 +1,12 @@ +# ElevenLabs Configuration +ELEVENLABS_API_KEY=your-elevenlabs-api-key +ELEVENLABS_AGENT_ID=your-agent-id + +# Galileo Configuration (for local instance) +GALILEO_API_KEY=your-galileo-api-key +GALILEO_CONSOLE_URL= +GALILEO_PROJECT_NAME=elevenlabs-voice-poc +GALILEO_LOG_STREAM=voice-conversations + +GALILEO_PROTECT_ENABLED=true +GALILEO_PROTECT_STAGE_ID=your-stage-id diff --git a/python/chatbot/elevenlabs-chatbot-protect/.python-version b/python/chatbot/elevenlabs-chatbot-protect/.python-version new file mode 100644 index 00000000..763b6264 --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/.python-version @@ -0,0 +1 @@ +3.12.12 diff --git a/python/chatbot/elevenlabs-chatbot-protect/README.md b/python/chatbot/elevenlabs-chatbot-protect/README.md new file mode 100644 index 00000000..733b8915 --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/README.md @@ -0,0 +1,75 @@ +# ElevenLabs Voice Chatbot with Galileo Protect + +A terminal-based voice chatbot that lets you have real-time voice conversations with an ElevenLabs AI agent, with Galileo Protect guardrails for content moderation. + +## What This Example Shows + +- Interactive voice chat in your terminal (speak via microphone, hear responses via speakers) +- Real-time guardrails using Galileo Protect to moderate user input and agent output +- Conversation logging and tracing with Galileo Observe +- Session metrics tracking (latency, turn counts, character counts) + +## Quick Start + +1. Install system dependencies: + ```bash + brew install portaudio # Required for audio support on macOS + ``` + +2. 
Create and activate a virtual environment: + ```bash + python -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +3. Install Python dependencies: + ```bash + pip install -r requirements.txt + ``` + +4. Set up environment variables: + ```bash + cp .env.example .env + # Edit .env with your API keys + ``` + +5. Run the voice chat: + ```bash + python conversation.py + ``` + + For best results use a headset with a microphone. The app will start listening through your microphone. Speak to chat with the AI agent and hear responses through your speakers. Press `Ctrl+C` to end the session. + +## Configuration + +Edit `.env` with your credentials. Note for `ELEVENLABS_*` variables you can sign up for a free tier of ElevenLabs and use their Agents Platform to create a voice agent to obtain the required API key and Agent ID: + +| Variable | Description | +|----------|-------------| +| `ELEVENLABS_API_KEY` | Your ElevenLabs API key | +| `ELEVENLABS_AGENT_ID` | Your ElevenLabs Agent ID | +| `GALILEO_API_KEY` | Your Galileo API key | +| `GALILEO_CONSOLE_URL` | Galileo console URL (optional) | +| `GALILEO_PROJECT_NAME` | Project name for logging | +| `GALILEO_PROTECT_STAGE_ID` | Protect stage ID for guardrails (see below) | + +### Creating a Galileo Protect Stage + +To enable guardrails, you need to create a Galileo Protect stage. Run the included script: + +```bash +python scripts/create_stage.py +``` + +This will create a protect stage with an input toxicity rule and output the stage ID. Copy this ID to your `.env` file as `GALILEO_PROTECT_STAGE_ID`. 
+ +## Requirements + +- Python 3.9+ +- Microphone and headphones (to avoid audio feedback) + +## Learn More + +- [Galileo Documentation](https://docs.galileo.ai/) +- [Runtime Protection](https://v2docs.galileo.ai/concepts/protect/overview) +- [ElevenLabs Conversational AI](https://elevenlabs.io/docs/conversational-ai) diff --git a/python/chatbot/elevenlabs-chatbot-protect/config.py b/python/chatbot/elevenlabs-chatbot-protect/config.py new file mode 100644 index 00000000..0ea18bf8 --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/config.py @@ -0,0 +1,32 @@ +"""Configuration management for the ElevenLabs Voice POC.""" + +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + # ElevenLabs Configuration + elevenlabs_api_key: str + elevenlabs_agent_id: str + + # WebSocket monitoring endpoint + elevenlabs_ws_url: str = "wss://api.elevenlabs.io/v1/convai/conversation" + + # Galileo Configuration + galileo_api_key: str = "" + galileo_console_url: str = "http://localhost:3000" # Local Galileo instance + galileo_project_name: str = "elevenlabs-voice-poc" + galileo_log_stream: str = "voice-conversations" + + galileo_protect_enabled: bool = True + galileo_protect_stage_id: str = "" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + + +def get_settings() -> Settings: + """Get application settings.""" + return Settings() diff --git a/python/chatbot/elevenlabs-chatbot-protect/conversation.py b/python/chatbot/elevenlabs-chatbot-protect/conversation.py new file mode 100644 index 00000000..748f98f3 --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/conversation.py @@ -0,0 +1,204 @@ +"""ElevenLabs Conversation using official SDK - supports voice input/output.""" + +import os +import uuid +from pathlib import Path + +from dotenv import load_dotenv + +# Load .env from project root +env_path = Path(__file__).parent / ".env" +load_dotenv(env_path) + +from 
elevenlabs.client import ElevenLabs +from elevenlabs.conversational_ai.conversation import Conversation +from elevenlabs.conversational_ai.default_audio_interface import DefaultAudioInterface + +from config import get_settings +from galileo_handler import get_galileo_handler + + +# Initialize Galileo handler +_galileo = None +_conversation = None + + +def _get_galileo(): + """Lazy initialization of Galileo handler.""" + global _galileo + if _galileo is None: + _galileo = get_galileo_handler() + return _galileo + + +def on_agent_response(response: str) -> None: + """Called when agent responds - logs to Galileo.""" + print(f"\n[AGENT] {response}") + + galileo = _get_galileo() + result = galileo.log_agent_turn(response) + if result.get("blocked"): + print(f"[FAKE_GUARDRAIL] Agent response flagged: {result.get('reason')}") + + +def on_user_transcript(transcript: str) -> None: + """Called when user speech is transcribed - logs to Galileo.""" + global _conversation + print(f"\n[USER] {transcript}") + + galileo = _get_galileo() + result = galileo.log_user_turn(transcript) + if result.get("blocked"): + print(f"\n[GALILEO PROTECT] *** INPUT BLOCKED *** {result.get('reason')}") + + # Get the override message from Galileo Protect + override_message = result.get("override_message") + + if override_message: + # End current conversation session first + if _conversation: + print(f"[GALILEO PROTECT] Ending conversation session...") + _conversation.end_session() + + # Pause briefly to let audio system settle + import time + + time.sleep(1.5) + + # Print the override message + print(f"\n[AGENT] {override_message}") + + # Generate and play the override message audio + try: + import tempfile + import subprocess + import platform + + settings = get_settings() + + # Get the ElevenLabs client + client = ElevenLabs(api_key=settings.elevenlabs_api_key) + + # Generate audio using the text_to_speech module + print(f"[GALILEO PROTECT] Generating audio for override message...") + 
audio_generator = client.text_to_speech.convert( + text=override_message, + voice_id="cjVigY5qzO86Huf0OWal", # Eric voice ID + model_id="eleven_turbo_v2", + output_format="mp3_22050_32", # Lower quality to match conversational audio + ) + + # Save audio to a temporary file + with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f: + for chunk in audio_generator: + f.write(chunk) + temp_file = f.name + + # Play using system's default audio player + print(f"[GALILEO PROTECT] Playing override message...") + system = platform.system() + if system == "Darwin": # macOS + subprocess.run(["afplay", temp_file], check=True) + elif system == "Windows": + os.startfile(temp_file) + else: # Linux + subprocess.run(["xdg-open", temp_file], check=True) + + # Clean up + os.unlink(temp_file) + + print(f"[GALILEO PROTECT] Override message delivered via audio") + + except Exception as e: + print(f"[GALILEO PROTECT] Failed to generate or play audio: {e}") + print(f"[GALILEO PROTECT] Message was displayed as text above") + import traceback + + traceback.print_exc() + + # Log the override message to Galileo as an agent turn + # This ensures it shows up in the trace + galileo.log_agent_turn(override_message) + + # End the conversation + print("[GALILEO PROTECT] Ending conversation session...") + galileo.end_conversation() + print("[INFO] Conversation ended - logs sent to Galileo") + raise SystemExit("Call terminated by guardrail") + else: + # No override message, just end immediately + if _conversation: + print("[GALILEO PROTECT] Ending session due to guardrail violation...") + _conversation.end_session() + galileo.end_conversation() + raise SystemExit("Call terminated by guardrail") + + +def on_mode_change(mode: dict) -> None: + """Called when conversation mode changes (speaking/listening).""" + print(f"[MODE] {mode}") + + +def run_voice_conversation(use_headphones: bool = True): + """Run a voice conversation with microphone input and speaker output. 
+ + Args: + use_headphones: If True, plays audio through speakers. + Set to False if not using headphones to avoid feedback loop. + """ + settings = get_settings() + + # Initialize ElevenLabs client + client = ElevenLabs(api_key=settings.elevenlabs_api_key) + + # Initialize Galileo and start conversation trace + galileo = _get_galileo() + session_id = str(uuid.uuid4()) + galileo.start_conversation(session_id) + + print("\n" + "=" * 60) + print("ElevenLabs Voice POC - Voice Mode + Galileo") + print(f"Session ID: {session_id}") + if use_headphones: + print("*** USE HEADPHONES to avoid audio feedback loop ***") + else: + print("*** Silent mode - no audio playback ***") + print("Speak into your microphone to talk to the agent") + print("Press Ctrl+C to end the session") + print("=" * 60 + "\n") + + global _conversation + + # Create conversation with audio interface + conversation = Conversation( + client=client, + agent_id=settings.elevenlabs_agent_id, + requires_auth=True, # We're using API key auth + # Required: audio interface for mic/speaker + audio_interface=DefaultAudioInterface(), + # Callbacks for monitoring - connected to Galileo + callback_agent_response=on_agent_response, + callback_user_transcript=on_user_transcript, + # callback_mode_change=on_mode_change, # Optional + ) + + _conversation = conversation + + # Start the conversation (blocking) + print("[INFO] Starting conversation... 
Speak now!") + conversation.start_session() + + # Wait for conversation to end + try: + conversation.wait_for_session_end() + except KeyboardInterrupt: + print("\n[INFO] Ending conversation...") + conversation.end_session() + + # End Galileo trace and flush logs + galileo.end_conversation() + print("[INFO] Conversation ended - logs sent to Galileo") + + +if __name__ == "__main__": + run_voice_conversation() diff --git a/python/chatbot/elevenlabs-chatbot-protect/elevenlabs_monitor.py b/python/chatbot/elevenlabs-chatbot-protect/elevenlabs_monitor.py new file mode 100644 index 00000000..7b2290eb --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/elevenlabs_monitor.py @@ -0,0 +1,211 @@ +"""ElevenLabs WebSocket client for conversation monitoring.""" + +import asyncio +import json +from dataclasses import dataclass +from typing import Callable, Optional +import websockets +from websockets.exceptions import ConnectionClosed + + +@dataclass +class ConversationEvent: + """Represents a conversation event from ElevenLabs.""" + + event_type: str + data: dict + + @property + def transcript(self) -> Optional[str]: + """Get transcript text for user_transcript events.""" + if self.event_type == "user_transcript": + # Check nested structure first + event_data = self.data.get("user_transcript_event", {}) + return event_data.get("user_transcript", self.data.get("user_transcript", "")) + return None + + @property + def response(self) -> Optional[str]: + """Get response text for agent_response events.""" + if self.event_type == "agent_response": + # Check nested structure first + event_data = self.data.get("agent_response_event", {}) + return event_data.get("agent_response", self.data.get("agent_response", "")) + return None + + +class ElevenLabsMonitor: + """WebSocket client for monitoring ElevenLabs conversations.""" + + def __init__( + self, + api_key: str, + agent_id: str, + ws_url: str = "wss://api.elevenlabs.io/v1/convai/conversation", + ): + self.api_key = api_key + 
self.agent_id = agent_id + self.ws_url = f"{ws_url}?agent_id={agent_id}" + self.ws: Optional[websockets.WebSocketClientProtocol] = None + self._running = False + + # Event callbacks + self.on_user_transcript: Optional[Callable[[str], None]] = None + self.on_agent_response: Optional[Callable[[str], None]] = None + self.on_conversation_start: Optional[Callable[[dict], None]] = None + self.on_conversation_end: Optional[Callable[[], None]] = None + self.on_error: Optional[Callable[[Exception], None]] = None + + async def connect(self) -> None: + """Establish WebSocket connection to ElevenLabs.""" + headers = {"xi-api-key": self.api_key} + print(f"[DEBUG] Connecting to: {self.ws_url}") + self.ws = await websockets.connect( + self.ws_url, + additional_headers=headers, + ping_interval=20, + ping_timeout=10, + ) + self._running = True + print("[DEBUG] WebSocket connected, sending init message...") + + # Send initialization message (minimal - let agent use its configured settings) + init_msg = { + "type": "conversation_initiation_client_data", + } + await self.ws.send(json.dumps(init_msg)) + print("[DEBUG] Init message sent") + + async def disconnect(self) -> None: + """Close WebSocket connection.""" + self._running = False + if self.ws: + await self.ws.close() + self.ws = None + + async def send_text_message(self, text: str) -> None: + """Send a text message to the agent (simulates user input).""" + if not self.ws: + raise RuntimeError("Not connected") + + msg = {"type": "user_message", "message": text} + await self.ws.send(json.dumps(msg)) + + async def _handle_message(self, message: str) -> None: + """Process incoming WebSocket message.""" + try: + data = json.loads(message) + event_type = data.get("type", "unknown") + + # Debug: print raw message for troubleshooting + print(f"[DEBUG RAW] {json.dumps(data, indent=2)[:500]}") + + # Create event object + event = ConversationEvent(event_type=event_type, data=data) + + # Handle different event types + if event_type == 
"conversation_initiation_metadata": + # conversation_id may be nested or at top level + meta = data.get("conversation_initiation_metadata_event", data) + conv_id = meta.get("conversation_id", data.get("conversation_id", "N/A")) + print(f"[INIT] Conversation started - ID: {conv_id}") + if self.on_conversation_start: + self.on_conversation_start(data) + + elif event_type == "user_transcript": + transcript = event.transcript or "" + print(f"[USER] {transcript}") + if self.on_user_transcript: + self.on_user_transcript(transcript) + + elif event_type == "agent_response": + response = event.response or "" + print(f"[AGENT] {response}") + if self.on_agent_response: + self.on_agent_response(response) + + elif event_type == "agent_response_correction": + # Agent corrected its response + corrected = data.get("agent_response_correction", "") + print(f"[AGENT CORRECTION] {corrected}") + + elif event_type == "audio": + # Audio chunk - skip for text-only monitoring + pass + + elif event_type == "ping": + # Respond to ping with pong (event_id is nested in ping_event) + ping_event = data.get("ping_event", {}) + event_id = ping_event.get("event_id", data.get("event_id")) + pong = {"type": "pong", "event_id": event_id} + if self.ws: + await self.ws.send(json.dumps(pong)) + print(f"[DEBUG] Sent pong for event_id: {event_id}") + + elif event_type == "interruption": + print("[EVENT] User interrupted agent") + + elif event_type == "error": + error_msg = data.get("message", "Unknown error") + print(f"[ERROR] {error_msg}") + + else: + # Log other event types for debugging + print(f"[DEBUG] Event: {event_type}") + + except json.JSONDecodeError as e: + print(f"[ERROR] Failed to parse message: {e}") + + async def listen(self) -> None: + """Listen for messages from WebSocket.""" + if not self.ws: + raise RuntimeError("Not connected") + + try: + async for message in self.ws: + if not self._running: + break + await self._handle_message(message) + + except ConnectionClosed as e: + print(f"[INFO] 
Connection closed: {e.reason}") + if self.on_conversation_end: + self.on_conversation_end() + + except Exception as e: + print(f"[ERROR] WebSocket error: {e}") + if self.on_error: + self.on_error(e) + + async def run_interactive(self) -> None: + """Run an interactive text conversation session.""" + await self.connect() + + # Start listening in background + listen_task = asyncio.create_task(self.listen()) + + print("\n" + "=" * 60) + print("ElevenLabs Voice POC - Interactive Mode") + print("Type your message and press Enter to send") + print("Type 'quit' or 'exit' to end the session") + print("=" * 60 + "\n") + + try: + while self._running: + # Read user input (in a non-blocking way) + user_input = await asyncio.get_event_loop().run_in_executor(None, input, "You: ") + + if user_input.lower() in ("quit", "exit"): + break + + if user_input.strip(): + await self.send_text_message(user_input) + + except (KeyboardInterrupt, EOFError): + print("\n[INFO] Session interrupted") + + finally: + self._running = False + listen_task.cancel() + await self.disconnect() + print("[INFO] Disconnected") diff --git a/python/chatbot/elevenlabs-chatbot-protect/galileo_handler.py b/python/chatbot/elevenlabs-chatbot-protect/galileo_handler.py new file mode 100644 index 00000000..f21de227 --- /dev/null +++ b/python/chatbot/elevenlabs-chatbot-protect/galileo_handler.py @@ -0,0 +1,491 @@ +"""Galileo integration for logging, tracing. + +Uses the galileo.GalileoLogger API which auto-creates projects in Galileo. +Enables context_adherence metric on the log stream. 
+""" + +import os +import time +from typing import Optional, List, Dict +from dataclasses import dataclass, field +from datetime import datetime + +from galileo import GalileoScorers, invoke_protect, ExecutionStatus +from galileo.log_streams import enable_metrics +from galileo_core.schemas.protect.payload import Payload + +from config import get_settings + + +# Keywords that trigger call disruption (simulating Galileo guardrails) +BLOCKED_KEYWORDS = ["uss enterprise"] + + +@dataclass +class ConversationTrace: + """Tracks a single conversation session for Galileo logging.""" + + session_id: str + start_time: datetime = field(default_factory=datetime.now) + start_timestamp: float = field(default_factory=time.time) + turns: List[Dict] = field(default_factory=list) + active_trace: bool = False + conversation_context: List[Dict] = field(default_factory=list) + + # Metrics tracking + last_user_turn_time: Optional[float] = None # For latency calculation + latencies_ms: List[float] = field(default_factory=list) # Response latencies + user_char_count: int = 0 + agent_char_count: int = 0 + + +class GalileoHandler: + """Handles Galileo logging + + Uses GalileoLogger from the galileo package which auto-creates projects. + Captures comprehensive metrics for voice conversation analysis. 
+ """ + + def __init__(self): + self.settings = get_settings() + self._galileo_client = None + self._protect_enabled = False + self._current_trace: Optional[ConversationTrace] = None + + # Set Galileo environment variables BEFORE importing galileo + if self.settings.galileo_console_url: + os.environ["GALILEO_CONSOLE_URL"] = self.settings.galileo_console_url + if self.settings.galileo_api_key: + os.environ["GALILEO_API_KEY"] = self.settings.galileo_api_key + + self._init_observe() + self._init_protect() + + def _init_observe(self): + """Initialize Galileo Logger for logging/tracing.""" + try: + from galileo import GalileoLogger + + init_kwargs = {} + if self.settings.galileo_project_name: + init_kwargs["project"] = self.settings.galileo_project_name + if self.settings.galileo_log_stream: + init_kwargs["log_stream"] = self.settings.galileo_log_stream + + self._galileo_client = GalileoLogger(**init_kwargs) + print(f"[GALILEO] Logger initialized - Project: {self.settings.galileo_project_name}, Stream: {self.settings.galileo_log_stream}") + + # Enable context_adherence metric on the log stream + enable_metrics( + project_name=self.settings.galileo_project_name, + log_stream_name=self.settings.galileo_log_stream, + metrics=[GalileoScorers.context_adherence], + ) + print(f"[GALILEO] Enabled metrics: context_adherence") + except ImportError as e: + print(f"[GALILEO] Warning: galileo package not installed, logging disabled: {e}") + except Exception as e: + print(f"[GALILEO] Warning: Could not initialize Logger: {e}") + + def _init_protect(self): + """Initialize Galileo Protect for guardrails.""" + if not self.settings.galileo_protect_enabled: + print("[GALILEO] Protect disabled via config") + return + + if not self.settings.galileo_protect_stage_id: + print("[GALILEO] Protect disabled - no stage_id configured") + return + + if not self.settings.galileo_project_name: + print("[GALILEO] Protect disabled - no project_name configured") + return + + # No client initialization 
needed - invoke_protect is a function + self._protect_enabled = True + print(f"[GALILEO] Protect enabled - Stage: {self.settings.galileo_protect_stage_id}") + + def _calculate_metrics(self) -> Dict[str, str]: + """Calculate session-level metrics.""" + if not self._current_trace: + return {} + + trace = self._current_trace + duration_sec = time.time() - trace.start_timestamp + user_turns = sum(1 for t in trace.turns if t["role"] == "user") + agent_turns = sum(1 for t in trace.turns if t["role"] == "assistant") + + avg_latency_ms = 0.0 + if trace.latencies_ms: + avg_latency_ms = sum(trace.latencies_ms) / len(trace.latencies_ms) + + return { + "duration_sec": f"{duration_sec:.2f}", + "total_turns": str(len(trace.turns)), + "user_turns": str(user_turns), + "agent_turns": str(agent_turns), + "avg_latency_ms": f"{avg_latency_ms:.0f}", + "min_latency_ms": f"{min(trace.latencies_ms):.0f}" if trace.latencies_ms else "0", + "max_latency_ms": f"{max(trace.latencies_ms):.0f}" if trace.latencies_ms else "0", + "user_char_count": str(trace.user_char_count), + "agent_char_count": str(trace.agent_char_count), + "total_char_count": str(trace.user_char_count + trace.agent_char_count), + } + + def start_conversation(self, session_id: str): + """Start tracking a new conversation session.""" + self._current_trace = ConversationTrace(session_id=session_id) + + # Start a Galileo session for this conversation + if self._galileo_client: + try: + session_name = f"ElevenLabs-{session_id[:8]}" + # Start session - the session_id is automatically stored on the logger + # NOTE: metadata is not supported on start_session - use trace metadata instead + self._galileo_client.start_session( + name=session_name, + external_id=session_id, + ) + # Session ID is now available on the logger instance + print(f"[GALILEO] Started session: {session_name}") + print(f"[GALILEO] Logger session_id: {self._galileo_client.session_id}") + except Exception as e: + print(f"[GALILEO] Failed to start session: {e}") + 
import traceback + + traceback.print_exc() + + print(f"[GALILEO] Started conversation: {session_id}") + + def log_user_turn(self, transcript: str) -> dict: + """Log a user turn (speech-to-text result) to Galileo. + + This starts a new trace for the conversation turn. + + Returns: + dict with guardrail results if enabled, empty dict otherwise + """ + result = {"blocked": False, "reason": None} + current_time = time.time() + + # Simulate Galileo Protect API call (requires Enterprise for real invoke_protect) + transcript_preview = transcript[:50] + "..." if len(transcript) > 50 else transcript + print(f'[GALILEO PROTECT] invoke_protect(stage="voice-guardrails", input="{transcript_preview}")') + + # Check for blocked keywords (simulating what Galileo Protect would do) + transcript_lower = transcript.lower() + for keyword in BLOCKED_KEYWORDS: + if keyword in transcript_lower: + result["blocked"] = True + result["reason"] = f"Blocked keyword detected: {keyword}" + print(f'[GALILEO PROTECT] Response: status=TRIGGERED, action=BLOCK, rule="keyword:{keyword}"') + return result + + print(f"[GALILEO PROTECT] Response: status=ALLOWED") + + if self._current_trace: + # If there's an active trace, conclude it first + if self._current_trace.active_trace and self._galileo_client: + try: + last_response = "No response" + for turn in reversed(self._current_trace.turns): + if turn["role"] == "assistant": + last_response = turn["content"] + break + self._galileo_client.conclude(output=last_response) + self._current_trace.active_trace = False + print(f"[GALILEO] Concluded previous trace") + except Exception as e: + print(f"[GALILEO] Failed to conclude previous trace: {e}") + import traceback + + traceback.print_exc() + + # Track metrics + self._current_trace.user_char_count += len(transcript) + self._current_trace.last_user_turn_time = current_time + + # Add to conversation context + self._current_trace.turns.append( + { + "role": "user", + "content": transcript, + "timestamp": 
datetime.now().isoformat(), + "char_count": len(transcript), + } + ) + self._current_trace.conversation_context.append({"role": "user", "content": transcript}) + + # Start a new trace for this user turn + if self._galileo_client: + try: + # Session persists automatically - no need to call set_session() + trace_num = sum(1 for t in self._current_trace.turns if t["role"] == "user") + self._galileo_client.start_trace( + input=transcript, + name=f"Turn-{trace_num}", + metadata={ + "session_id": self._current_trace.session_id, + "turn_number": str(trace_num), + "role": "user", + "source": "elevenlabs-stt", + "char_count": str(len(transcript)), + "timestamp": datetime.now().isoformat(), + }, + ) + self._current_trace.active_trace = True + print(f"[GALILEO] Started trace {trace_num} in session {self._galileo_client.session_id}") + except Exception as e: + print(f"[GALILEO] Failed to start trace: {e}") + import traceback + + traceback.print_exc() + + # Run input guardrails if enabled + if self._protect_enabled: + try: + payload = Payload(input=transcript, metadata={"role": "user"}) + + protect_result = invoke_protect( + payload=payload, + stage_id=self.settings.galileo_protect_stage_id, + project_name=self.settings.galileo_project_name, + ) + + # Log the protect span to Galileo + if self._galileo_client and self._current_trace.active_trace and protect_result: + try: + self._galileo_client.add_protect_span( + payload=payload, + response=protect_result, + created_at=datetime.now(), + metadata={ + "session_id": self._current_trace.session_id, + "role": "user", + "stage": "input_guardrail", + }, + status_code=200, + ) + print(f"[GALILEO] Logged Protect span for input guardrail") + except Exception as e: + print(f"[GALILEO] Failed to log Protect span: {e}") + + if protect_result and protect_result.status == ExecutionStatus.triggered: + result["blocked"] = True + + # Extract the override message to send to the user + override_message = None + if hasattr(protect_result, 
"action_result") and protect_result.action_result: + action_result = protect_result.action_result + if isinstance(action_result, dict) and action_result.get("type") == "OVERRIDE": + override_message = action_result.get("value") + + # Also extract triggered metrics for logging + triggered_info = [] + if hasattr(protect_result, "ruleset_results") and protect_result.ruleset_results: + for ruleset_result in protect_result.ruleset_results: + if "rule_results" in ruleset_result: + for rule_result in ruleset_result["rule_results"]: + if rule_result.get("status") == "TRIGGERED": + metric_name = rule_result.get("metric", "unknown") + value = rule_result.get("value", "N/A") + threshold = rule_result.get("target_value", "N/A") + operator = rule_result.get("operator", "N/A") + triggered_info.append(f"{metric_name}={value:.3f} (threshold: {operator} {threshold})") + + result["reason"] = ", ".join(triggered_info) if triggered_info else "Unknown rule triggered" + result["override_message"] = override_message + print(f"[GALILEO PROTECT] Input blocked by: {result['reason']}") + if override_message: + print(f"[GALILEO PROTECT] Override message: {override_message}") + except Exception as e: + print(f"[GALILEO] Guardrail check failed: {e}") + + return result + + def log_agent_turn(self, response: str, model: str = "elevenlabs-agent") -> dict: + """Log an agent turn (agent response) to Galileo. + + This adds an LLM span with latency metrics. 
+ + Returns: + dict with guardrail results if enabled, empty dict otherwise + """ + result = {"blocked": False, "reason": None} + current_time = time.time() + latency_ms = 0.0 + + if self._current_trace: + # Calculate latency if we have a user turn timestamp + if self._current_trace.last_user_turn_time: + latency_ms = (current_time - self._current_trace.last_user_turn_time) * 1000 + self._current_trace.latencies_ms.append(latency_ms) + + # Track metrics + self._current_trace.agent_char_count += len(response) + + self._current_trace.turns.append( + { + "role": "assistant", + "content": response, + "timestamp": datetime.now().isoformat(), + "char_count": len(response), + "latency_ms": latency_ms, + } + ) + self._current_trace.conversation_context.append({"role": "assistant", "content": response}) + + # Add LLM span for this response + if self._galileo_client and self._current_trace.active_trace: + try: + from galileo import Message, MessageRole + + # Create output message + output_message = Message(content=response, role=MessageRole.assistant) + + turn_num = sum(1 for t in self._current_trace.turns if t["role"] == "assistant") + + self._galileo_client.add_llm_span( + input=self._current_trace.conversation_context.copy(), + output=output_message, + model=model, + name=f"Agent_Response_{turn_num}", + metadata={ + "session_id": self._current_trace.session_id, + "turn_number": str(turn_num), + "latency_ms": f"{latency_ms:.0f}", + "char_count": str(len(response)), + "timestamp": datetime.now().isoformat(), + }, + ) + + # Flush immediately so logs appear in real-time in Galileo + self._galileo_client.flush() + print(f"[GALILEO] Logged & flushed turn {turn_num} (latency: {latency_ms:.0f}ms, chars: {len(response)})") + except Exception as e: + print(f"[GALILEO] Failed to add LLM span: {e}") + + # Run output guardrails if enabled + if self._protect_enabled: + try: + payload = Payload(output=response, metadata={"role": "assistant"}) + + protect_result = invoke_protect( + 
payload=payload, + stage_id=self.settings.galileo_protect_stage_id, + project_name=self.settings.galileo_project_name, + ) + + # Log the protect span to Galileo + if self._galileo_client and self._current_trace.active_trace and protect_result: + try: + self._galileo_client.add_protect_span( + payload=payload, + response=protect_result, + created_at=datetime.now(), + metadata={ + "session_id": self._current_trace.session_id, + "role": "assistant", + "stage": "output_guardrail", + }, + status_code=200, + ) + print(f"[GALILEO] Logged Protect span for output guardrail") + except Exception as e: + print(f"[GALILEO] Failed to log Protect span: {e}") + + if protect_result and protect_result.status == ExecutionStatus.triggered: + result["blocked"] = True + + # Extract triggered metrics and values from ruleset_results + triggered_info = [] + if hasattr(protect_result, "ruleset_results") and protect_result.ruleset_results: + for ruleset_result in protect_result.ruleset_results: + if "rule_results" in ruleset_result: + for rule_result in ruleset_result["rule_results"]: + if rule_result.get("status") == "TRIGGERED": + metric_name = rule_result.get("metric", "unknown") + value = rule_result.get("value", "N/A") + threshold = rule_result.get("target_value", "N/A") + operator = rule_result.get("operator", "N/A") + triggered_info.append(f"{metric_name}={value:.3f} (threshold: {operator} {threshold})") + + result["reason"] = ", ".join(triggered_info) if triggered_info else "Unknown rule triggered" + print(f"[GALILEO PROTECT] Output flagged by: {result['reason']}") + except Exception as e: + print(f"[GALILEO] Guardrail check failed: {e}") + + return result + + def end_conversation(self): + """End the current conversation and flush logs to Galileo with final metrics.""" + if not self._current_trace: + return + + # Calculate final metrics + metrics = self._calculate_metrics() + + # Conclude any active trace + if self._galileo_client and self._current_trace.active_trace: + try: + 
                last_response = "Conversation ended"
                # Use the most recent assistant turn as the trace's final output,
                # falling back to the placeholder above when no assistant turn exists
                for turn in reversed(self._current_trace.turns):
                    if turn["role"] == "assistant":
                        last_response = turn["content"]
                        break
                self._galileo_client.conclude(output=last_response)
                print(f"[GALILEO] Concluded final trace")
            except Exception as e:
                print(f"[GALILEO] Failed to conclude trace: {e}")
                import traceback

                traceback.print_exc()

        # Flush all logs to Galileo
        if self._galileo_client:
            try:
                print(f"[GALILEO] Flushing session {self._galileo_client.session_id}...")
                self._galileo_client.flush()
                print(f"[GALILEO] Flushed logs - {len(self._current_trace.turns)} turns logged")
            except Exception as e:
                print(f"[GALILEO] Failed to flush: {e}")
                import traceback

                traceback.print_exc()

        # Print metrics summary
        # NOTE(review): metrics keys/values come from self._calculate_metrics(),
        # which is outside this chunk — key names assumed from usage here.
        print(f"[GALILEO] Session metrics:")
        print(f" - Duration: {metrics.get('duration_sec', '0')}s")
        print(f" - Turns: {metrics.get('total_turns', '0')} (user: {metrics.get('user_turns', '0')}, agent: {metrics.get('agent_turns', '0')})")
        print(
            f" - Avg latency: {metrics.get('avg_latency_ms', '0')}ms (min: {metrics.get('min_latency_ms', '0')}ms, max: {metrics.get('max_latency_ms', '0')}ms)"
        )
        print(
            f" - Characters: {metrics.get('total_char_count', '0')} (user: {metrics.get('user_char_count', '0')}, agent: {metrics.get('agent_char_count', '0')})"
        )

        # Clear the session to properly end it
        if self._galileo_client and self._galileo_client.session_id:
            try:
                print(f"[GALILEO] Clearing session: {self._galileo_client.session_id}")
                self._galileo_client.clear_session()
                print(f"[GALILEO] Session cleared successfully")
            except Exception as e:
                print(f"[GALILEO] Failed to clear session: {e}")
                import traceback

                traceback.print_exc()

        # Drop per-conversation state so a new conversation starts fresh
        self._current_trace = None


# Singleton instance
_handler: Optional[GalileoHandler] = None


def get_galileo_handler() -> GalileoHandler:
    """Get or create the Galileo handler singleton."""
    global _handler
    if _handler is None:
        # Lazily construct the singleton on first use
        _handler = GalileoHandler()
    return _handler
diff --git a/python/chatbot/elevenlabs-chatbot-protect/main.py b/python/chatbot/elevenlabs-chatbot-protect/main.py
new file mode 100644
index 00000000..244bcbb9
--- /dev/null
+++ b/python/chatbot/elevenlabs-chatbot-protect/main.py
@@ -0,0 +1,51 @@
"""Main entry point for the ElevenLabs Voice POC."""

import asyncio
import sys

from config import get_settings
from elevenlabs_monitor import ElevenLabsMonitor


def main() -> None:
    """Run the ElevenLabs voice conversation monitor.

    Loads settings from the environment (.env), wires up the monitor, and
    runs the interactive session until interrupted with Ctrl+C.
    """
    try:
        settings = get_settings()
    except Exception as e:
        # Configuration is mandatory: print actionable guidance and exit non-zero
        print(f"[ERROR] Failed to load configuration: {e}")
        print("\nMake sure you have a .env file with:")
        print(" ELEVENLABS_API_KEY=your-api-key")
        print(" ELEVENLABS_AGENT_ID=your-agent-id")
        sys.exit(1)

    # Create monitor instance
    monitor = ElevenLabsMonitor(
        api_key=settings.elevenlabs_api_key,
        agent_id=settings.elevenlabs_agent_id,
        ws_url=settings.elevenlabs_ws_url,
    )

    # Set up optional callbacks for Phase 2 Galileo integration
    # These will be connected to galileo_handler.py later
    def on_transcript(transcript: str) -> None:
        # Placeholder for Galileo logging
        # galileo_handler.log_user_turn(transcript)
        pass

    def on_response(response: str) -> None:
        # Placeholder for Galileo logging
        # galileo_handler.log_agent_turn(response)
        pass

    monitor.on_user_transcript = on_transcript
    monitor.on_agent_response = on_response

    # Run the interactive session
    try:
        asyncio.run(monitor.run_interactive())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end a session; exit cleanly
        print("\n[INFO] Shutting down...")


if __name__ == "__main__":
    main()
diff --git a/python/chatbot/elevenlabs-chatbot-protect/pyproject.toml b/python/chatbot/elevenlabs-chatbot-protect/pyproject.toml
new file mode 100644
index 00000000..ce6101bc
--- /dev/null
+++ b/python/chatbot/elevenlabs-chatbot-protect/pyproject.toml
@@ -0,0 +1,22 @@
[project]
name = "elevenlabs-chatbot-protect"
version = "0.1.0"
description = "ElevenLabs Voice Chatbot with Galileo Protect integration for guardrails"
requires-python = ">=3.9"
dependencies = [
    "websockets>=12.0",
    "python-dotenv>=1.0.0",
    "pydantic-settings>=2.0.0",
    "elevenlabs>=1.0.0",
    "galileo>=1.34.0",
    "galileo-protect",
    "pyaudio>=0.2.14",
]

[project.scripts]
voice-chatbot = "main:main"
voice-conversation = "conversation:run_voice_conversation"

[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
diff --git a/python/chatbot/elevenlabs-chatbot-protect/requirements.txt b/python/chatbot/elevenlabs-chatbot-protect/requirements.txt
new file mode 100644
index 00000000..49ae86d1
--- /dev/null
+++ b/python/chatbot/elevenlabs-chatbot-protect/requirements.txt
@@ -0,0 +1,7 @@
websockets>=12.0
python-dotenv>=1.0.0
pydantic-settings>=2.0.0
elevenlabs>=1.0.0
galileo>=1.34.0
galileo-protect
pyaudio>=0.2.14
diff --git a/python/chatbot/elevenlabs-chatbot-protect/scripts/create_stage.py b/python/chatbot/elevenlabs-chatbot-protect/scripts/create_stage.py
new file mode 100644
index 00000000..fa289003
--- /dev/null
+++ b/python/chatbot/elevenlabs-chatbot-protect/scripts/create_stage.py
@@ -0,0 +1,71 @@
# run this to create galileo protect stage
import os
import sys
import logging
from pathlib import Path

# Enable HTTP request logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# Show HTTP details
logging.getLogger("httpx").setLevel(logging.DEBUG)
logging.getLogger("httpcore").setLevel(logging.DEBUG)

# Add the parent directory to path so we can import config
sys.path.insert(0, str(Path(__file__).parent.parent))

from config import get_settings

# Set Galileo environment variables BEFORE importing galileo
# (the galileo SDK reads these at import time)
settings = get_settings()
if settings.galileo_console_url:
    os.environ["GALILEO_CONSOLE_URL"] = settings.galileo_console_url
if settings.galileo_api_key:
    os.environ["GALILEO_API_KEY"] =
settings.galileo_api_key

# Project name is required - stage will not be created without it
os.environ["GALILEO_PROJECT"] = settings.galileo_project_name


# Stage configuration
STAGE_NAME = "voice-guardrails"
STAGE_DESCRIPTION = "Guardrails for voice conversations"

print(f"[CONFIG] GALILEO_CONSOLE_URL: {os.environ.get('GALILEO_CONSOLE_URL', 'not set')}")
# BUG FIX: GALILEO_API_KEY is only exported above when settings provide it;
# indexing os.environ directly raised KeyError when the key was absent.
_api_key = os.environ.get("GALILEO_API_KEY")
if _api_key:
    print(f"[CONFIG] GALILEO_API_KEY: {_api_key[:10]}...")
else:
    print("[CONFIG] GALILEO_API_KEY: not set")
print(f"[CONFIG] GALILEO_PROJECT: {settings.galileo_project_name}")
# All these are available from the top-level galileo module
from galileo import (
    GalileoScorers,
    create_protect_stage,
    get_protect_stage,
    Ruleset,
    StageType,
)

# These are not re-exported in galileo.__init__, so keep them as-is
from galileo_core.schemas.protect.action import OverrideAction
from galileo_core.schemas.protect.rule import Rule, RuleOperator

# Create a rule: trigger when input toxicity exceeds 0.2
rule = Rule(metric=GalileoScorers.input_toxicity, operator=RuleOperator.gt, target_value=0.2)

# Create an override action: the canned message returned when the rule triggers
action = OverrideAction(choices=["We're sorry, we can't process your request."])

# Add this rule to a ruleset with the override action above
# (comment fix: the previous comment claimed a "default passthrough action",
# but an explicit OverrideAction is attached)
ruleset = Ruleset(rules=[rule], action=action)

# Create a stage
stage = create_protect_stage(
    name=STAGE_NAME,
    stage_type=StageType.central,
    prioritized_rulesets=[ruleset],
    description=STAGE_DESCRIPTION,
)
print(f"\n[SUCCESS] Created stage: {stage}")

# Verify stage was created and surface the ID the user must copy to .env
stage = get_protect_stage(stage_name=STAGE_NAME)
print(f"\n[INFO] Stage ID: {stage.id}")
print(f"[INFO] Add this to your .env file:")
print(f"GALILEO_PROTECT_STAGE_ID={stage.id}")