diff --git a/sdk/ai/azure-ai-voicelive/.env.template b/sdk/ai/azure-ai-voicelive/.env.template index 6d6b54e6e159..c0b57bc9670b 100644 --- a/sdk/ai/azure-ai-voicelive/.env.template +++ b/sdk/ai/azure-ai-voicelive/.env.template @@ -2,13 +2,18 @@ # Copy this file to .env and fill in your values # Required credentials -AZURE_VOICELIVE_API_KEY=your-voicelive-api-key -AZURE_VOICELIVE_ENDPOINT=wss://api.voicelive.com/v1 +AZURE_VOICELIVE_ENDPOINT=https://your-resource-name.services.ai.azure.com/ +AZURE_VOICELIVE_API_VERSION=2025-10-01 +AZURE_VOICELIVE_API_KEY= # Only required if using API key authentication # Optional configuration -AZURE_VOICELIVE_MODEL=gpt-4o-realtime-preview -AZURE_VOICELIVE_VOICE=alloy +AZURE_VOICELIVE_MODEL=gpt-realtime +AZURE_VOICELIVE_VOICE=en-US-Ava:DragonHDLatestNeural AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful assistant. Keep your responses concise. +# For Foundry agent connection +AZURE_VOICELIVE_AGENT_ID= +AZURE_VOICELIVE_PROJECT_NAME= + # For audio samples -AUDIO_FILE=path/to/your/test_audio.wav \ No newline at end of file +AUDIO_FILE=path/to/your/test_audio.wav diff --git a/sdk/ai/azure-ai-voicelive/samples/.env_sample b/sdk/ai/azure-ai-voicelive/samples/.env_sample new file mode 100644 index 000000000000..47bf5db4cceb --- /dev/null +++ b/sdk/ai/azure-ai-voicelive/samples/.env_sample @@ -0,0 +1,8 @@ +AZURE_VOICELIVE_ENDPOINT=https://your-resource-name.services.ai.azure.com/ +AZURE_VOICELIVE_MODEL=gpt-realtime +AZURE_VOICELIVE_VOICE=en-US-Ava:DragonHDLatestNeural +AZURE_VOICELIVE_API_VERSION=2025-10-01 +AZURE_VOICELIVE_API_KEY= # Only required if using API key authentication +AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful assistant. Keep your responses concise. +AZURE_VOICELIVE_AGENT_ID= +AZURE_VOICELIVE_PROJECT_NAME= diff --git a/sdk/ai/azure-ai-voicelive/samples/.gitignore b/sdk/ai/azure-ai-voicelive/samples/.gitignore new file mode 100644 index 000000000000..818e74659ef3 --- /dev/null +++ b/sdk/ai/azure-ai-voicelive/samples/.gitignore @@ -0,0 +1,2 @@ +# logs +logs/ \ No newline at end of file diff --git a/sdk/ai/azure-ai-voicelive/samples/BASIC_VOICE_ASSISTANT.md b/sdk/ai/azure-ai-voicelive/samples/BASIC_VOICE_ASSISTANT.md index 768f915a250d..6e8c873dd383 100644 --- a/sdk/ai/azure-ai-voicelive/samples/BASIC_VOICE_ASSISTANT.md +++ b/sdk/ai/azure-ai-voicelive/samples/BASIC_VOICE_ASSISTANT.md @@ -1,200 +1,204 @@ -# Basic Voice Assistant - -This sample demonstrates a complete voice assistant implementation using the Azure AI VoiceLive SDK with async patterns. It provides real-time speech-to-speech interaction with interruption handling and server-side voice activity detection. 
- -## Features - -- **Real-time Speech Streaming**: Continuous audio capture and playback -- **Server-Side Voice Activity Detection (VAD)**: Automatic detection of speech start/end -- **Interruption Handling**: Users can interrupt the AI assistant mid-response -- **High-Quality Audio Processing**: 24kHz PCM16 mono audio for optimal quality -- **Robust Error Handling**: Connection error recovery and graceful shutdown -- **Async Architecture**: Non-blocking operations for responsive interaction - -## Prerequisites - -- Python 3.9+ -- Microphone and speakers/headphones -- Azure AI VoiceLive API key and endpoint - -## Installation - -```bash -pip install azure-ai-voicelive pyaudio python-dotenv -``` - -## Configuration - -Create a `.env` file with your credentials: - -```bash -AZURE_VOICELIVE_API_KEY=your-api-key -AZURE_VOICELIVE_ENDPOINT=your-endpoint -AZURE_VOICELIVE_MODEL=gpt-4o-realtime-preview -AZURE_VOICELIVE_VOICE=en-US-AvaNeural -AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful AI assistant. Respond naturally and conversationally. -``` - -## Running the Sample - -```bash -python basic_voice_assistant_async.py -``` - -Optional command-line arguments: - -```bash -python basic_voice_assistant_async.py \ - --model gpt-4o-realtime-preview \ - --voice en-US-AvaNeural \ - --instructions "You are a helpful assistant" \ - --verbose -``` - -## How It Works - -### 1. Connection Setup -The sample establishes an async WebSocket connection to the Azure VoiceLive service: - -```python -async with connect( - endpoint=endpoint, - credential=credential, - model=model -) as connection: - # Voice assistant logic here -``` - -### 2. Session Configuration -Configures audio formats, voice settings, and VAD parameters: - -```python -session_config = RequestSession( - modalities=[Modality.TEXT, Modality.AUDIO], - instructions=instructions, - voice=voice_config, - input_audio_format=InputAudioFormat.PCM16, - output_audio_format=OutputAudioFormat.PCM16, - turn_detection=ServerVad( - threshold=0.5, - prefix_padding_ms=300, - silence_duration_ms=500 - ), -) -``` - -### 3. Audio Processing -- **Input**: Captures microphone audio in real-time using PyAudio -- **Streaming**: Sends base64-encoded audio chunks to the service -- **Output**: Receives and plays AI-generated speech responses - -### 4. Event Handling -Processes various server events: - -- `SESSION_UPDATED`: Session is ready for interaction -- `INPUT_AUDIO_BUFFER_SPEECH_STARTED`: User starts speaking (interrupt AI) -- `INPUT_AUDIO_BUFFER_SPEECH_STOPPED`: User stops speaking (process input) -- `RESPONSE_AUDIO_DELTA`: Receive AI speech audio chunks -- `RESPONSE_DONE`: AI response complete -- `ERROR`: Handle service errors - -## Threading Architecture - -The sample uses a multi-threaded approach for real-time audio processing: - -- **Main Thread**: Async event loop and UI -- **Capture Thread**: PyAudio input stream reading -- **Send Thread**: Audio data transmission to service -- **Playback Thread**: PyAudio output stream writing - -## Key Classes - -### AudioProcessor -Manages real-time audio capture and playback with proper threading and queue management. - -### BasicVoiceAssistant -Main application class that coordinates WebSocket connection, session management, and audio processing. 
- -## Supported Voices - -### Azure Neural Voices -- `en-US-AvaNeural` - Female, natural and professional -- `en-US-JennyNeural` - Female, conversational -- `en-US-GuyNeural` - Male, professional - -### OpenAI Voices -- `alloy` - Versatile, neutral -- `echo` - Precise, clear -- `fable` - Animated, expressive -- `onyx` - Deep, authoritative -- `nova` - Warm, conversational -- `shimmer` - Optimistic, friendly - -## Troubleshooting - -### Audio Issues -- **No microphone detected**: Check device connections and permissions -- **No audio output**: Verify speakers/headphones are connected -- **Audio quality issues**: Ensure 24kHz sample rate support - -### Connection Issues -- **WebSocket errors**: Verify endpoint and credentials -- **API errors**: Check model availability and account permissions -- **Network timeouts**: Check firewall settings and network connectivity - -### PyAudio Installation Issues -- **Linux**: `sudo apt-get install -y portaudio19-dev libasound2-dev` -- **macOS**: `brew install portaudio` -- **Windows**: Usually installs without issues - -## Advanced Usage - -### Custom Instructions -Modify the AI assistant's behavior by customizing the instructions: - -```bash -python basic_voice_assistant_async.py --instructions "You are a coding assistant that helps with Python programming questions." -``` - -### Voice Selection -Choose different voices for varied experience: - -```bash -# Azure Neural Voice -python basic_voice_assistant_async.py --voice en-US-JennyNeural - -# OpenAI Voice -python basic_voice_assistant_async.py --voice nova -``` - -### Debug Mode -Enable verbose logging for troubleshooting: - -```bash -python basic_voice_assistant_async.py --verbose -``` - -## Code Structure - -``` -basic_voice_assistant_async.py -├── AudioProcessor class -│ ├── Audio capture (microphone input) -│ ├── Audio streaming (to service) -│ └── Audio playback (AI responses) -├── BasicVoiceAssistant class -│ ├── WebSocket connection management -│ ├── Session configuration -│ └── Event processing -└── Main execution - ├── Argument parsing - ├── Environment setup - └── Assistant initialization -``` - -## Next Steps - -- Explore `async_function_calling_sample.py` for function calling capabilities -- Check out other samples in the `samples/` directory -- Read the main SDK documentation in `README.md` -- Review the API reference for advanced usage patterns \ No newline at end of file +# Basic Voice Assistant + +This sample demonstrates a complete voice assistant implementation using the Azure AI VoiceLive SDK with async patterns. It provides real-time speech-to-speech interaction with interruption handling and server-side voice activity detection. 
+ +## Features + +- **Real-time Speech Streaming**: Continuous audio capture and playback +- **Server-Side Voice Activity Detection (VAD)**: Automatic detection of speech start/end +- **Interruption Handling**: Users can interrupt the AI assistant mid-response +- **High-Quality Audio Processing**: 24kHz PCM16 mono audio for optimal quality +- **Robust Error Handling**: Connection error recovery and graceful shutdown +- **Async Architecture**: Non-blocking operations for responsive interaction + +## Prerequisites + +- Python 3.9+ +- Microphone and speakers/headphones +- Azure AI VoiceLive API key and endpoint + +## Installation + +```bash +pip install azure-ai-voicelive pyaudio python-dotenv +``` + +## Configuration + +Create a `.env` file with your credentials: + +```bash +AZURE_VOICELIVE_API_KEY=your-api-key +AZURE_VOICELIVE_ENDPOINT=your-endpoint +AZURE_VOICELIVE_MODEL=gpt-realtime +AZURE_VOICELIVE_VOICE=en-US-Ava:DragonHDLatestNeural +AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful AI assistant. Respond naturally and conversationally. +``` + +## Running the Sample + +```bash +python basic_voice_assistant_async.py +``` + +Optional command-line arguments: + +```bash +python basic_voice_assistant_async.py \ + --model gpt-realtime \ + --voice en-US-Ava:DragonHDLatestNeural \ + --instructions "You are a helpful assistant" \ + --verbose + +# Or use Azure token authentication instead of API key: +python basic_voice_assistant_async.py --use-token-credential +``` + +## How It Works + +### 1. Connection Setup +The sample establishes an async WebSocket connection to the Azure VoiceLive service: + +```python +async with connect( + endpoint=endpoint, + credential=credential, + model=model +) as connection: + # Voice assistant logic here +``` + +### 2. Session Configuration +Configures audio formats, voice settings, and VAD parameters: + +```python +session_config = RequestSession( + modalities=[Modality.TEXT, Modality.AUDIO], + instructions=instructions, + voice=voice_config, + input_audio_format=InputAudioFormat.PCM16, + output_audio_format=OutputAudioFormat.PCM16, + turn_detection=ServerVad( + threshold=0.5, + prefix_padding_ms=300, + silence_duration_ms=500 + ), +) +``` + +### 3. Audio Processing +- **Input**: Captures microphone audio in real-time using PyAudio +- **Streaming**: Sends base64-encoded audio chunks to the service +- **Output**: Receives and plays AI-generated speech responses + +### 4. Event Handling +Processes various server events: + +- `SESSION_UPDATED`: Session is ready for interaction +- `INPUT_AUDIO_BUFFER_SPEECH_STARTED`: User starts speaking (interrupt AI) +- `INPUT_AUDIO_BUFFER_SPEECH_STOPPED`: User stops speaking (process input) +- `RESPONSE_AUDIO_DELTA`: Receive AI speech audio chunks +- `RESPONSE_DONE`: AI response complete +- `ERROR`: Handle service errors + +## Threading Architecture + +The sample uses a multi-threaded approach for real-time audio processing: + +- **Main Thread**: Async event loop and UI +- **Capture Thread**: PyAudio input stream reading +- **Send Thread**: Audio data transmission to service +- **Playback Thread**: PyAudio output stream writing + +## Key Classes + +### AudioProcessor +Manages real-time audio capture and playback with proper threading and queue management. + +### BasicVoiceAssistant +Main application class that coordinates WebSocket connection, session management, and audio processing. 
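+
+The sketch below shows how these pieces fit together in the event loop, including barge-in handling. This is a simplified illustration: the `audio_processor` method names follow the `AudioProcessor` in this directory's samples, and the real loop handles more event types and threading details.
+
+```python
+async for event in connection:
+    if event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
+        # User interrupted: drop queued assistant audio, then cancel the response
+        audio_processor.skip_pending_audio()
+        await connection.response.cancel()
+    elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA:
+        # Stream the assistant's speech to the speakers as it arrives
+        audio_processor.queue_audio(event.delta)
+    elif event.type == ServerEventType.RESPONSE_DONE:
+        print("Response complete - ready for next input")
+```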
+ +## Supported Voices + +### Azure Neural Voices +- `en-US-AvaNeural` - Female, natural and professional +- `en-US-JennyNeural` - Female, conversational +- `en-US-GuyNeural` - Male, professional + +### OpenAI Voices +- `alloy` - Versatile, neutral +- `echo` - Precise, clear +- `fable` - Animated, expressive +- `onyx` - Deep, authoritative +- `nova` - Warm, conversational +- `shimmer` - Optimistic, friendly + +## Troubleshooting + +### Audio Issues +- **No microphone detected**: Check device connections and permissions +- **No audio output**: Verify speakers/headphones are connected +- **Audio quality issues**: Ensure 24kHz sample rate support + +### Connection Issues +- **WebSocket errors**: Verify endpoint and credentials +- **API errors**: Check model availability and account permissions +- **Network timeouts**: Check firewall settings and network connectivity + +### PyAudio Installation Issues +- **Linux**: `sudo apt-get install -y portaudio19-dev libasound2-dev` +- **macOS**: `brew install portaudio` +- **Windows**: Usually installs without issues + +## Advanced Usage + +### Custom Instructions +Modify the AI assistant's behavior by customizing the instructions: + +```bash +python basic_voice_assistant_async.py --instructions "You are a coding assistant that helps with Python programming questions." +``` + +### Voice Selection +Choose different voices for varied experience: + +```bash +# Azure Neural Voice +python basic_voice_assistant_async.py --voice en-US-JennyNeural + +# OpenAI Voice +python basic_voice_assistant_async.py --voice nova +``` + +### Debug Mode +Enable verbose logging for troubleshooting: + +```bash +python basic_voice_assistant_async.py --verbose +``` + +## Code Structure + +``` +basic_voice_assistant_async.py +├── AudioProcessor class +│ ├── Audio capture (microphone input) +│ ├── Audio streaming (to service) +│ └── Audio playback (AI responses) +├── BasicVoiceAssistant class +│ ├── WebSocket connection management +│ ├── Session configuration +│ └── Event processing +└── Main execution + ├── Argument parsing + ├── Environment setup + └── Assistant initialization +``` + +## Next Steps + +- Explore `agent_voice_assistant_async.py` for Foundry agent integration and conversation logging capabilities +- Explore `function_calling_sample_async.py` for function calling capabilities with get_current_time and get_current_weather examples +- Explore `voice_assistant_w_proactive_greeting_async.py` for proactive greeting strategies +- Check out other samples in the `samples/` directory +- Read the main SDK documentation in `README.md` diff --git a/sdk/ai/azure-ai-voicelive/samples/README.md b/sdk/ai/azure-ai-voicelive/samples/README.md index 41529c87416f..83d5f4e22b6f 100644 --- a/sdk/ai/azure-ai-voicelive/samples/README.md +++ b/sdk/ai/azure-ai-voicelive/samples/README.md @@ -1,113 +1,134 @@ -# Azure AI VoiceLive Samples - -This directory contains sample applications demonstrating various capabilities of the Azure AI VoiceLive SDK. - -> **Note:** All samples use async/await patterns as the SDK is now exclusively async. - -## Prerequisites - -- Python 3.9 or later -- An Azure subscription with access to Azure AI VoiceLive -- Azure AI VoiceLive API key - -## Setup - -1. **Install dependencies**: - - ```bash - pip install azure-ai-voicelive[aiohttp] pyaudio python-dotenv - ``` - -2. 
**Configure environment variables**: - - Create a `.env` file at the root of the azure-ai-voicelive directory or in the samples directory with the following variables: - - ```ini - AZURE_VOICELIVE_API_KEY=your-voicelive-api-key - AZURE_VOICELIVE_ENDPOINT=wss://api.voicelive.com/v1 - AZURE_VOICELIVE_MODEL=gpt-4o-realtime-preview - AZURE_VOICELIVE_VOICE=alloy - AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful assistant. Keep your responses concise. - ``` - - You can copy the `.env.template` file and fill in your values: - - ```bash - cp ../.env.template ./.env - ``` - -## Running the samples - -### Quick Start: Basic Voice Assistant 🎤 - -For a complete voice conversation experience, start with the featured sample: - -```bash -python basic_voice_assistant_async.py -``` - -This sample demonstrates: - -- Real-time voice conversation with AI -- Automatic turn detection and interruption handling -- Full duplex audio streaming -- Robust error handling and reconnection - -See "BASIC_VOICE_ASSISTANT.md" for complete documentation. - -### Using Visual Studio Code - -1. Open the `azure-ai-voicelive` directory in VS Code -2. Configure your `.env` file as described above -3. Open the VS Code Run panel (Ctrl+Shift+D) -4. Select a sample configuration from the dropdown -5. Click the Run button or press F5 to run the sample in debug mode - -### From the command line - -Run any sample directly: - -```bash -python basic_voice_assistant_async.py -``` - -Most samples support additional command-line arguments. For example: - -```bash -python basic_voice_assistant_async.py --model gpt-4o-realtime-preview --voice alloy -``` - -Use the `--help` flag to see all available options: - -```bash -python basic_voice_assistant_async.py --help -``` - -## Sample descriptions - -- **basic_voice_assistant_async.py**: 🌟 **[Featured Sample]** Complete async voice assistant demonstrating real-time conversation, interruption handling, and server VAD. Perfect starting point for voice applications. See "BASIC_VOICE_ASSISTANT.md" for detailed documentation. -- **async_function_calling_sample.py**: Demonstrates async function calling capabilities with the VoiceLive SDK, showing how to handle function calls from the AI model. - -## Troubleshooting - -- **PyAudio / PortAudio build errors** - - Linux: `sudo apt-get install -y portaudio19-dev libasound2-dev` - - macOS: `brew install portaudio` - - Windows: try `pip install pyaudio` - -- **No input/output devices** - Ensure your OS sees a microphone and speakers. On headless CI, you typically cannot run audio samples. - -- **WebSocket connection issues (1006/timeout)** - - Recheck `AZURE_VOICELIVE_ENDPOINT` - - Confirm your network allows WSS to the service - -- **Auth errors** - - For API key: confirm `AZURE_VOICELIVE_API_KEY` - - For AAD: ensure your identity has access to the resource - -## Next steps - -- Try the **Basic Voice Assistant** sample first, then explore the others for specific scenarios. -- Integrate the SDK into your own app by copying pieces from the samples (e.g., audio capture/playback or event handling loops). -- Visit the Azure SDK repo to see additional guidance, issues, and contributions. +# Azure AI VoiceLive Samples + +This directory contains sample applications demonstrating various capabilities of the Azure AI VoiceLive SDK. + +> **Note:** All samples use async/await patterns as the SDK is now exclusively async. + +## Prerequisites + +- Python 3.9 or later +- An Azure subscription with access to Azure AI VoiceLive +- Azure AI VoiceLive API key + +## Setup + +1. 
**Install dependencies**:
+
+   ```bash
+   pip install azure-ai-voicelive[aiohttp] pyaudio python-dotenv azure-identity
+   ```
+
+2. **Configure environment variables**:
+
+   Create a `.env` file at the root of the azure-ai-voicelive directory or in the samples directory with the following variables:
+
+   ```ini
+   AZURE_VOICELIVE_API_KEY=your-voicelive-api-key
+   AZURE_VOICELIVE_ENDPOINT=https://your-resource-name.services.ai.azure.com/
+   AZURE_VOICELIVE_API_VERSION=2025-10-01
+   AZURE_VOICELIVE_MODEL=gpt-realtime
+   AZURE_VOICELIVE_VOICE=en-US-Ava:DragonHDLatestNeural
+   AZURE_VOICELIVE_INSTRUCTIONS=You are a helpful assistant. Keep your responses concise.
+   AZURE_VOICELIVE_AGENT_ID=
+   AZURE_VOICELIVE_PROJECT_NAME=
+   ```
+
+   You can copy the `.env_sample` file in this directory and fill in your values:
+
+   ```bash
+   cp .env_sample .env
+   ```
+
+## Running the samples
+
+### Quick Start: Basic Voice Assistant 🎤
+
+For a complete voice conversation experience, start with the featured sample:
+
+```bash
+python basic_voice_assistant_async.py
+```
+
+This sample demonstrates:
+
+- Real-time voice conversation with AI
+- Automatic turn detection and interruption handling
+- Full duplex audio streaming
+- Robust error handling and reconnection
+
+See "BASIC_VOICE_ASSISTANT.md" for complete documentation.
+
+### Using Visual Studio Code
+
+1. Open the `azure-ai-voicelive` directory in VS Code
+2. Configure your `.env` file as described above
+3. Open the VS Code Run panel (Ctrl+Shift+D)
+4. Select a sample configuration from the dropdown
+5. Click the Run button or press F5 to run the sample in debug mode
+
+### From the command line
+
+Run any sample directly:
+
+```bash
+python basic_voice_assistant_async.py
+```
+
+Most samples support additional command-line arguments. For example:
+
+```bash
+python basic_voice_assistant_async.py --model gpt-realtime --voice en-US-Ava:DragonHDLatestNeural
+```
+
+For Azure token authentication instead of an API key:
+
+```bash
+python basic_voice_assistant_async.py --use-token-credential
+```
+
+Use the `--help` flag to see all available options:
+
+```bash
+python basic_voice_assistant_async.py --help
+```
+
+## Sample descriptions
+
+- **basic_voice_assistant_async.py**: 🌟 **[Featured Sample]** Complete async voice assistant demonstrating real-time conversation, interruption handling, and server VAD. Perfect starting point for voice applications. See "BASIC_VOICE_ASSISTANT.md" for detailed documentation.
+
+- **function_calling_sample_async.py**: Demonstrates async function calling capabilities with the VoiceLive SDK, showing how to define functions (`get_current_time`, `get_current_weather`), handle function calls from the AI model, and process results with audio playback.
+
+- **voice_assistant_w_proactive_greeting_async.py**: Shows two proactive greeting strategies (see the sketch after this list):
+  1. invoking a server-generated greeting by sending a `response.create` event;
+  2. sending a pre-generated assistant greeting using a raw `response.create` event with `pre_generated_assistant_message`.
+
+  **Tips**
+
+  - Send the proactive event(s) immediately after `SESSION_UPDATED` and before starting microphone capture to avoid overlap if the user speaks early.
+  - If you might repeat the greeting (e.g., on reconnect), guard with a flag like `self.conversation_started`.
+
+- **agent_voice_assistant_async.py**: Demonstrates integration with Azure AI Foundry agents, showing how to connect a voice assistant to an agent backend. This sample also demonstrates conversation logging to track user and agent interactions.
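+
+As a minimal illustration of greeting strategy 1 above (a sketch reusing the `connection.response.create()` call and guard-flag pattern from these samples; see `voice_assistant_w_proactive_greeting_async.py` for the complete version):
+
+```python
+if event.type == ServerEventType.SESSION_UPDATED and not self.conversation_started:
+    self.conversation_started = True    # guard so a reconnect does not repeat the greeting
+    await connection.response.create()  # ask the server to generate the opening greeting
+    audio_processor.start_capture()     # open the microphone only after the greeting is requested
+```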
+
+## Troubleshooting
+
+- **PyAudio / PortAudio build errors**
+  - Linux: `sudo apt-get install -y portaudio19-dev libasound2-dev`
+  - macOS: `brew install portaudio`
+  - Windows: try `pip install pyaudio`
+
+- **No input/output devices**
+  Ensure your OS sees a microphone and speakers. On headless CI, you typically cannot run audio samples.
+
+- **WebSocket connection issues (1006/timeout)**
+  - Recheck `AZURE_VOICELIVE_ENDPOINT`
+  - Confirm your network allows WSS to the service
+
+- **Auth errors**
+  - For API key: confirm `AZURE_VOICELIVE_API_KEY`
+  - For AAD: ensure your identity has access to the resource
+
+## Next steps
+
+- Try the **Basic Voice Assistant** sample first, then explore the others for specific scenarios.
+- Integrate the SDK into your own app by copying pieces from the samples (e.g., audio capture/playback or event handling loops).
+- Visit the Azure SDK repo to see additional guidance, issues, and contributions.
diff --git a/sdk/ai/azure-ai-voicelive/samples/agent_voice_assistant_async.py b/sdk/ai/azure-ai-voicelive/samples/agent_voice_assistant_async.py
new file mode 100644
index 000000000000..10b24e302b86
--- /dev/null
+++ b/sdk/ai/azure-ai-voicelive/samples/agent_voice_assistant_async.py
@@ -0,0 +1,631 @@
+# pylint: disable=line-too-long,useless-suppression
+#!/usr/bin/env python
+
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+"""
+FILE: agent_voice_assistant_async.py
+
+DESCRIPTION:
+    This sample demonstrates the capabilities of the VoiceLive SDK by creating a voice
+    assistant that can engage in natural conversation with proper interruption handling,
+    showcasing the core value proposition of unified speech-to-speech interaction.
+
+    In this example an Azure AI Foundry agent is used as the conversational AI backend. The
+    sample also demonstrates how to collect a conversation log of user and agent interactions.
+
+USAGE:
+    python agent_voice_assistant_async.py
+
+    Set the environment variables with your own values before running the sample:
+    1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key (only needed for API key authentication)
+    2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint
+    3) AZURE_VOICELIVE_AGENT_ID - The Foundry agent ID
+    4) AZURE_VOICELIVE_PROJECT_NAME - The Foundry project name
+
+    Or copy .env_sample to .env and fill in your values. 
+ +REQUIREMENTS: + - azure-ai-voicelive + - python-dotenv + - pyaudio (for audio capture and playback) +""" + +from __future__ import annotations +import os +import sys +import argparse +import asyncio +import base64 +from datetime import datetime +import logging +import queue +import signal +from typing import Union, Optional, TYPE_CHECKING, cast + +from azure.core.credentials import AzureKeyCredential +from azure.core.credentials_async import AsyncTokenCredential +from azure.identity.aio import AzureCliCredential, DefaultAzureCredential + +from azure.ai.voicelive.aio import connect +from azure.ai.voicelive.models import ( + AudioEchoCancellation, + AudioNoiseReduction, + AzureStandardVoice, + InputAudioFormat, + Modality, + OutputAudioFormat, + RequestSession, + ServerEventType, + ServerVad +) +from dotenv import load_dotenv +import pyaudio + +if TYPE_CHECKING: + # Only needed for type checking; avoids runtime import issues + from azure.ai.voicelive.aio import VoiceLiveConnection + +## Change to the directory where this script is located +os.chdir(os.path.dirname(os.path.abspath(__file__))) + +# Environment variable loading +load_dotenv('./.env', override=True) + +# Set up logging +## Add folder for logging +if not os.path.exists('logs'): + os.makedirs('logs') + +## Add timestamp for logfiles +timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + +## Create conversation log filename +logfilename = f"{timestamp}_conversation.log" + +## Set up logging +logging.basicConfig( + filename=f'logs/{timestamp}_voicelive.log', + filemode="w", + format='%(asctime)s:%(name)s:%(levelname)s:%(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) + +class AudioProcessor: + """ + Handles real-time audio capture and playback for the voice assistant. 
+ + Threading Architecture: + - Main thread: Event loop and UI + - Capture thread: PyAudio input stream reading + - Send thread: Async audio data transmission to VoiceLive + - Playback thread: PyAudio output stream writing + """ + + loop: asyncio.AbstractEventLoop + + class AudioPlaybackPacket: + """Represents a packet that can be sent to the audio playback queue.""" + def __init__(self, seq_num: int, data: Optional[bytes]): + self.seq_num = seq_num + self.data = data + + def __init__(self, connection): + self.connection = connection + self.audio = pyaudio.PyAudio() + + # Audio configuration - PCM16, 24kHz, mono as specified + self.format = pyaudio.paInt16 + self.channels = 1 + self.rate = 24000 + self.chunk_size = 1200 # 50ms + + # Capture and playback state + self.input_stream = None + + self.playback_queue: queue.Queue[AudioProcessor.AudioPlaybackPacket] = queue.Queue() + self.playback_base = 0 + self.next_seq_num = 0 + self.output_stream: Optional[pyaudio.Stream] = None + + logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") + + def start_capture(self): + """Start capturing audio from microphone.""" + def _capture_callback( + in_data, # data + _frame_count, # number of frames + _time_info, # dictionary + _status_flags): + """Audio capture thread - runs in background.""" + audio_base64 = base64.b64encode(in_data).decode("utf-8") + asyncio.run_coroutine_threadsafe( + self.connection.input_audio_buffer.append(audio=audio_base64), self.loop + ) + return (None, pyaudio.paContinue) + + if self.input_stream: + return + + # Store the current event loop for use in threads + self.loop = asyncio.get_event_loop() + + try: + self.input_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + input=True, + frames_per_buffer=self.chunk_size, + stream_callback=_capture_callback, + ) + logger.info("Started audio capture") + + except Exception: + logger.exception("Failed to start audio capture") + raise + + def start_playback(self): + """Initialize audio playback system.""" + if self.output_stream: + return + + remaining = bytes() + def _playback_callback( + _in_data, + frame_count, # number of frames + _time_info, + _status_flags): + + nonlocal remaining + frame_count *= pyaudio.get_sample_size(pyaudio.paInt16) + + out = remaining[:frame_count] + remaining = remaining[frame_count:] + + while len(out) < frame_count: + try: + packet = self.playback_queue.get_nowait() + except queue.Empty: + out = out + bytes(frame_count - len(out)) + continue + except Exception: + logger.exception("Error in audio playback") + raise + + if not packet or not packet.data: + # None packet indicates end of stream + logger.info("End of playback queue.") + break + + if packet.seq_num < self.playback_base: + # skip requested + # ignore skipped packet and clear remaining + if len(remaining) > 0: + remaining = bytes() + continue + + num_to_take = frame_count - len(out) + out = out + packet.data[:num_to_take] + remaining = packet.data[num_to_take:] + + if len(out) >= frame_count: + return (out, pyaudio.paContinue) + else: + return (out, pyaudio.paComplete) + + try: + self.output_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + output=True, + frames_per_buffer=self.chunk_size, + stream_callback=_playback_callback + ) + logger.info("Audio playback system ready") + except Exception: + logger.exception("Failed to initialize audio playback") + raise + + def _get_and_increase_seq_num(self): + seq = self.next_seq_num + self.next_seq_num += 
1 + return seq + + def queue_audio(self, audio_data: Optional[bytes]) -> None: + """Queue audio data for playback.""" + self.playback_queue.put( + AudioProcessor.AudioPlaybackPacket( + seq_num=self._get_and_increase_seq_num(), + data=audio_data)) + + def skip_pending_audio(self): + """Skip current audio in playback queue.""" + self.playback_base = self._get_and_increase_seq_num() + + def shutdown(self): + """Clean up audio resources.""" + if self.input_stream: + self.input_stream.stop_stream() + self.input_stream.close() + self.input_stream = None + + logger.info("Stopped audio capture") + + # Inform thread to complete + if self.output_stream: + self.skip_pending_audio() + self.queue_audio(None) + self.output_stream.stop_stream() + self.output_stream.close() + self.output_stream = None + + logger.info("Stopped audio playback") + + if self.audio: + self.audio.terminate() + + logger.info("Audio processor cleaned up") + +class BasicVoiceAssistant: + """ + Basic voice assistant implementing the VoiceLive SDK patterns with Foundry Agent. + This sample also demonstrates how to collect a conversation log of user and agent interactions. + """ + + + def __init__( + self, + endpoint: str, + credential: Union[AzureKeyCredential, AsyncTokenCredential], + agent_id: str, + foundry_project_name: str, + voice: str, + ): + + self.endpoint = endpoint + self.credential = credential + self.agent_id = agent_id + self.foundry_project_name = foundry_project_name + self.voice = voice + self.connection: Optional["VoiceLiveConnection"] = None + self.audio_processor: Optional[AudioProcessor] = None + self.session_ready = False + self.conversation_started = False + self._active_response = False + self._response_api_done = False + + async def start(self): + """Start the voice assistant session.""" + try: + logger.info("Connecting to VoiceLive API with Foundry agent connection %s for project %s", self.agent_id, self.foundry_project_name) + + # Get agent access token + agent_access_token = (await DefaultAzureCredential().get_token("https://ai.azure.com/.default")).token + logger.info("Obtained agent access token") + + # Connect to VoiceLive WebSocket API + async with connect( + endpoint=self.endpoint, + credential=self.credential, + query={ + "agent-id": self.agent_id, + "agent-project-name": self.foundry_project_name, + "agent-access-token": agent_access_token + }, + ) as connection: + conn = connection + self.connection = conn + + # Initialize audio processor + ap = AudioProcessor(conn) + self.audio_processor = ap + + # Configure session for voice conversation + await self._setup_session() + + # Start audio systems + ap.start_playback() + + logger.info("Voice assistant ready! 
Start speaking...") + print("\n" + "=" * 60) + print("🎤 VOICE ASSISTANT READY") + print("Start speaking to begin conversation") + print("Press Ctrl+C to exit") + print("=" * 60 + "\n") + + # Process events + await self._process_events() + finally: + if self.audio_processor: + self.audio_processor.shutdown() + + async def _setup_session(self): + """Configure the VoiceLive session for audio conversation.""" + logger.info("Setting up voice conversation session...") + + # Create voice configuration + voice_config: Union[AzureStandardVoice, str] + if self.voice.startswith("en-US-") or self.voice.startswith("en-CA-") or "-" in self.voice: + # Azure voice + voice_config = AzureStandardVoice(name=self.voice) + else: + # OpenAI voice (alloy, echo, fable, onyx, nova, shimmer) + voice_config = self.voice + + # Create turn detection configuration + turn_detection_config = ServerVad( + threshold=0.5, + prefix_padding_ms=300, + silence_duration_ms=500) + + # Create session configuration + session_config = RequestSession( + modalities=[Modality.TEXT, Modality.AUDIO], + voice=voice_config, + input_audio_format=InputAudioFormat.PCM16, + output_audio_format=OutputAudioFormat.PCM16, + turn_detection=turn_detection_config, + input_audio_echo_cancellation=AudioEchoCancellation(), + input_audio_noise_reduction=AudioNoiseReduction(type="azure_deep_noise_suppression"), + ) + + conn = self.connection + assert conn is not None, "Connection must be established before setting up session" + await conn.session.update(session=session_config) + + logger.info("Session configuration sent") + + async def _process_events(self): + """Process events from the VoiceLive connection.""" + try: + conn = self.connection + assert conn is not None, "Connection must be established before processing events" + async for event in conn: + await self._handle_event(event) + except Exception: + logger.exception("Error processing events") + raise + + async def _handle_event(self, event): + """Handle different types of events from VoiceLive.""" + logger.debug("Received event: %s", event.type) + ap = self.audio_processor + conn = self.connection + assert ap is not None, "AudioProcessor must be initialized" + assert conn is not None, "Connection must be established" + + if event.type == ServerEventType.SESSION_UPDATED: + logger.info("Session ready: %s", event.session.id) + await write_conversation_log(f"SessionID: {event.session.id}") + await write_conversation_log(f"Model: {event.session.model}") + await write_conversation_log(f"Voice: {event.session.voice}") + await write_conversation_log(f"Instructions: {event.session.instructions}") + await write_conversation_log(f"") + self.session_ready = True + + # Invoke Proactive greeting + if not self.conversation_started: + self.conversation_started = True + logger.info("Sending proactive greeting request") + try: + await conn.response.create() + + except Exception: + logger.exception("Failed to send proactive greeting request") + + # Start audio capture once session is ready + ap.start_capture() + + elif event.type == ServerEventType.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: + print(f'👤 You said:\t{event.get("transcript", "")}') + await write_conversation_log(f'User Input:\t{event.get("transcript", "")}') + + elif event.type == ServerEventType.RESPONSE_TEXT_DONE: + print(f'🤖 Agent responded with text:\t{event.get("text", "")}') + await write_conversation_log(f'Agent Text Response:\t{event.get("text", "")}') + + elif event.type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DONE: + print(f'🤖 
Agent responded with audio transcript:\t{event.get("transcript", "")}') + await write_conversation_log(f'Agent Audio Response:\t{event.get("transcript", "")}') + + elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: + logger.info("User started speaking - stopping playback") + print("🎤 Listening...") + + ap.skip_pending_audio() + + # Only cancel if response is active and not already done + if self._active_response and not self._response_api_done: + try: + await conn.response.cancel() + logger.debug("Cancelled in-progress response due to barge-in") + except Exception as e: + if "no active response" in str(e).lower(): + logger.debug("Cancel ignored - response already completed") + else: + logger.warning("Cancel failed: %s", e) + + elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: + logger.info("🎤 User stopped speaking") + print("🤔 Processing...") + + elif event.type == ServerEventType.RESPONSE_CREATED: + logger.info("🤖 Assistant response created") + self._active_response = True + self._response_api_done = False + + elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: + logger.debug("Received audio delta") + ap.queue_audio(event.delta) + + elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: + logger.info("🤖 Assistant finished speaking") + print("🎤 Ready for next input...") + + elif event.type == ServerEventType.RESPONSE_DONE: + logger.info("✅ Response complete") + self._active_response = False + self._response_api_done = True + + elif event.type == ServerEventType.ERROR: + msg = event.error.message + if "Cancellation failed: no active response" in msg: + logger.debug("Benign cancellation error: %s", msg) + else: + logger.error("❌ VoiceLive error: %s", msg) + print(f"Error: {msg}") + + elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: + logger.debug("Conversation item created: %s", event.item.id) + + else: + logger.debug("Unhandled event type: %s", event.type) + +async def write_conversation_log(message: str) -> None: + """Write a message to the conversation log.""" + def _write_to_file(): + with open(f'logs/{logfilename}', 'a', encoding='utf-8') as conversation_log: + conversation_log.write(message + "\n") + + await asyncio.to_thread(_write_to_file) + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Basic Voice Assistant using Azure VoiceLive SDK", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--api-key", + help="Azure VoiceLive API key. If not provided, will use AZURE_VOICELIVE_API_KEY environment variable.", + type=str, + default=os.environ.get("AZURE_VOICELIVE_API_KEY"), + ) + + parser.add_argument( + "--endpoint", + help="Azure VoiceLive endpoint", + type=str, + default=os.environ.get("AZURE_VOICELIVE_ENDPOINT", "https://your-resource-name.services.ai.azure.com/"), + ) + + parser.add_argument( + "--agent_id", + help="Foundry agent ID to use", + type=str, + default=os.environ.get("AZURE_VOICELIVE_AGENT_ID", ""), + ) + + parser.add_argument( + "--foundry_project_name", + help="Foundry project name to use", + type=str, + default=os.environ.get("AZURE_VOICELIVE_PROJECT_NAME", ""), + ) + + parser.add_argument( + "--voice", + help="Voice to use for the assistant. E.g. 
alloy, echo, fable, en-US-AvaNeural, en-US-GuyNeural", + type=str, + default=os.environ.get("AZURE_VOICELIVE_VOICE", "en-US-Ava:DragonHDLatestNeural"), + ) + + parser.add_argument( + "--use-token-credential", help="Use Azure token credential instead of API key", action="store_true", default=True + ) + + parser.add_argument("--verbose", help="Enable verbose logging", action="store_true") + + return parser.parse_args() + + +def main(): + """Main function.""" + args = parse_arguments() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Validate credentials + if not args.api_key and not args.use_token_credential: + print("❌ Error: No authentication provided") + print("Please provide an API key using --api-key or set AZURE_VOICELIVE_API_KEY environment variable,") + print("or use --use-token-credential for Azure authentication.") + sys.exit(1) + + # Create client with appropriate credential + credential: Union[AzureKeyCredential, AsyncTokenCredential] + if args.use_token_credential: + credential = AzureCliCredential() # or DefaultAzureCredential() if needed + logger.info("Using Azure token credential") + else: + credential = AzureKeyCredential(args.api_key) + logger.info("Using API key credential") + + # Create and start voice assistant + assistant = BasicVoiceAssistant( + endpoint=args.endpoint, + credential=credential, + agent_id=args.agent_id, + foundry_project_name=args.foundry_project_name, + voice=args.voice, + ) + + # Setup signal handlers for graceful shutdown + def signal_handler(_sig, _frame): + logger.info("Received shutdown signal") + raise KeyboardInterrupt() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Start the assistant + try: + asyncio.run(assistant.start()) + except KeyboardInterrupt: + print("\n👋 Voice assistant shut down. Goodbye!") + except Exception as e: + print("Fatal Error: ", e) + +if __name__ == "__main__": + # Check audio system + try: + p = pyaudio.PyAudio() + # Check for input devices + input_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0 + ] + # Check for output devices + output_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0 + ] + p.terminate() + + if not input_devices: + print("❌ No audio input devices found. Please check your microphone.") + sys.exit(1) + if not output_devices: + print("❌ No audio output devices found. Please check your speakers.") + sys.exit(1) + + except Exception as e: + print(f"❌ Audio system check failed: {e}") + sys.exit(1) + + print("🎙️ Basic Voice Assistant with Azure VoiceLive SDK") + print("=" * 50) + + # Run the assistant + main() diff --git a/sdk/ai/azure-ai-voicelive/samples/async_function_calling_sample.py b/sdk/ai/azure-ai-voicelive/samples/async_function_calling_sample.py deleted file mode 100644 index 4faf55a22a3b..000000000000 --- a/sdk/ai/azure-ai-voicelive/samples/async_function_calling_sample.py +++ /dev/null @@ -1,808 +0,0 @@ -# pylint: disable=line-too-long,useless-suppression -#!/usr/bin/env python - -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. 
-# -------------------------------------------------------------------------- - -""" -FILE: async_function_calling_sample.py - -DESCRIPTION: - This sample demonstrates how to use the Azure AI Voice Live SDK asynchronously - with function calling capabilities. It shows how to define functions, - handle function calls from the AI model, and process the results. - -USAGE: - python async_function_calling_sample.py - - Set the environment variables with your own values before running the sample: - 1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key - 2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint - -REQUIREMENTS: - - azure-ai-voicelive - - python-dotenv - - pyaudio (for audio capture and playback) -""" - -import os -import sys -import asyncio -import json -import datetime -import logging -import base64 -import signal -import threading -import queue -from typing import Union, Optional, Dict, Any, Mapping, Callable, TYPE_CHECKING, cast -from concurrent.futures import ThreadPoolExecutor - -# Audio processing imports -try: - import pyaudio -except ImportError: - print("This sample requires pyaudio. Install with: pip install pyaudio") - sys.exit(1) - -# Environment variable loading -try: - from dotenv import load_dotenv - - load_dotenv() -except ImportError: - print("Note: python-dotenv not installed. Using existing environment variables.") - -# Azure VoiceLive SDK imports -from azure.core.credentials import AzureKeyCredential -from azure.core.credentials_async import AsyncTokenCredential -from azure.ai.voicelive.aio import connect -from azure.ai.voicelive.models import ( - RequestSession, - ServerEventType, - ServerVad, - AudioEchoCancellation, - AzureStandardVoice, - Modality, - InputAudioFormat, - OutputAudioFormat, - FunctionTool, - FunctionCallItem, - FunctionCallOutputItem, - ItemType, - ToolChoiceLiteral, - AudioInputTranscriptionOptions, - ResponseFunctionCallItem, - ServerEventConversationItemCreated, - ServerEventResponseFunctionCallArgumentsDone, - ServerEventResponseCreated, - Tool, -) - -# Set up logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - - -async def _wait_for_event(conn, wanted_types: set, timeout_s: float = 10.0): - """Wait until we receive any event whose type is in wanted_types.""" - - async def _next(): - while True: - evt = await conn.recv() - if evt.type in wanted_types: - return evt - - return await asyncio.wait_for(_next(), timeout=timeout_s) - - -async def _wait_for_match( - conn, - predicate: Callable[[Any], bool], - timeout_s: float = 10.0, -): - """Wait until we receive an event that satisfies the given predicate.""" - - async def _next(): - while True: - evt = await conn.recv() - if predicate(evt): - return evt - - return await asyncio.wait_for(_next(), timeout=timeout_s) - - -class AudioProcessor: - """ - Handles real-time audio capture and playback for the voice assistant. - - Responsibilities: - - Captures audio input from the microphone using PyAudio. - - Plays back audio output using PyAudio. - - Manages threading for audio capture, sending, and playback. - - Uses queues to buffer audio data between threads. 
- """ - - def __init__(self, connection): - self.connection = connection - self.audio = pyaudio.PyAudio() - - # Audio configuration - PCM16, 24kHz, mono as specified - self.format = pyaudio.paInt16 - self.channels = 1 - self.rate = 24000 - self.chunk_size = 1024 - - # Capture and playback state - self.is_capturing = False - self.is_playing = False - self.input_stream = None - self.output_stream = None - - # Audio queues and threading - self.audio_queue: "queue.Queue[bytes]" = queue.Queue() - self.audio_send_queue: "queue.Queue[str]" = queue.Queue() # base64 audio to send - self.executor = ThreadPoolExecutor(max_workers=3) - self.capture_thread: Optional[threading.Thread] = None - self.playback_thread: Optional[threading.Thread] = None - self.send_thread: Optional[threading.Thread] = None - self.loop: Optional[asyncio.AbstractEventLoop] = None # Store the event loop - - logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") - - async def start_capture(self): - """Start capturing audio from microphone.""" - if self.is_capturing: - return - - # Store the current event loop for use in threads - self.loop = asyncio.get_event_loop() - - self.is_capturing = True - - try: - self.input_stream = self.audio.open( - format=self.format, - channels=self.channels, - rate=self.rate, - input=True, - frames_per_buffer=self.chunk_size, - stream_callback=None, - ) - - self.input_stream.start_stream() - - # Start capture thread - self.capture_thread = threading.Thread(target=self._capture_audio_thread) - self.capture_thread.daemon = True - self.capture_thread.start() - - # Start audio send thread - self.send_thread = threading.Thread(target=self._send_audio_thread) - self.send_thread.daemon = True - self.send_thread.start() - - logger.info("Started audio capture") - - except Exception as e: - logger.error(f"Failed to start audio capture: {e}") - self.is_capturing = False - raise - - def _capture_audio_thread(self): - """Audio capture thread - runs in background.""" - while self.is_capturing and self.input_stream: - try: - # Read audio data - audio_data = self.input_stream.read(self.chunk_size, exception_on_overflow=False) - - if audio_data and self.is_capturing: - # Convert to base64 and queue for sending - audio_base64 = base64.b64encode(audio_data).decode("utf-8") - self.audio_send_queue.put(audio_base64) - - except Exception as e: - if self.is_capturing: - logger.error(f"Error in audio capture: {e}") - break - - def _send_audio_thread(self): - """Audio send thread - handles async operations from sync thread.""" - while self.is_capturing: - try: - # Get audio data from queue (blocking with timeout) - audio_base64 = self.audio_send_queue.get(timeout=0.1) - - if audio_base64 and self.is_capturing and self.loop: - # Schedule the async send operation in the main event loop - future = asyncio.run_coroutine_threadsafe( - self.connection.input_audio_buffer.append(audio=audio_base64), self.loop - ) - # Don't wait for completion to avoid blocking - - except queue.Empty: - continue - except Exception as e: - if self.is_capturing: - logger.error(f"Error sending audio: {e}") - break - - async def stop_capture(self): - """Stop capturing audio.""" - if not self.is_capturing: - return - - self.is_capturing = False - - if self.input_stream: - self.input_stream.stop_stream() - self.input_stream.close() - self.input_stream = None - - if self.capture_thread: - self.capture_thread.join(timeout=1.0) - - if self.send_thread: - self.send_thread.join(timeout=1.0) - - # Clear the send queue - while not 
self.audio_send_queue.empty(): - try: - self.audio_send_queue.get_nowait() - except queue.Empty: - break - - logger.info("Stopped audio capture") - - async def start_playback(self): - """Initialize audio playback system.""" - if self.is_playing: - return - - self.is_playing = True - - try: - self.output_stream = self.audio.open( - format=self.format, - channels=self.channels, - rate=self.rate, - output=True, - frames_per_buffer=self.chunk_size, - ) - - # Start playback thread - self.playback_thread = threading.Thread(target=self._playback_audio_thread) - self.playback_thread.daemon = True - self.playback_thread.start() - - logger.info("Audio playback system ready") - - except Exception as e: - logger.error(f"Failed to initialize audio playback: {e}") - self.is_playing = False - raise - - def _playback_audio_thread(self): - """Audio playback thread - runs in background.""" - while self.is_playing: - try: - # Get audio data from queue (blocking with timeout) - audio_data = self.audio_queue.get(timeout=0.1) - - if audio_data and self.output_stream and self.is_playing: - self.output_stream.write(audio_data) - - except queue.Empty: - continue - except Exception as e: - if self.is_playing: - logger.error(f"Error in audio playback: {e}") - break - - async def queue_audio(self, audio_data: bytes): - """Queue audio data for playback.""" - if self.is_playing: - self.audio_queue.put(audio_data) - - async def stop_playback(self): - """Stop audio playback and clear queue.""" - if not self.is_playing: - return - - self.is_playing = False - - # Clear the queue - while not self.audio_queue.empty(): - try: - self.audio_queue.get_nowait() - except queue.Empty: - break - - if self.output_stream: - self.output_stream.stop_stream() - self.output_stream.close() - self.output_stream = None - - if self.playback_thread: - self.playback_thread.join(timeout=1.0) - - logger.info("Stopped audio playback") - - async def cleanup(self): - """Clean up audio resources.""" - await self.stop_capture() - await self.stop_playback() - - if self.audio: - self.audio.terminate() - - self.executor.shutdown(wait=True) - logger.info("Audio processor cleaned up") - - -class AsyncFunctionCallingClient: - """Async client for Azure Voice Live API with function calling capabilities and audio input.""" - - def __init__( - self, - endpoint: str, - credential: Union[AzureKeyCredential, AsyncTokenCredential], - model: str, - voice: str, - instructions: str, - ): - self.endpoint = endpoint - self.credential = credential - self.model = model - self.voice = voice - self.instructions = instructions - self.session_id: Optional[str] = None - self.function_call_in_progress: bool = False - self.active_call_id: Optional[str] = None - self.audio_processor: Optional[AudioProcessor] = None - self.session_ready: bool = False - - # Define available functions - self.available_functions: Dict[str, Callable[[Union[str, Mapping[str, Any]]], Mapping[str, Any]]] = { - "get_current_time": self.get_current_time, - "get_current_weather": self.get_current_weather, - } - - async def run(self): - """Run the async function calling client with audio input.""" - try: - logger.info(f"Connecting to VoiceLive API with model {self.model}") - - # Connect to VoiceLive WebSocket API asynchronously - async with connect( - endpoint=self.endpoint, - credential=self.credential, - model=self.model, - ) as connection: - # Initialize audio processor - self.audio_processor = AudioProcessor(connection) - - # Configure session with function tools - await self._setup_session(connection) - 
- # Start audio playback system - await self.audio_processor.start_playback() - - logger.info("Voice assistant with function calling ready! Start speaking...") - print("\n" + "=" * 70) - print("🎤 VOICE ASSISTANT WITH FUNCTION CALLING READY") - print("Try saying:") - print(" • 'What's the current time?'") - print(" • 'What's the weather in Seattle?'") - print(" • 'What time is it in UTC?'") - print("Press Ctrl+C to exit") - print("=" * 70 + "\n") - - # Process events asynchronously - await self._process_events(connection) - - except KeyboardInterrupt: - logger.info("Received interrupt signal, shutting down...") - except Exception as e: - logger.error(f"Connection error: {e}") - raise - finally: - # Cleanup audio processor - if self.audio_processor: - await self.audio_processor.cleanup() - - async def _setup_session(self, connection): - """Configure the VoiceLive session with function tools asynchronously.""" - logger.info("Setting up voice conversation session with function tools...") - - # Create voice configuration - voice_config = AzureStandardVoice(name=self.voice) - - # Create turn detection configuration - turn_detection_config = ServerVad(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500) - - # Define available function tools - function_tools: list[Tool] = [ - FunctionTool( - name="get_current_time", - description="Get the current time", - parameters={ - "type": "object", - "properties": { - "timezone": { - "type": "string", - "description": "The timezone to get the current time for, e.g., 'UTC', 'local'", - } - }, - "required": [], - }, - ), - FunctionTool( - name="get_current_weather", - description="Get the current weather in a given location", - parameters={ - "type": "object", - "properties": { - "location": { - "type": "string", - "description": "The city and state, e.g., 'San Francisco, CA'", - }, - "unit": { - "type": "string", - "enum": ["celsius", "fahrenheit"], - "description": "The unit of temperature to use (celsius or fahrenheit)", - }, - }, - "required": ["location"], - }, - ), - ] - - # Create session configuration with function tools - session_config = RequestSession( - modalities=[Modality.TEXT, Modality.AUDIO], - instructions=self.instructions, - voice=voice_config, - input_audio_format=InputAudioFormat.PCM16, - output_audio_format=OutputAudioFormat.PCM16, - input_audio_echo_cancellation=AudioEchoCancellation(), - turn_detection=turn_detection_config, - tools=function_tools, - tool_choice=ToolChoiceLiteral.AUTO, # Let the model decide when to call functions - input_audio_transcription=AudioInputTranscriptionOptions(model="whisper-1"), - ) - - # Send session configuration asynchronously - await connection.session.update(session=session_config) - logger.info("Session configuration with function tools sent") - - async def _process_events(self, connection): - """Process events from the VoiceLive connection asynchronously.""" - try: - async for event in connection: - await self._handle_event(event, connection) - except KeyboardInterrupt: - logger.info("Event processing interrupted") - except Exception as e: - logger.error(f"Error processing events: {e}") - raise - - async def _handle_event(self, event, connection): - """Handle different types of events from VoiceLive asynchronously.""" - ap = self.audio_processor - assert ap is not None, "AudioProcessor must be initialized" - - if event.type == ServerEventType.SESSION_UPDATED: - self.session_id = event.session.id - logger.info(f"Session ready: {self.session_id}") - self.session_ready = True - - # Start audio 
capture once session is ready - await ap.start_capture() - print("🎤 Ready for voice input! Try asking about time or weather with your location...") - - elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: - logger.info("🎤 User started speaking - stopping playback") - print("🎤 Listening...") - - # Stop current assistant audio playback (interruption handling) - await ap.stop_playback() - - # Cancel any ongoing response - try: - await connection.response.cancel() - except Exception as e: - logger.debug(f"No response to cancel: {e}") - - elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: - logger.info("🎤 User stopped speaking") - print("🤔 Processing...") - - # Restart playback system for response - await ap.start_playback() - - elif event.type == ServerEventType.RESPONSE_CREATED: - logger.info("🤖 Assistant response created") - - elif event.type == ServerEventType.RESPONSE_TEXT_DELTA: - logger.info(f"Text response: {event.delta}") - - elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: - # Stream audio response to speakers - logger.debug("Received audio delta") - await ap.queue_audio(event.delta) - - elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: - logger.info("🤖 Assistant finished speaking") - print("🎤 Ready for next input...") - - elif event.type == ServerEventType.RESPONSE_DONE: - logger.info("✅ Response complete") - self.function_call_in_progress = False - self.active_call_id = None - - elif event.type == ServerEventType.ERROR: - logger.error(f"❌ VoiceLive error: {event.error.message}") - print(f"Error: {event.error.message}") - - elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: - logger.info(f"Conversation item created: {event.item.id}") - - # Check if it's a function call item using the improved pattern from the test - if event.item.type == ItemType.FUNCTION_CALL: - print(f"🔧 Calling function: {event.item.name}") - await self._handle_function_call_with_improved_pattern(event, connection) - - async def _handle_function_call_with_improved_pattern(self, conversation_created_event, connection): - """Handle function call using the improved pattern from the test.""" - # Validate the event structure - if not isinstance(conversation_created_event, ServerEventConversationItemCreated): - logger.error("Expected ServerEventConversationItemCreated") - return - - if not isinstance(conversation_created_event.item, ResponseFunctionCallItem): - logger.error("Expected ResponseFunctionCallItem") - return - - function_call_item = conversation_created_event.item - function_name = function_call_item.name - call_id = function_call_item.call_id - previous_item_id = function_call_item.id - - logger.info(f"Function call detected: {function_name} with call_id: {call_id}") - - try: - # Set tracking variables - self.function_call_in_progress = True - self.active_call_id = call_id - - # Wait for the function arguments to be complete - function_done = await _wait_for_event(connection, {ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE}) - - if not isinstance(function_done, ServerEventResponseFunctionCallArgumentsDone): - logger.error("Expected ServerEventResponseFunctionCallArgumentsDone") - return - - if function_done.call_id != call_id: - logger.warning(f"Call ID mismatch: expected {call_id}, got {function_done.call_id}") - return - - arguments = function_done.arguments - logger.info(f"Function arguments received: {arguments}") - - # Wait for response to be done before proceeding - await _wait_for_event(connection, {ServerEventType.RESPONSE_DONE}) - - 
# Execute the function if we have it - if function_name in self.available_functions: - logger.info(f"Executing function: {function_name}") - result = self.available_functions[function_name](arguments) - - # Create function call output item - function_output = FunctionCallOutputItem(call_id=call_id, output=json.dumps(result)) - - # Send the result back to the conversation with proper previous_item_id - await connection.conversation.item.create(previous_item_id=previous_item_id, item=function_output) - logger.info(f"Function result sent: {result}") - - # Create a new response to process the function result - await connection.response.create() - - # Wait for the final response - response = await _wait_for_match( - connection, - lambda e: e.type == ServerEventType.RESPONSE_OUTPUT_ITEM_DONE - and hasattr(e, "item") - and e.item.id != previous_item_id, - ) - - if hasattr(response, "item") and hasattr(response.item, "content") and response.item.content: - if hasattr(response.item.content[0], "transcript"): - transcript = response.item.content[0].transcript - logger.info(f"Final response transcript: {transcript}") - - else: - logger.error(f"Unknown function: {function_name}") - - except asyncio.TimeoutError: - logger.error(f"Timeout waiting for function call completion for {function_name}") - except Exception as e: - logger.error(f"Error executing function {function_name}: {e}") - finally: - self.function_call_in_progress = False - self.active_call_id = None - - def get_current_time(self, arguments: Optional[Union[str, Mapping[str, Any]]] = None) -> Dict[str, Any]: - """Get the current time.""" - # Parse arguments if provided as string - if isinstance(arguments, str): - try: - args = json.loads(arguments) - except json.JSONDecodeError: - args = {} - elif isinstance(arguments, dict): - args = arguments - else: - args = {} - - timezone = args.get("timezone", "local") - now = datetime.datetime.now() - - if timezone.lower() == "utc": - now = datetime.datetime.now(datetime.timezone.utc) - timezone_name = "UTC" - else: - timezone_name = "local" - - formatted_time = now.strftime("%I:%M:%S %p") - formatted_date = now.strftime("%A, %B %d, %Y") - - return {"time": formatted_time, "date": formatted_date, "timezone": timezone_name} - - def get_current_weather(self, arguments: Union[str, Mapping[str, Any]]): - """Get the current weather for a location.""" - # Parse arguments if provided as string - if isinstance(arguments, str): - try: - args = json.loads(arguments) - except json.JSONDecodeError: - logger.error(f"Failed to parse weather arguments: {arguments}") - return {"error": "Invalid arguments"} - elif isinstance(arguments, dict): - args = arguments - else: - return {"error": "No arguments provided"} - - location = args.get("location", "Unknown") - unit = args.get("unit", "celsius") - - # In a real application, you would call a weather API - # This is a simulated response similar to the test - try: - weather_data = { - "location": location, - "temperature": 22 if unit == "celsius" else 72, - "unit": unit, - "condition": "Partly Cloudy", - "humidity": 65, - "wind_speed": 10, - } - - return weather_data - - except Exception as e: - logger.error(f"Error getting weather: {e}") - return {"error": str(e)} - - -async def main(): - """Main async function.""" - # Get credentials from environment variables - api_key = os.environ.get("AZURE_VOICELIVE_API_KEY") - endpoint = os.environ.get("AZURE_VOICELIVE_ENDPOINT", "wss://api.voicelive.com/v1") - - if not api_key: - print("❌ Error: No API key provided") - 
print("Please set the AZURE_VOICELIVE_API_KEY environment variable.") - sys.exit(1) - - # Option 1: API key authentication (simple, recommended for quick start) - credential = AzureKeyCredential(api_key) - - # Option 2: Async AAD authentication (requires azure-identity) - # from azure.identity.aio import AzureCliCredential, DefaultAzureCredential - # credential = DefaultAzureCredential() or AzureCliCredential() - # - # 👉 Use this if you prefer AAD/MSAL-based auth. - # It will look for environment variables like: - # AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET - # or fall back to managed identity if running in Azure. - - # Create and run the client - client = AsyncFunctionCallingClient( - endpoint=endpoint, - credential=credential, - model="gpt-4o-realtime-preview", - voice="en-US-AvaNeural", - instructions="You are a helpful AI assistant with access to functions. " - "Use the functions when appropriate to provide accurate, real-time information. " - "If you are asked about the weather, please respond with 'I will get the weather for you. Please wait a moment.' and then call the get_current_weather function. " - "If you are asked about the time, please respond with 'I will get the time for you. Please wait a moment.' and then call the get_current_time function. " - "Explain when you're using a function and include the results in your response naturally.", - ) - - # Setup signal handlers for graceful shutdown - def signal_handler(sig, frame): - logger.info("Received shutdown signal") - raise KeyboardInterrupt() - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - try: - await client.run() - except KeyboardInterrupt: - print("\n👋 Voice Live function calling client shut down.") - except Exception as e: - logger.error(f"Error: {e}") - sys.exit(1) - - -if __name__ == "__main__": - # Check for required dependencies - dependencies = { - "pyaudio": "Audio processing", - "azure.ai.voicelive": "Azure VoiceLive SDK", - "azure.core": "Azure Core libraries", - } - - missing_deps = [] - for dep, description in dependencies.items(): - try: - __import__(dep.replace("-", "_")) - except ImportError: - missing_deps.append(f"{dep} ({description})") - - if missing_deps: - print("❌ Missing required dependencies:") - for dep in missing_deps: - print(f" - {dep}") - print("\nInstall with: pip install azure-ai-voicelive pyaudio python-dotenv") - sys.exit(1) - - # Check audio system - try: - p = pyaudio.PyAudio() - # Check for input devices - input_devices = [ - i - for i in range(p.get_device_count()) - if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0 - ] - # Check for output devices - output_devices = [ - i - for i in range(p.get_device_count()) - if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0 - ] - p.terminate() - - if not input_devices: - print("❌ No audio input devices found. Please check your microphone.") - sys.exit(1) - if not output_devices: - print("❌ No audio output devices found. 
Please check your speakers.") - sys.exit(1) - - except Exception as e: - print(f"❌ Audio system check failed: {e}") - sys.exit(1) - - print("🎙️ Voice Assistant with Function Calling - Azure VoiceLive SDK") - print("=" * 65) - - # Run the async main function - asyncio.run(main()) diff --git a/sdk/ai/azure-ai-voicelive/samples/basic_voice_assistant_async.py b/sdk/ai/azure-ai-voicelive/samples/basic_voice_assistant_async.py index 897d629f892b..67fa3e8b03a3 100644 --- a/sdk/ai/azure-ai-voicelive/samples/basic_voice_assistant_async.py +++ b/sdk/ai/azure-ai-voicelive/samples/basic_voice_assistant_async.py @@ -1,571 +1,583 @@ -# pylint: disable=line-too-long,useless-suppression -#!/usr/bin/env python - -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -# -------------------------------------------------------------------------- - -""" -FILE: basic_voice_assistant_async.py - -DESCRIPTION: - This sample demonstrates the fundamental capabilities of the VoiceLive SDK by creating - a basic voice assistant that can engage in natural conversation with proper interruption - handling. This serves as the foundational example that showcases the core value - proposition of unified speech-to-speech interaction. - -USAGE: - python basic_voice_assistant_async.py - - Set the environment variables with your own values before running the sample: - 1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key - 2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint - - Or copy .env.template to .env and fill in your values. - -REQUIREMENTS: - - azure-ai-voicelive - - python-dotenv - - pyaudio (for audio capture and playback) -""" - -from __future__ import annotations -import os -import sys -import argparse -import asyncio -import base64 -from datetime import datetime -import logging -import queue -import signal -from typing import Union, Optional, TYPE_CHECKING, cast - -from azure.core.credentials import AzureKeyCredential -from azure.core.credentials_async import AsyncTokenCredential -from azure.identity.aio import AzureCliCredential, DefaultAzureCredential - -from azure.ai.voicelive.aio import connect -from azure.ai.voicelive.models import ( - AudioEchoCancellation, - AudioNoiseReduction, - AzureStandardVoice, - InputAudioFormat, - Modality, - OutputAudioFormat, - RequestSession, - ServerEventType, - ServerVad -) -from dotenv import load_dotenv -import pyaudio - -if TYPE_CHECKING: - # Only needed for type checking; avoids runtime import issues - from azure.ai.voicelive.aio import VoiceLiveConnection - -## Change to the directory where this script is located -os.chdir(os.path.dirname(os.path.abspath(__file__))) - -# Environment variable loading -load_dotenv('./.env', override=True) - -# Set up logging -## Add folder for logging -if not os.path.exists('logs'): - os.makedirs('logs') - -## Add timestamp for logfiles -timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") - -## Set up logging -logging.basicConfig( - filename=f'logs/{timestamp}_voicelive.log', - filemode="w", - format='%(asctime)s:%(name)s:%(levelname)s:%(message)s', - level=logging.INFO -) -logger = logging.getLogger(__name__) - -class AudioProcessor: - """ - Handles real-time audio capture and playback for the voice assistant. 
- - Threading Architecture: - - Main thread: Event loop and UI - - Capture thread: PyAudio input stream reading - - Send thread: Async audio data transmission to VoiceLive - - Playback thread: PyAudio output stream writing - """ - - loop: asyncio.AbstractEventLoop - - class AudioPlaybackPacket: - """Represents a packet that can be sent to the audio playback queue.""" - def __init__(self, seq_num: int, data: Optional[bytes]): - self.seq_num = seq_num - self.data = data - - def __init__(self, connection): - self.connection = connection - self.audio = pyaudio.PyAudio() - - # Audio configuration - PCM16, 24kHz, mono as specified - self.format = pyaudio.paInt16 - self.channels = 1 - self.rate = 24000 - self.chunk_size = 1200 # 50ms - - # Capture and playback state - self.input_stream = None - - self.playback_queue: queue.Queue[AudioProcessor.AudioPlaybackPacket] = queue.Queue() - self.playback_base = 0 - self.next_seq_num = 0 - self.output_stream: Optional[pyaudio.Stream] = None - - logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") - - def start_capture(self): - """Start capturing audio from microphone.""" - def _capture_callback( - in_data, # data - _frame_count, # number of frames - _time_info, # dictionary - _status_flags): - """Audio capture thread - runs in background.""" - audio_base64 = base64.b64encode(in_data).decode("utf-8") - asyncio.run_coroutine_threadsafe( - self.connection.input_audio_buffer.append(audio=audio_base64), self.loop - ) - return (None, pyaudio.paContinue) - - if self.input_stream: - return - - # Store the current event loop for use in threads - self.loop = asyncio.get_event_loop() - - try: - self.input_stream = self.audio.open( - format=self.format, - channels=self.channels, - rate=self.rate, - input=True, - frames_per_buffer=self.chunk_size, - stream_callback=_capture_callback, - ) - logger.info("Started audio capture") - - except Exception: - logger.exception("Failed to start audio capture") - raise - - def start_playback(self): - """Initialize audio playback system.""" - if self.output_stream: - return - - remaining = bytes() - def _playback_callback( - _in_data, - frame_count, # number of frames - _time_info, - _status_flags): - - nonlocal remaining - frame_count *= pyaudio.get_sample_size(pyaudio.paInt16) - - out = remaining[:frame_count] - remaining = remaining[frame_count:] - - while len(out) < frame_count: - try: - packet = self.playback_queue.get_nowait() - except queue.Empty: - out = out + bytes(frame_count - len(out)) - continue - except Exception: - logger.exception("Error in audio playback") - raise - - if not packet or not packet.data: - # None packet indicates end of stream - logger.info("End of playback queue.") - break - - if packet.seq_num < self.playback_base: - # skip requested - # ignore skipped packet and clear remaining - if len(remaining) > 0: - remaining = bytes() - continue - - num_to_take = frame_count - len(out) - out = out + packet.data[:num_to_take] - remaining = packet.data[num_to_take:] - - if len(out) >= frame_count: - return (out, pyaudio.paContinue) - else: - return (out, pyaudio.paComplete) - - try: - self.output_stream = self.audio.open( - format=self.format, - channels=self.channels, - rate=self.rate, - output=True, - frames_per_buffer=self.chunk_size, - stream_callback=_playback_callback - ) - logger.info("Audio playback system ready") - except Exception: - logger.exception("Failed to initialize audio playback") - raise - - def _get_and_increase_seq_num(self): - seq = self.next_seq_num - self.next_seq_num += 
1 - return seq - - def queue_audio(self, audio_data: Optional[bytes]) -> None: - """Queue audio data for playback.""" - self.playback_queue.put( - AudioProcessor.AudioPlaybackPacket( - seq_num=self._get_and_increase_seq_num(), - data=audio_data)) - - def skip_pending_audio(self): - """Skip current audio in playback queue.""" - self.playback_base = self._get_and_increase_seq_num() - - def shutdown(self): - """Clean up audio resources.""" - if self.input_stream: - self.input_stream.stop_stream() - self.input_stream.close() - self.input_stream = None - - logger.info("Stopped audio capture") - - # Inform thread to complete - if self.output_stream: - self.skip_pending_audio() - self.queue_audio(None) - self.output_stream.stop_stream() - self.output_stream.close() - self.output_stream = None - - logger.info("Stopped audio playback") - - if self.audio: - self.audio.terminate() - - logger.info("Audio processor cleaned up") - -class BasicVoiceAssistant: - """Basic voice assistant implementing the VoiceLive SDK patterns.""" - - def __init__( - self, - endpoint: str, - credential: Union[AzureKeyCredential, AsyncTokenCredential], - model: str, - voice: str, - instructions: str, - ): - - self.endpoint = endpoint - self.credential = credential - self.model = model - self.voice = voice - self.instructions = instructions - self.connection: Optional["VoiceLiveConnection"] = None - self.audio_processor: Optional[AudioProcessor] = None - self.session_ready = False - self.conversation_started = False - - async def start(self): - """Start the voice assistant session.""" - try: - logger.info("Connecting to VoiceLive API with model %s", self.model) - - # Connect to VoiceLive WebSocket API - async with connect( - endpoint=self.endpoint, - credential=self.credential, - model=self.model, - ) as connection: - conn = connection - self.connection = conn - - # Initialize audio processor - ap = AudioProcessor(conn) - self.audio_processor = ap - - # Configure session for voice conversation - await self._setup_session() - - # Start audio systems - ap.start_playback() - - logger.info("Voice assistant ready! 
Start speaking...") - print("\n" + "=" * 60) - print("🎤 VOICE ASSISTANT READY") - print("Start speaking to begin conversation") - print("Press Ctrl+C to exit") - print("=" * 60 + "\n") - - # Process events - await self._process_events() - finally: - if self.audio_processor: - self.audio_processor.shutdown() - - async def _setup_session(self): - """Configure the VoiceLive session for audio conversation.""" - logger.info("Setting up voice conversation session...") - - # Create strongly typed voice configuration - voice_config: Union[AzureStandardVoice, str] - if self.voice.startswith("en-US-") or self.voice.startswith("en-CA-") or "-" in self.voice: - # Azure voice - voice_config = AzureStandardVoice(name=self.voice) - else: - # OpenAI voice (alloy, echo, fable, onyx, nova, shimmer) - voice_config = self.voice - - # Create strongly typed turn detection configuration - turn_detection_config = ServerVad( - threshold=0.5, - prefix_padding_ms=300, - silence_duration_ms=500) - - # Create strongly typed session configuration - session_config = RequestSession( - modalities=[Modality.TEXT, Modality.AUDIO], - instructions=self.instructions, - voice=voice_config, - input_audio_format=InputAudioFormat.PCM16, - output_audio_format=OutputAudioFormat.PCM16, - turn_detection=turn_detection_config, - input_audio_echo_cancellation=AudioEchoCancellation(), - input_audio_noise_reduction=AudioNoiseReduction(type="azure_deep_noise_suppression"), - ) - - conn = self.connection - assert conn is not None, "Connection must be established before setting up session" - await conn.session.update(session=session_config) - - logger.info("Session configuration sent") - - async def _process_events(self): - """Process events from the VoiceLive connection.""" - try: - conn = self.connection - assert conn is not None, "Connection must be established before processing events" - async for event in conn: - await self._handle_event(event) - except Exception: - logger.exception("Error processing events") - raise - - async def _handle_event(self, event): - """Handle different types of events from VoiceLive.""" - logger.debug("Received event: %s", event.type) - ap = self.audio_processor - conn = self.connection - assert ap is not None, "AudioProcessor must be initialized" - assert conn is not None, "Connection must be established" - - if event.type == ServerEventType.SESSION_UPDATED: - logger.info("Session ready: %s", event.session.id) - self.session_ready = True - - # Start audio capture once session is ready - ap.start_capture() - - elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: - logger.info("User started speaking - stopping playback") - print("🎤 Listening...") - - # skip queued audio - ap.skip_pending_audio() - - # Cancel any ongoing response - try: - await conn.response.cancel() - except Exception: - logger.exception("No response to cancel") - - elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: - logger.info("🎤 User stopped speaking") - print("🤔 Processing...") - - elif event.type == ServerEventType.RESPONSE_CREATED: - logger.info("🤖 Assistant response created") - - elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: - # Stream audio response to speakers - logger.debug("Received audio delta") - ap.queue_audio(event.delta) - - elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: - logger.info("🤖 Assistant finished speaking") - print("🎤 Ready for next input...") - - elif event.type == ServerEventType.RESPONSE_DONE: - logger.info("✅ Response complete") - - elif event.type == 
ServerEventType.ERROR: - logger.error("❌ VoiceLive error: %s", event.error.message) - print(f"Error: {event.error.message}") - - elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: - logger.debug("Conversation item created: %s", event.item.id) - - else: - logger.debug("Unhandled event type: %s", event.type) - - -def parse_arguments(): - """Parse command line arguments.""" - parser = argparse.ArgumentParser( - description="Basic Voice Assistant using Azure VoiceLive SDK", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - - parser.add_argument( - "--api-key", - help="Azure VoiceLive API key. If not provided, will use AZURE_VOICELIVE_API_KEY environment variable.", - type=str, - default=os.environ.get("AZURE_VOICELIVE_API_KEY"), - ) - - parser.add_argument( - "--endpoint", - help="Azure VoiceLive endpoint", - type=str, - default=os.environ.get("AZURE_VOICELIVE_ENDPOINT", "wss://api.voicelive.com/v1"), - ) - - parser.add_argument( - "--model", - help="VoiceLive model to use", - type=str, - default=os.environ.get("AZURE_VOICELIVE_MODEL", "gpt-realtime"), - ) - - parser.add_argument( - "--voice", - help="Voice to use for the assistant. E.g. alloy, echo, fable, en-US-AvaNeural, en-US-GuyNeural", - type=str, - default=os.environ.get("AZURE_VOICELIVE_VOICE", "en-US-Ava:DragonHDLatestNeural"), - ) - - parser.add_argument( - "--instructions", - help="System instructions for the AI assistant", - type=str, - default=os.environ.get( - "AZURE_VOICELIVE_INSTRUCTIONS", - "You are a helpful AI assistant. Respond naturally and conversationally. " - "Keep your responses concise but engaging.", - ), - ) - - parser.add_argument( - "--use-token-credential", help="Use Azure token credential instead of API key", action="store_true", default=True - ) - - parser.add_argument("--verbose", help="Enable verbose logging", action="store_true") - - return parser.parse_args() - - -def main(): - """Main function.""" - args = parse_arguments() - - # Set logging level - if args.verbose: - logging.getLogger().setLevel(logging.DEBUG) - - # Validate credentials - if not args.api_key and not args.use_token_credential: - print("❌ Error: No authentication provided") - print("Please provide an API key using --api-key or set AZURE_VOICELIVE_API_KEY environment variable,") - print("or use --use-token-credential for Azure authentication.") - sys.exit(1) - - # Create client with appropriate credential - credential: Union[AzureKeyCredential, AsyncTokenCredential] - if args.use_token_credential: - credential = AzureCliCredential() # or DefaultAzureCredential() if needed - logger.info("Using Azure token credential") - else: - credential = AzureKeyCredential(args.api_key) - logger.info("Using API key credential") - - # Create and start voice assistant - assistant = BasicVoiceAssistant( - endpoint=args.endpoint, - credential=credential, - model=args.model, - voice=args.voice, - instructions=args.instructions, - ) - - # Setup signal handlers for graceful shutdown - def signal_handler(_sig, _frame): - logger.info("Received shutdown signal") - raise KeyboardInterrupt() - - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Start the assistant - try: - asyncio.run(assistant.start()) - except KeyboardInterrupt: - print("\n👋 Voice assistant shut down. 
Goodbye!") - except Exception as e: - print("Fatal Error: ", e) - -if __name__ == "__main__": - # Check audio system - try: - p = pyaudio.PyAudio() - # Check for input devices - input_devices = [ - i - for i in range(p.get_device_count()) - if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0 - ] - # Check for output devices - output_devices = [ - i - for i in range(p.get_device_count()) - if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0 - ] - p.terminate() - - if not input_devices: - print("❌ No audio input devices found. Please check your microphone.") - sys.exit(1) - if not output_devices: - print("❌ No audio output devices found. Please check your speakers.") - sys.exit(1) - - except Exception as e: - print(f"❌ Audio system check failed: {e}") - sys.exit(1) - - print("🎙️ Basic Voice Assistant with Azure VoiceLive SDK") - print("=" * 50) - - # Run the assistant - main() +# pylint: disable=line-too-long,useless-suppression +#!/usr/bin/env python + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +""" +FILE: basic_voice_assistant_async.py + +DESCRIPTION: + This sample demonstrates the fundamental capabilities of the VoiceLive SDK by creating + a basic voice assistant that can engage in natural conversation with proper interruption + handling. This serves as the foundational example that showcases the core value + proposition of unified speech-to-speech interaction. + +USAGE: + python basic_voice_assistant_async.py + + Set the environment variables with your own values before running the sample: + 1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key + 2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint + + Or copy .env.template to .env and fill in your values. 
+ +REQUIREMENTS: + - azure-ai-voicelive + - python-dotenv + - pyaudio (for audio capture and playback) +""" + +from __future__ import annotations +import os +import sys +import argparse +import asyncio +import base64 +from datetime import datetime +import logging +import queue +import signal +from typing import Union, Optional, TYPE_CHECKING, cast + +from azure.core.credentials import AzureKeyCredential +from azure.core.credentials_async import AsyncTokenCredential +from azure.identity.aio import AzureCliCredential, DefaultAzureCredential + +from azure.ai.voicelive.aio import connect +from azure.ai.voicelive.models import ( + AudioEchoCancellation, + AudioNoiseReduction, + AzureStandardVoice, + InputAudioFormat, + Modality, + OutputAudioFormat, + RequestSession, + ServerEventType, + ServerVad +) +from dotenv import load_dotenv +import pyaudio + +if TYPE_CHECKING: + # Only needed for type checking; avoids runtime import issues + from azure.ai.voicelive.aio import VoiceLiveConnection + +## Change to the directory where this script is located +os.chdir(os.path.dirname(os.path.abspath(__file__))) + +# Environment variable loading +load_dotenv('./.env', override=True) + +# Set up logging +## Add folder for logging +if not os.path.exists('logs'): + os.makedirs('logs') + +## Add timestamp for logfiles +timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + +## Set up logging +logging.basicConfig( + filename=f'logs/{timestamp}_voicelive.log', + filemode="w", + format='%(asctime)s:%(name)s:%(levelname)s:%(message)s', + level=logging.INFO +) +logger = logging.getLogger(__name__) + +class AudioProcessor: + """ + Handles real-time audio capture and playback for the voice assistant. + + Threading Architecture: + - Main thread: Event loop and UI + - Capture thread: PyAudio input stream reading + - Send thread: Async audio data transmission to VoiceLive + - Playback thread: PyAudio output stream writing + """ + + loop: asyncio.AbstractEventLoop + + class AudioPlaybackPacket: + """Represents a packet that can be sent to the audio playback queue.""" + def __init__(self, seq_num: int, data: Optional[bytes]): + self.seq_num = seq_num + self.data = data + + def __init__(self, connection): + self.connection = connection + self.audio = pyaudio.PyAudio() + + # Audio configuration - PCM16, 24kHz, mono as specified + self.format = pyaudio.paInt16 + self.channels = 1 + self.rate = 24000 + self.chunk_size = 1200 # 50ms + + # Capture and playback state + self.input_stream = None + + self.playback_queue: queue.Queue[AudioProcessor.AudioPlaybackPacket] = queue.Queue() + self.playback_base = 0 + self.next_seq_num = 0 + self.output_stream: Optional[pyaudio.Stream] = None + + logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") + + def start_capture(self): + """Start capturing audio from microphone.""" + def _capture_callback( + in_data, # data + _frame_count, # number of frames + _time_info, # dictionary + _status_flags): + """Audio capture thread - runs in background.""" + audio_base64 = base64.b64encode(in_data).decode("utf-8") + asyncio.run_coroutine_threadsafe( + self.connection.input_audio_buffer.append(audio=audio_base64), self.loop + ) + return (None, pyaudio.paContinue) + + if self.input_stream: + return + + # Store the current event loop for use in threads + self.loop = asyncio.get_event_loop() + + try: + self.input_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + input=True, + frames_per_buffer=self.chunk_size, + 
stream_callback=_capture_callback, + ) + logger.info("Started audio capture") + + except Exception: + logger.exception("Failed to start audio capture") + raise + + def start_playback(self): + """Initialize audio playback system.""" + if self.output_stream: + return + + remaining = bytes() + def _playback_callback( + _in_data, + frame_count, # number of frames + _time_info, + _status_flags): + + nonlocal remaining + frame_count *= pyaudio.get_sample_size(pyaudio.paInt16) + + out = remaining[:frame_count] + remaining = remaining[frame_count:] + + while len(out) < frame_count: + try: + packet = self.playback_queue.get_nowait() + except queue.Empty: + out = out + bytes(frame_count - len(out)) + continue + except Exception: + logger.exception("Error in audio playback") + raise + + if not packet or not packet.data: + # None packet indicates end of stream + logger.info("End of playback queue.") + break + + if packet.seq_num < self.playback_base: + # skip requested + # ignore skipped packet and clear remaining + if len(remaining) > 0: + remaining = bytes() + continue + + num_to_take = frame_count - len(out) + out = out + packet.data[:num_to_take] + remaining = packet.data[num_to_take:] + + if len(out) >= frame_count: + return (out, pyaudio.paContinue) + else: + return (out, pyaudio.paComplete) + + try: + self.output_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + output=True, + frames_per_buffer=self.chunk_size, + stream_callback=_playback_callback + ) + logger.info("Audio playback system ready") + except Exception: + logger.exception("Failed to initialize audio playback") + raise + + def _get_and_increase_seq_num(self): + seq = self.next_seq_num + self.next_seq_num += 1 + return seq + + def queue_audio(self, audio_data: Optional[bytes]) -> None: + """Queue audio data for playback.""" + self.playback_queue.put( + AudioProcessor.AudioPlaybackPacket( + seq_num=self._get_and_increase_seq_num(), + data=audio_data)) + + def skip_pending_audio(self): + """Skip current audio in playback queue.""" + self.playback_base = self._get_and_increase_seq_num() + + def shutdown(self): + """Clean up audio resources.""" + if self.input_stream: + self.input_stream.stop_stream() + self.input_stream.close() + self.input_stream = None + + logger.info("Stopped audio capture") + + # Inform thread to complete + if self.output_stream: + self.skip_pending_audio() + self.queue_audio(None) + self.output_stream.stop_stream() + self.output_stream.close() + self.output_stream = None + + logger.info("Stopped audio playback") + + if self.audio: + self.audio.terminate() + + logger.info("Audio processor cleaned up") + +class BasicVoiceAssistant: + """Basic voice assistant implementing the VoiceLive SDK patterns.""" + + def __init__( + self, + endpoint: str, + credential: Union[AzureKeyCredential, AsyncTokenCredential], + model: str, + voice: str, + instructions: str, + ): + + self.endpoint = endpoint + self.credential = credential + self.model = model + self.voice = voice + self.instructions = instructions + self.connection: Optional["VoiceLiveConnection"] = None + self.audio_processor: Optional[AudioProcessor] = None + self.session_ready = False + self._active_response = False + self._response_api_done = False + + async def start(self): + """Start the voice assistant session.""" + try: + logger.info("Connecting to VoiceLive API with model %s", self.model) + + # Connect to VoiceLive WebSocket API + async with connect( + endpoint=self.endpoint, + credential=self.credential, + 
model=self.model,
+            ) as connection:
+                conn = connection
+                self.connection = conn
+
+                # Initialize audio processor
+                ap = AudioProcessor(conn)
+                self.audio_processor = ap
+
+                # Configure session for voice conversation
+                await self._setup_session()
+
+                # Start audio systems
+                ap.start_playback()
+
+                logger.info("Voice assistant ready! Start speaking...")
+                print("\n" + "=" * 60)
+                print("🎤 VOICE ASSISTANT READY")
+                print("Start speaking to begin conversation")
+                print("Press Ctrl+C to exit")
+                print("=" * 60 + "\n")
+
+                # Process events
+                await self._process_events()
+        finally:
+            if self.audio_processor:
+                self.audio_processor.shutdown()
+
+    async def _setup_session(self):
+        """Configure the VoiceLive session for audio conversation."""
+        logger.info("Setting up voice conversation session...")
+
+        # Create voice configuration
+        voice_config: Union[AzureStandardVoice, str]
+        if "-" in self.voice:
+            # Azure voice names contain a hyphen, e.g. en-US-AvaNeural
+            voice_config = AzureStandardVoice(name=self.voice)
+        else:
+            # OpenAI voice (alloy, echo, fable, onyx, nova, shimmer)
+            voice_config = self.voice
+
+        # Create turn detection configuration
+        turn_detection_config = ServerVad(
+            threshold=0.5,
+            prefix_padding_ms=300,
+            silence_duration_ms=500)
+
+        # Create session configuration
+        session_config = RequestSession(
+            modalities=[Modality.TEXT, Modality.AUDIO],
+            instructions=self.instructions,
+            voice=voice_config,
+            input_audio_format=InputAudioFormat.PCM16,
+            output_audio_format=OutputAudioFormat.PCM16,
+            turn_detection=turn_detection_config,
+            input_audio_echo_cancellation=AudioEchoCancellation(),
+            input_audio_noise_reduction=AudioNoiseReduction(type="azure_deep_noise_suppression"),
+        )
+
+        conn = self.connection
+        assert conn is not None, "Connection must be established before setting up session"
+        await conn.session.update(session=session_config)
+
+        logger.info("Session configuration sent")
+
+    async def _process_events(self):
+        """Process events from the VoiceLive connection."""
+        try:
+            conn = self.connection
+            assert conn is not None, "Connection must be established before processing events"
+            async for event in conn:
+                await self._handle_event(event)
+        except Exception:
+            logger.exception("Error processing events")
+            raise
+
+    async def _handle_event(self, event):
+        """Handle different types of events from VoiceLive."""
+        logger.debug("Received event: %s", event.type)
+        ap = self.audio_processor
+        conn = self.connection
+        assert ap is not None, "AudioProcessor must be initialized"
+        assert conn is not None, "Connection must be established"
+
+        if event.type == ServerEventType.SESSION_UPDATED:
+            logger.info("Session ready: %s", event.session.id)
+            self.session_ready = True
+
+            # Start audio capture once session is ready
+            ap.start_capture()
+
+        elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
+            logger.info("User started speaking - stopping playback")
+            print("🎤 Listening...")
+
+            ap.skip_pending_audio()
+
+            # Only cancel if response is active and not already done
+            if self._active_response and not self._response_api_done:
+                try:
+                    await conn.response.cancel()
+                    logger.debug("Cancelled in-progress response due to barge-in")
+                except Exception as e:
+                    if "no active response" in str(e).lower():
+                        logger.debug("Cancel ignored - response already completed")
+                    else:
+                        logger.warning("Cancel failed: %s", e)
+
+        elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED:
+            logger.info("🎤 User stopped speaking")
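+            # With server-side VAD, the speech-stop event means the service has
+            # already committed the input audio buffer and will start generating
+            # a response automatically; no explicit commit is needed here.
+            print("🤔 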
Processing...") + + elif event.type == ServerEventType.RESPONSE_CREATED: + logger.info("🤖 Assistant response created") + self._active_response = True + self._response_api_done = False + + elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: + logger.debug("Received audio delta") + ap.queue_audio(event.delta) + + elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: + logger.info("🤖 Assistant finished speaking") + print("🎤 Ready for next input...") + + elif event.type == ServerEventType.RESPONSE_DONE: + logger.info("✅ Response complete") + self._active_response = False + self._response_api_done = True + + elif event.type == ServerEventType.ERROR: + msg = event.error.message + if "Cancellation failed: no active response" in msg: + logger.debug("Benign cancellation error: %s", msg) + else: + logger.error("❌ VoiceLive error: %s", msg) + print(f"Error: {msg}") + + elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: + logger.debug("Conversation item created: %s", event.item.id) + + else: + logger.debug("Unhandled event type: %s", event.type) + + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Basic Voice Assistant using Azure VoiceLive SDK", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--api-key", + help="Azure VoiceLive API key. If not provided, will use AZURE_VOICELIVE_API_KEY environment variable.", + type=str, + default=os.environ.get("AZURE_VOICELIVE_API_KEY"), + ) + + parser.add_argument( + "--endpoint", + help="Azure VoiceLive endpoint", + type=str, + default=os.environ.get("AZURE_VOICELIVE_ENDPOINT", "https://your-resource-name.services.ai.azure.com/"), + ) + + parser.add_argument( + "--model", + help="VoiceLive model to use", + type=str, + default=os.environ.get("AZURE_VOICELIVE_MODEL", "gpt-realtime"), + ) + + parser.add_argument( + "--voice", + help="Voice to use for the assistant. E.g. alloy, echo, fable, en-US-AvaNeural, en-US-GuyNeural", + type=str, + default=os.environ.get("AZURE_VOICELIVE_VOICE", "en-US-Ava:DragonHDLatestNeural"), + ) + + parser.add_argument( + "--instructions", + help="System instructions for the AI assistant", + type=str, + default=os.environ.get( + "AZURE_VOICELIVE_INSTRUCTIONS", + "You are a helpful AI assistant. Respond naturally and conversationally. 
" + "Keep your responses concise but engaging.", + ), + ) + + parser.add_argument( + "--use-token-credential", help="Use Azure token credential instead of API key", action="store_true", default=False + ) + + parser.add_argument("--verbose", help="Enable verbose logging", action="store_true") + + return parser.parse_args() + + +def main(): + """Main function.""" + args = parse_arguments() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Validate credentials + if not args.api_key and not args.use_token_credential: + print("❌ Error: No authentication provided") + print("Please provide an API key using --api-key or set AZURE_VOICELIVE_API_KEY environment variable,") + print("or use --use-token-credential for Azure authentication.") + sys.exit(1) + + # Create client with appropriate credential + credential: Union[AzureKeyCredential, AsyncTokenCredential] + if args.use_token_credential: + credential = AzureCliCredential() # or DefaultAzureCredential() if needed + logger.info("Using Azure token credential") + else: + credential = AzureKeyCredential(args.api_key) + logger.info("Using API key credential") + + # Create and start voice assistant + assistant = BasicVoiceAssistant( + endpoint=args.endpoint, + credential=credential, + model=args.model, + voice=args.voice, + instructions=args.instructions, + ) + + # Setup signal handlers for graceful shutdown + def signal_handler(_sig, _frame): + logger.info("Received shutdown signal") + raise KeyboardInterrupt() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Start the assistant + try: + asyncio.run(assistant.start()) + except KeyboardInterrupt: + print("\n👋 Voice assistant shut down. Goodbye!") + except Exception as e: + print("Fatal Error: ", e) + +if __name__ == "__main__": + # Check audio system + try: + p = pyaudio.PyAudio() + # Check for input devices + input_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0 + ] + # Check for output devices + output_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0 + ] + p.terminate() + + if not input_devices: + print("❌ No audio input devices found. Please check your microphone.") + sys.exit(1) + if not output_devices: + print("❌ No audio output devices found. Please check your speakers.") + sys.exit(1) + + except Exception as e: + print(f"❌ Audio system check failed: {e}") + sys.exit(1) + + print("🎙️ Basic Voice Assistant with Azure VoiceLive SDK") + print("=" * 50) + + # Run the assistant + main() diff --git a/sdk/ai/azure-ai-voicelive/samples/function_calling_sample_async.py b/sdk/ai/azure-ai-voicelive/samples/function_calling_sample_async.py new file mode 100644 index 000000000000..232dfe7556c3 --- /dev/null +++ b/sdk/ai/azure-ai-voicelive/samples/function_calling_sample_async.py @@ -0,0 +1,779 @@ +# pylint: disable=line-too-long,useless-suppression +#!/usr/bin/env python + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. 
+# --------------------------------------------------------------------------
+
+"""
+FILE: function_calling_sample_async.py
+
+DESCRIPTION:
+    This sample demonstrates how to use the Azure AI Voice Live SDK asynchronously
+    with function calling capabilities. It shows how to define functions,
+    handle function calls from the AI model, and process the results.
+
+USAGE:
+    python function_calling_sample_async.py
+
+    Set the environment variables with your own values before running the sample:
+    1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key
+    2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint
+
+    Or copy .env.template to .env and fill in your values.
+
+REQUIREMENTS:
+    - azure-ai-voicelive
+    - python-dotenv
+    - pyaudio (for audio capture and playback)
+"""
+
+from __future__ import annotations
+import os
+import sys
+import argparse
+import asyncio
+import json
+import base64
+from datetime import datetime
+import logging
+import queue
+import signal
+from typing import Union, Optional, Dict, Any, Mapping, Callable, TYPE_CHECKING, cast
+
+from azure.core.credentials import AzureKeyCredential
+from azure.core.credentials_async import AsyncTokenCredential
+from azure.identity.aio import AzureCliCredential, DefaultAzureCredential
+
+from azure.ai.voicelive.aio import connect
+from azure.ai.voicelive.models import (
+    AudioEchoCancellation,
+    AudioNoiseReduction,
+    AzureStandardVoice,
+    InputAudioFormat,
+    ItemType,
+    Modality,
+    OutputAudioFormat,
+    RequestSession,
+    ServerEventType,
+    ServerVad,
+    FunctionTool,
+    FunctionCallOutputItem,
+    ToolChoiceLiteral,
+    AudioInputTranscriptionOptions,
+    Tool,
+)
+from dotenv import load_dotenv
+import pyaudio
+
+if TYPE_CHECKING:
+    from azure.ai.voicelive.aio import VoiceLiveConnection
+
+## Change to the directory where this script is located
+os.chdir(os.path.dirname(os.path.abspath(__file__)))
+
+# Environment variable loading
+load_dotenv('./.env', override=True)
+
+# Set up logging
+## Add folder for logging
+if not os.path.exists('logs'):
+    os.makedirs('logs')
+
+## Add timestamp for logfiles
+timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+
+## Set up logging
+logging.basicConfig(
+    filename=f'logs/{timestamp}_voicelive.log',
+    filemode="w",
+    format='%(asctime)s:%(name)s:%(levelname)s:%(message)s',
+    level=logging.INFO
+)
+logger = logging.getLogger(__name__)
+
+
+class AudioProcessor:
+    """
+    Handles real-time audio capture and playback for the voice assistant.
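+
+    A minimal usage sketch (hypothetical driver code; `connection` is an open
+    VoiceLiveConnection produced by this module's `connect(...)` call):
+
+        ap = AudioProcessor(connection)
+        ap.start_playback()       # open the output stream first
+        ap.start_capture()        # stream microphone audio to the service
+        ap.skip_pending_audio()   # on barge-in, drop queued assistant audio
+        ap.shutdown()             # release PyAudio resources on exit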
+ + Threading Architecture: + - Main thread: Event loop and UI + - Capture thread: PyAudio input stream reading + - Send thread: Async audio data transmission to VoiceLive + - Playback thread: PyAudio output stream writing + """ + + loop: asyncio.AbstractEventLoop + + class AudioPlaybackPacket: + """Represents a packet that can be sent to the audio playback queue.""" + def __init__(self, seq_num: int, data: Optional[bytes]): + self.seq_num = seq_num + self.data = data + + def __init__(self, connection): + self.connection = connection + self.audio = pyaudio.PyAudio() + + # Audio configuration - PCM16, 24kHz, mono as specified + self.format = pyaudio.paInt16 + self.channels = 1 + self.rate = 24000 + self.chunk_size = 1200 # 50ms + + # Capture and playback state + self.input_stream = None + + self.playback_queue: queue.Queue[AudioProcessor.AudioPlaybackPacket] = queue.Queue() + self.playback_base = 0 + self.next_seq_num = 0 + self.output_stream: Optional[pyaudio.Stream] = None + + logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") + + def start_capture(self): + """Start capturing audio from microphone.""" + def _capture_callback( + in_data, # data + _frame_count, # number of frames + _time_info, # dictionary + _status_flags): + """Audio capture thread - runs in background.""" + audio_base64 = base64.b64encode(in_data).decode("utf-8") + asyncio.run_coroutine_threadsafe( + self.connection.input_audio_buffer.append(audio=audio_base64), self.loop + ) + return (None, pyaudio.paContinue) + + if self.input_stream: + return + + # Store the current event loop for use in threads + self.loop = asyncio.get_event_loop() + + try: + self.input_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + input=True, + frames_per_buffer=self.chunk_size, + stream_callback=_capture_callback, + ) + logger.info("Started audio capture") + + except Exception: + logger.exception("Failed to start audio capture") + raise + + def start_playback(self): + """Initialize audio playback system.""" + if self.output_stream: + return + + remaining = bytes() + def _playback_callback( + _in_data, + frame_count, # number of frames + _time_info, + _status_flags): + + nonlocal remaining + frame_count *= pyaudio.get_sample_size(pyaudio.paInt16) + + out = remaining[:frame_count] + remaining = remaining[frame_count:] + + while len(out) < frame_count: + try: + packet = self.playback_queue.get_nowait() + except queue.Empty: + out = out + bytes(frame_count - len(out)) + continue + except Exception: + logger.exception("Error in audio playback") + raise + + if not packet or not packet.data: + # None packet indicates end of stream + logger.info("End of playback queue.") + break + + if packet.seq_num < self.playback_base: + # skip requested + # ignore skipped packet and clear remaining + if len(remaining) > 0: + remaining = bytes() + continue + + num_to_take = frame_count - len(out) + out = out + packet.data[:num_to_take] + remaining = packet.data[num_to_take:] + + if len(out) >= frame_count: + return (out, pyaudio.paContinue) + else: + return (out, pyaudio.paComplete) + + try: + self.output_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + output=True, + frames_per_buffer=self.chunk_size, + stream_callback=_playback_callback + ) + logger.info("Audio playback system ready") + except Exception: + logger.exception("Failed to initialize audio playback") + raise + + def _get_and_increase_seq_num(self): + seq = self.next_seq_num + self.next_seq_num += 
1 + return seq + + def queue_audio(self, audio_data: Optional[bytes]) -> None: + """Queue audio data for playback.""" + self.playback_queue.put( + AudioProcessor.AudioPlaybackPacket( + seq_num=self._get_and_increase_seq_num(), + data=audio_data)) + + def skip_pending_audio(self): + """Skip current audio in playback queue.""" + self.playback_base = self._get_and_increase_seq_num() + + def shutdown(self): + """Clean up audio resources.""" + if self.input_stream: + self.input_stream.stop_stream() + self.input_stream.close() + self.input_stream = None + + logger.info("Stopped audio capture") + + # Inform thread to complete + if self.output_stream: + self.skip_pending_audio() + self.queue_audio(None) + self.output_stream.stop_stream() + self.output_stream.close() + self.output_stream = None + + logger.info("Stopped audio playback") + + if self.audio: + self.audio.terminate() + + logger.info("Audio processor cleaned up") + + + +class AsyncFunctionCallingClient: + """Voice assistant with function calling capabilities using VoiceLive SDK patterns.""" + + def __init__( + self, + endpoint: str, + credential: Union[AzureKeyCredential, AsyncTokenCredential], + model: str, + voice: str, + instructions: str, + ): + self.endpoint = endpoint + self.credential = credential + self.model = model + self.voice = voice + self.instructions = instructions + self.connection: Optional["VoiceLiveConnection"] = None + self.audio_processor: Optional[AudioProcessor] = None + self.session_ready = False + self.conversation_started = False + self._active_response = False + self._response_api_done = False + self._pending_function_call: Optional[Dict[str, Any]] = None + + # Define available functions + self.available_functions: Dict[str, Callable[[Union[str, Mapping[str, Any]]], Mapping[str, Any]]] = { + "get_current_time": self.get_current_time, + "get_current_weather": self.get_current_weather, + } + + async def start(self): + """Start the voice assistant session.""" + try: + logger.info("Connecting to VoiceLive API with model %s", self.model) + + # Connect to VoiceLive WebSocket API + async with connect( + endpoint=self.endpoint, + credential=self.credential, + model=self.model, + ) as connection: + conn = connection + self.connection = conn + + # Initialize audio processor + ap = AudioProcessor(conn) + self.audio_processor = ap + + # Configure session for voice conversation + await self._setup_session() + + # Start audio systems + ap.start_playback() + + logger.info("Voice assistant with function calling ready! 
Start speaking...")
+            print("\n" + "=" * 60)
+            print("🎤 VOICE ASSISTANT WITH FUNCTION CALLING READY")
+            print("Try saying:")
+            print("  • 'What's the current time?'")
+            print("  • 'What's the weather in Seattle?'")
+            print("Press Ctrl+C to exit")
+            print("=" * 60 + "\n")
+
+            # Process events
+            await self._process_events()
+        finally:
+            if self.audio_processor:
+                self.audio_processor.shutdown()
+
+    async def _setup_session(self):
+        """Configure the VoiceLive session for audio conversation with function tools."""
+        logger.info("Setting up voice conversation session with function tools...")
+
+        # Create voice configuration
+        voice_config: Union[AzureStandardVoice, str]
+        if "-" in self.voice:
+            # Azure voice names contain a hyphen, e.g. en-US-AvaNeural
+            voice_config = AzureStandardVoice(name=self.voice)
+        else:
+            # OpenAI voice (alloy, echo, fable, onyx, nova, shimmer)
+            voice_config = self.voice
+
+        # Create turn detection configuration
+        turn_detection_config = ServerVad(
+            threshold=0.5,
+            prefix_padding_ms=300,
+            silence_duration_ms=500)
+
+        # Define function tools
+        function_tools: list[Tool] = [
+            FunctionTool(
+                name="get_current_time",
+                description="Get the current time",
+                parameters={
+                    "type": "object",
+                    "properties": {
+                        "timezone": {
+                            "type": "string",
+                            "description": "The timezone to get the current time for, e.g., 'UTC', 'local'",
+                        }
+                    },
+                    "required": [],
+                },
+            ),
+            FunctionTool(
+                name="get_current_weather",
+                description="Get the current weather in a given location",
+                parameters={
+                    "type": "object",
+                    "properties": {
+                        "location": {
+                            "type": "string",
+                            "description": "The city and state, e.g., 'San Francisco, CA'",
+                        },
+                        "unit": {
+                            "type": "string",
+                            "enum": ["celsius", "fahrenheit"],
+                            "description": "The unit of temperature to use (celsius or fahrenheit)",
+                        },
+                    },
+                    "required": ["location"],
+                },
+            ),
+        ]
+
+        # Create session configuration with function tools
+        session_config = RequestSession(
+            modalities=[Modality.TEXT, Modality.AUDIO],
+            instructions=self.instructions,
+            voice=voice_config,
+            input_audio_format=InputAudioFormat.PCM16,
+            output_audio_format=OutputAudioFormat.PCM16,
+            turn_detection=turn_detection_config,
+            input_audio_echo_cancellation=AudioEchoCancellation(),
+            input_audio_noise_reduction=AudioNoiseReduction(type="azure_deep_noise_suppression"),
+            tools=function_tools,
+            tool_choice=ToolChoiceLiteral.AUTO,
+            input_audio_transcription=AudioInputTranscriptionOptions(model="whisper-1"),
+        )
+
+        conn = self.connection
+        assert conn is not None, "Connection must be established before setting up session"
+        await conn.session.update(session=session_config)
+
+        logger.info("Session configuration with function tools sent")
+
+    async def _process_events(self):
+        """Process events from the VoiceLive connection."""
+        try:
+            conn = self.connection
+            assert conn is not None, "Connection must be established before processing events"
+            async for event in conn:
+                await self._handle_event(event)
+        except Exception:
+            logger.exception("Error processing events")
+            raise
+
+    async def _handle_event(self, event):
+        """Handle different types of events from VoiceLive."""
+        logger.debug("Received event: %s", event.type)
+        ap = self.audio_processor
+        conn = self.connection
+        assert ap is not None, "AudioProcessor must be initialized"
+        assert conn is not None, "Connection must be established"
+
+        if event.type == ServerEventType.SESSION_UPDATED:
+            logger.info("Session ready: %s", event.session.id)
+            self.session_ready = True
+
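+            # A response.create() issued before any user audio makes the model
+            # speak first, using only the session instructions as context.
+            # Proactive 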
greeting + if not self.conversation_started: + self.conversation_started = True + logger.info("Sending proactive greeting request") + try: + await conn.response.create() + + except Exception: + logger.exception("Failed to send proactive greeting request") + + # Start audio capture once session is ready + ap.start_capture() + + elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: + logger.info("User started speaking - stopping playback") + print("🎤 Listening...") + + ap.skip_pending_audio() + + # Only cancel if response is active and not already done + if self._active_response and not self._response_api_done: + try: + await conn.response.cancel() + logger.debug("Cancelled in-progress response due to barge-in") + except Exception as e: + if "no active response" in str(e).lower(): + logger.debug("Cancel ignored - response already completed") + else: + logger.warning("Cancel failed: %s", e) + + elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: + logger.info("🎤 User stopped speaking") + print("🤔 Processing...") + + elif event.type == ServerEventType.RESPONSE_CREATED: + logger.info("🤖 Assistant response created") + self._active_response = True + self._response_api_done = False + + elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA: + logger.debug("Received audio delta") + ap.queue_audio(event.delta) + + elif event.type == ServerEventType.RESPONSE_AUDIO_DONE: + logger.info("🤖 Assistant finished speaking") + print("🎤 Ready for next input...") + + elif event.type == ServerEventType.RESPONSE_DONE: + logger.info("✅ Response complete") + self._active_response = False + self._response_api_done = True + + # Execute pending function call if arguments are ready + if self._pending_function_call and "arguments" in self._pending_function_call: + await self._execute_function_call(self._pending_function_call) + self._pending_function_call = None + + elif event.type == ServerEventType.ERROR: + msg = event.error.message + if "Cancellation failed: no active response" in msg: + logger.debug("Benign cancellation error: %s", msg) + else: + logger.error("❌ VoiceLive error: %s", msg) + print(f"Error: {msg}") + + elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED: + logger.debug("Conversation item created: %s", event.item.id) + + if event.item.type == ItemType.FUNCTION_CALL: + function_call_item = event.item + self._pending_function_call = { + "name": function_call_item.name, + "call_id": function_call_item.call_id, + "previous_item_id": function_call_item.id + } + print(f"🔧 Calling function: {function_call_item.name}") + logger.info(f"Function call detected: {function_call_item.name} with call_id: {function_call_item.call_id}") + + elif event.type == ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE: + if self._pending_function_call and event.call_id == self._pending_function_call["call_id"]: + logger.info(f"Function arguments received: {event.arguments}") + self._pending_function_call["arguments"] = event.arguments + + async def _execute_function_call(self, function_call_info): + """Execute a function call and send the result back to the conversation.""" + conn = self.connection + assert conn is not None, "Connection must be established" + + function_name = function_call_info["name"] + call_id = function_call_info["call_id"] + previous_item_id = function_call_info["previous_item_id"] + arguments = function_call_info["arguments"] + + try: + if function_name in self.available_functions: + logger.info(f"Executing function: {function_name}") + result = 
self.available_functions[function_name](arguments)
+
+                function_output = FunctionCallOutputItem(call_id=call_id, output=json.dumps(result))
+
+                # Send result back to conversation
+                await conn.conversation.item.create(previous_item_id=previous_item_id, item=function_output)
+                logger.info(f"Function result sent: {result}")
+                print(f"✅ Function {function_name} completed")
+
+                # Request new response to process the function result
+                await conn.response.create()
+                logger.info("Requested new response with function result")
+
+            else:
+                logger.error(f"Unknown function: {function_name}")
+
+        except Exception:
+            logger.exception("Error executing function %s", function_name)
+
+    def get_current_time(self, arguments: Optional[Union[str, Mapping[str, Any]]] = None) -> Dict[str, Any]:
+        """Get the current time."""
+        from datetime import datetime, timezone
+
+        if isinstance(arguments, str):
+            try:
+                args = json.loads(arguments)
+            except json.JSONDecodeError:
+                args = {}
+        else:
+            args = arguments if isinstance(arguments, dict) else {}
+
+        timezone_arg = args.get("timezone", "local")
+        now = datetime.now()
+
+        if timezone_arg.lower() == "utc":
+            now = datetime.now(timezone.utc)
+            timezone_name = "UTC"
+        else:
+            timezone_name = "local"
+
+        formatted_time = now.strftime("%I:%M:%S %p")
+        formatted_date = now.strftime("%A, %B %d, %Y")
+
+        return {"time": formatted_time, "date": formatted_date, "timezone": timezone_name}
+
+    def get_current_weather(self, arguments: Union[str, Mapping[str, Any]]):
+        """Get the current weather for a location."""
+        if isinstance(arguments, str):
+            try:
+                args = json.loads(arguments)
+            except json.JSONDecodeError:
+                logger.error(f"Failed to parse weather arguments: {arguments}")
+                return {"error": "Invalid arguments"}
+        else:
+            args = arguments if isinstance(arguments, dict) else {}
+
+        location = args.get("location", "Unknown")
+        unit = args.get("unit", "celsius")
+
+        # Simulated weather response (static demo data)
+        return {
+            "location": location,
+            "temperature": 22 if unit == "celsius" else 72,
+            "unit": unit,
+            "condition": "Partly Cloudy",
+            "humidity": 65,
+            "wind_speed": 10,
+        }
+
+
+def parse_arguments():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Voice Assistant with Function Calling using Azure VoiceLive SDK",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+
+    parser.add_argument(
+        "--api-key",
+        help="Azure VoiceLive API key. If not provided, will use AZURE_VOICELIVE_API_KEY environment variable.",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_API_KEY"),
+    )
+
+    parser.add_argument(
+        "--endpoint",
+        help="Azure VoiceLive endpoint",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_ENDPOINT", "https://your-resource-name.services.ai.azure.com/"),
+    )
+
+    parser.add_argument(
+        "--model",
+        help="VoiceLive model to use",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_MODEL", "gpt-realtime"),
+    )
+
+    parser.add_argument(
+        "--voice",
+        help="Voice to use for the assistant. E.g. alloy, echo, fable, en-US-AvaNeural, en-US-GuyNeural",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_VOICE", "en-US-Ava:DragonHDLatestNeural"),
+    )
+
+    parser.add_argument(
+        "--instructions",
+        help="System instructions for the AI assistant",
+        type=str,
+        default=os.environ.get(
+            "AZURE_VOICELIVE_INSTRUCTIONS",
+            "You are a helpful AI assistant with access to functions. 
" + "Use the functions when appropriate to provide accurate, real-time information. " + "If you are asked about the weather, please respond with 'I will get the weather for you. Please wait a moment.' and then call the get_current_weather function. " + "If you are asked about the time, please respond with 'I will get the time for you. Please wait a moment.' and then call the get_current_time function. " + "Explain when you're using a function and include the results in your response naturally. Always start the conversation in English.", + ), + ) + + parser.add_argument( + "--use-token-credential", help="Use Azure token credential instead of API key", action="store_true", default=False + ) + + parser.add_argument("--verbose", help="Enable verbose logging", action="store_true") + + return parser.parse_args() + + +def main(): + """Main function.""" + args = parse_arguments() + + # Set logging level + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + # Validate credentials + if not args.api_key and not args.use_token_credential: + print("❌ Error: No authentication provided") + print("Please provide an API key using --api-key or set AZURE_VOICELIVE_API_KEY environment variable,") + print("or use --use-token-credential for Azure authentication.") + sys.exit(1) + + # Create client with appropriate credential + credential: Union[AzureKeyCredential, AsyncTokenCredential] + if args.use_token_credential: + credential = AzureCliCredential() + logger.info("Using Azure token credential") + else: + credential = AzureKeyCredential(args.api_key) + logger.info("Using API key credential") + + # Create and start voice assistant with function calling + client = AsyncFunctionCallingClient( + endpoint=args.endpoint, + credential=credential, + model=args.model, + voice=args.voice, + instructions=args.instructions, + ) + + # Signal handlers for graceful shutdown + def signal_handler(_sig, _frame): + logger.info("Received shutdown signal") + raise KeyboardInterrupt() + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + asyncio.run(client.start()) + except KeyboardInterrupt: + print("\n👋 Voice assistant shut down. Goodbye!") + except Exception as e: + logger.exception("Fatal error") + print(f"Fatal Error: {e}") + sys.exit(1) + + +if __name__ == "__main__": + # Check for required dependencies + dependencies = { + "pyaudio": "Audio processing", + "azure.ai.voicelive": "Azure VoiceLive SDK", + "azure.core": "Azure Core libraries", + } + + missing_deps = [] + for dep, description in dependencies.items(): + try: + __import__(dep.replace("-", "_")) + except ImportError: + missing_deps.append(f"{dep} ({description})") + + if missing_deps: + print("❌ Missing required dependencies:") + for dep in missing_deps: + print(f" - {dep}") + print("\nInstall with: pip install azure-ai-voicelive pyaudio python-dotenv") + sys.exit(1) + + # Check audio system + try: + p = pyaudio.PyAudio() + # Check for input devices + input_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0 + ] + # Check for output devices + output_devices = [ + i + for i in range(p.get_device_count()) + if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0 + ] + p.terminate() + + if not input_devices: + print("❌ No audio input devices found. Please check your microphone.") + sys.exit(1) + if not output_devices: + print("❌ No audio output devices found. 
Please check your speakers.") + sys.exit(1) + + except Exception as e: + print(f"❌ Audio system check failed: {e}") + sys.exit(1) + + print("🎙️ Voice Assistant with Function Calling - Azure VoiceLive SDK") + print("=" * 65) + + # Run the assistant + main() diff --git a/sdk/ai/azure-ai-voicelive/samples/utils.py b/sdk/ai/azure-ai-voicelive/samples/utils.py index 96989cf98378..3505f347786a 100644 --- a/sdk/ai/azure-ai-voicelive/samples/utils.py +++ b/sdk/ai/azure-ai-voicelive/samples/utils.py @@ -1,139 +1,139 @@ -#!/usr/bin/env python - -# ------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for -# license information. -# -------------------------------------------------------------------------- - -""" -Helper utilities for Azure VoiceLive samples. -""" - -import os -from pathlib import Path -import sys -from typing import Dict, Optional - - -def load_env_vars(verbose: bool = True) -> Dict[str, str]: - """ - Load environment variables from .env file if available. - Returns a dictionary of loaded environment variables. - - Args: - verbose (bool): Whether to print info about the loaded environment variables - - Returns: - Dict[str, str]: Dictionary of loaded environment variables - """ - # Try to import dotenv - try: - from dotenv import load_dotenv - except ImportError: - if verbose: - print("python-dotenv package not found. Environment variables will not be loaded from .env file.") - print("Install with: pip install python-dotenv") - return {} - - # Look for .env file in the current directory and parent directories - env_path = find_env_file() - - if env_path and os.path.isfile(env_path): - if verbose: - print(f"Loading environment variables from {env_path}") - load_dotenv(env_path) - if verbose: - print("Environment variables loaded successfully") - - # Return the loaded environment variables - loaded_vars = {} - with open(env_path, "r") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#") and "=" in line: - key, value = line.split("=", 1) - if key.strip() in os.environ: - loaded_vars[key.strip()] = ( - "********" if "KEY" in key or "SECRET" in key else os.environ[key.strip()] - ) - - if verbose and loaded_vars: - print("Loaded environment variables:") - for key, value in loaded_vars.items(): - print(f" {key}: {value}") - - return loaded_vars - else: - if verbose: - print("No .env file found. Using existing environment variables.") - return {} - - -def find_env_file() -> Optional[str]: - """ - Find a .env file by looking in the current directory and walking up the directory tree. 
- - Returns: - Optional[str]: Path to the .env file if found, None otherwise - """ - # Start with the current working directory - current_dir = os.getcwd() - - # Check if we're in a sample directory and need to go up to the package root - if os.path.basename(current_dir) == "samples": - # Try the parent directory - parent_dir = os.path.dirname(current_dir) - env_path = os.path.join(parent_dir, ".env") - if os.path.isfile(env_path): - return env_path - - # Try the current directory first - env_path = os.path.join(current_dir, ".env") - if os.path.isfile(env_path): - return env_path - - # Walk up the directory tree - max_levels = 3 # Limit how far up we go - for _ in range(max_levels): - parent_dir = os.path.dirname(current_dir) - if parent_dir == current_dir: # We've reached the root directory - break - - current_dir = parent_dir - env_path = os.path.join(current_dir, ".env") - if os.path.isfile(env_path): - return env_path - - # If we didn't find it in the parent directories, check the samples directory - # in case we're running from the package root - script_dir = os.path.dirname(os.path.abspath(__file__)) - env_path = os.path.join(script_dir, ".env") - if os.path.isfile(env_path): - return env_path - - # Check one level up from the script directory - env_path = os.path.join(os.path.dirname(script_dir), ".env") - if os.path.isfile(env_path): - return env_path - - return None - - -def check_samples_prerequisites(): - """ - Check prerequisites for running the samples. - """ - try: - import azure.ai.voicelive - except ImportError: - print("azure-ai-voicelive package is not installed. Install with:") - print("pip install azure-ai-voicelive") - sys.exit(1) - - try: - import dotenv - except ImportError: - print("python-dotenv package is not installed. Install with:") - print("pip install python-dotenv") - print("Continuing without .env file support...") +#!/usr/bin/env python + +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +""" +Helper utilities for Azure VoiceLive samples. +""" + +import os +from pathlib import Path +import sys +from typing import Dict, Optional + + +def load_env_vars(verbose: bool = True) -> Dict[str, str]: + """ + Load environment variables from .env file if available. + Returns a dictionary of loaded environment variables. + + Args: + verbose (bool): Whether to print info about the loaded environment variables + + Returns: + Dict[str, str]: Dictionary of loaded environment variables + """ + # Try to import dotenv + try: + from dotenv import load_dotenv + except ImportError: + if verbose: + print("python-dotenv package not found. 
Environment variables will not be loaded from .env file.") + print("Install with: pip install python-dotenv") + return {} + + # Look for .env file in the current directory and parent directories + env_path = find_env_file() + + if env_path and os.path.isfile(env_path): + if verbose: + print(f"Loading environment variables from {env_path}") + load_dotenv(env_path) + if verbose: + print("Environment variables loaded successfully") + + # Return the loaded environment variables + loaded_vars = {} + with open(env_path, "r") as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, value = line.split("=", 1) + if key.strip() in os.environ: + loaded_vars[key.strip()] = ( + "********" if "KEY" in key or "SECRET" in key else os.environ[key.strip()] + ) + + if verbose and loaded_vars: + print("Loaded environment variables:") + for key, value in loaded_vars.items(): + print(f" {key}: {value}") + + return loaded_vars + else: + if verbose: + print("No .env file found. Using existing environment variables.") + return {} + + +def find_env_file() -> Optional[str]: + """ + Find a .env file by looking in the current directory and walking up the directory tree. + + Returns: + Optional[str]: Path to the .env file if found, None otherwise + """ + # Start with the current working directory + current_dir = os.getcwd() + + # Check if we're in a sample directory and need to go up to the package root + if os.path.basename(current_dir) == "samples": + # Try the parent directory + parent_dir = os.path.dirname(current_dir) + env_path = os.path.join(parent_dir, ".env") + if os.path.isfile(env_path): + return env_path + + # Try the current directory first + env_path = os.path.join(current_dir, ".env") + if os.path.isfile(env_path): + return env_path + + # Walk up the directory tree + max_levels = 3 # Limit how far up we go + for _ in range(max_levels): + parent_dir = os.path.dirname(current_dir) + if parent_dir == current_dir: # We've reached the root directory + break + + current_dir = parent_dir + env_path = os.path.join(current_dir, ".env") + if os.path.isfile(env_path): + return env_path + + # If we didn't find it in the parent directories, check the samples directory + # in case we're running from the package root + script_dir = os.path.dirname(os.path.abspath(__file__)) + env_path = os.path.join(script_dir, ".env") + if os.path.isfile(env_path): + return env_path + + # Check one level up from the script directory + env_path = os.path.join(os.path.dirname(script_dir), ".env") + if os.path.isfile(env_path): + return env_path + + return None + + +def check_samples_prerequisites(): + """ + Check prerequisites for running the samples. + """ + try: + import azure.ai.voicelive + except ImportError: + print("azure-ai-voicelive package is not installed. Install with:") + print("pip install azure-ai-voicelive") + sys.exit(1) + + try: + import dotenv + except ImportError: + print("python-dotenv package is not installed. 
Install with:")
+        print("pip install python-dotenv")
+        print("Continuing without .env file support...")
diff --git a/sdk/ai/azure-ai-voicelive/samples/voice_assistant_w_proactive_greeting_async.py b/sdk/ai/azure-ai-voicelive/samples/voice_assistant_w_proactive_greeting_async.py
new file mode 100644
index 000000000000..49d1ea20f577
--- /dev/null
+++ b/sdk/ai/azure-ai-voicelive/samples/voice_assistant_w_proactive_greeting_async.py
@@ -0,0 +1,620 @@
+#!/usr/bin/env python
+# pylint: disable=line-too-long,useless-suppression
+
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See License.txt in the project root for
+# license information.
+# --------------------------------------------------------------------------
+
+"""
+FILE: voice_assistant_w_proactive_greeting_async.py
+
+DESCRIPTION:
+    This sample demonstrates the fundamental capabilities of the VoiceLive SDK by creating
+    a basic voice assistant that can engage in natural conversation with proper interruption
+    handling. This serves as the foundational example that showcases the core value
+    proposition of unified speech-to-speech interaction.
+
+    This sample includes two proactive greeting options:
+    1) A scripted greeting, sent via response.create with a pre_generated_assistant_message
+    2) Requesting an initial response via response.create, so the model generates its own natural greeting
+
+USAGE:
+    python voice_assistant_w_proactive_greeting_async.py
+
+    Set the environment variables with your own values before running the sample:
+    1) AZURE_VOICELIVE_API_KEY - The Azure VoiceLive API key
+    2) AZURE_VOICELIVE_ENDPOINT - The Azure VoiceLive endpoint
+
+    Or copy .env.template to .env and fill in your values.
+
+REQUIREMENTS:
+    - azure-ai-voicelive
+    - python-dotenv
+    - pyaudio (for audio capture and playback)
+"""
+
+from __future__ import annotations
+import os
+import sys
+import argparse
+import asyncio
+import base64
+from datetime import datetime
+import logging
+import queue
+import signal
+from typing import Union, Optional, TYPE_CHECKING, cast
+
+from azure.core.credentials import AzureKeyCredential
+from azure.core.credentials_async import AsyncTokenCredential
+from azure.identity.aio import AzureCliCredential, DefaultAzureCredential
+
+from azure.ai.voicelive.aio import connect
+from azure.ai.voicelive.models import (
+    AudioEchoCancellation,
+    AudioNoiseReduction,
+    AzureStandardVoice,
+    InputAudioFormat,
+    Modality,
+    OutputAudioFormat,
+    RequestSession,
+    ServerEventType,
+    ServerVad
+)
+from dotenv import load_dotenv
+import pyaudio
+
+if TYPE_CHECKING:
+    # Only needed for type checking; avoids runtime import issues
+    from azure.ai.voicelive.aio import VoiceLiveConnection
+
+## Change to the directory where this script is located
+os.chdir(os.path.dirname(os.path.abspath(__file__)))
+
+# Environment variable loading
+load_dotenv('./.env', override=True)
+
+# Set up logging
+## Add folder for logging
+if not os.path.exists('logs'):
+    os.makedirs('logs')
+
+## Add timestamp for logfiles
+timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
+
+## Set up logging
+logging.basicConfig(
+    filename=f'logs/{timestamp}_voicelive.log',
+    filemode="w",
+    format='%(asctime)s:%(name)s:%(levelname)s:%(message)s',
+    level=logging.INFO
+)
+logger = logging.getLogger(__name__)
+
+class AudioProcessor:
+    """
+    Handles real-time audio capture and playback for the voice assistant.
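+    Audio is captured and played back as 24 kHz, 16-bit PCM mono, matching the
+    session's PCM16 input and output formats configured below.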
+ + Threading Architecture: + - Main thread: Event loop and UI + - Capture thread: PyAudio input stream reading + - Send thread: Async audio data transmission to VoiceLive + - Playback thread: PyAudio output stream writing + """ + + loop: asyncio.AbstractEventLoop + + class AudioPlaybackPacket: + """Represents a packet that can be sent to the audio playback queue.""" + def __init__(self, seq_num: int, data: Optional[bytes]): + self.seq_num = seq_num + self.data = data + + def __init__(self, connection): + self.connection = connection + self.audio = pyaudio.PyAudio() + + # Audio configuration - PCM16, 24kHz, mono as specified + self.format = pyaudio.paInt16 + self.channels = 1 + self.rate = 24000 + self.chunk_size = 1200 # 50ms + + # Capture and playback state + self.input_stream = None + + self.playback_queue: queue.Queue[AudioProcessor.AudioPlaybackPacket] = queue.Queue() + self.playback_base = 0 + self.next_seq_num = 0 + self.output_stream: Optional[pyaudio.Stream] = None + + logger.info("AudioProcessor initialized with 24kHz PCM16 mono audio") + + def start_capture(self): + """Start capturing audio from microphone.""" + def _capture_callback( + in_data, # data + _frame_count, # number of frames + _time_info, # dictionary + _status_flags): + """Audio capture thread - runs in background.""" + audio_base64 = base64.b64encode(in_data).decode("utf-8") + asyncio.run_coroutine_threadsafe( + self.connection.input_audio_buffer.append(audio=audio_base64), self.loop + ) + return (None, pyaudio.paContinue) + + if self.input_stream: + return + + # Store the current event loop for use in threads + self.loop = asyncio.get_event_loop() + + try: + self.input_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + input=True, + frames_per_buffer=self.chunk_size, + stream_callback=_capture_callback, + ) + logger.info("Started audio capture") + + except Exception: + logger.exception("Failed to start audio capture") + raise + + def start_playback(self): + """Initialize audio playback system.""" + if self.output_stream: + return + + remaining = bytes() + def _playback_callback( + _in_data, + frame_count, # number of frames + _time_info, + _status_flags): + + nonlocal remaining + frame_count *= pyaudio.get_sample_size(pyaudio.paInt16) + + out = remaining[:frame_count] + remaining = remaining[frame_count:] + + while len(out) < frame_count: + try: + packet = self.playback_queue.get_nowait() + except queue.Empty: + out = out + bytes(frame_count - len(out)) + continue + except Exception: + logger.exception("Error in audio playback") + raise + + if not packet or not packet.data: + # None packet indicates end of stream + logger.info("End of playback queue.") + break + + if packet.seq_num < self.playback_base: + # skip requested + # ignore skipped packet and clear remaining + if len(remaining) > 0: + remaining = bytes() + continue + + num_to_take = frame_count - len(out) + out = out + packet.data[:num_to_take] + remaining = packet.data[num_to_take:] + + if len(out) >= frame_count: + return (out, pyaudio.paContinue) + else: + return (out, pyaudio.paComplete) + + try: + self.output_stream = self.audio.open( + format=self.format, + channels=self.channels, + rate=self.rate, + output=True, + frames_per_buffer=self.chunk_size, + stream_callback=_playback_callback + ) + logger.info("Audio playback system ready") + except Exception: + logger.exception("Failed to initialize audio playback") + raise + + def _get_and_increase_seq_num(self): + seq = self.next_seq_num + self.next_seq_num += 
1 + return seq + + def queue_audio(self, audio_data: Optional[bytes]) -> None: + """Queue audio data for playback.""" + self.playback_queue.put( + AudioProcessor.AudioPlaybackPacket( + seq_num=self._get_and_increase_seq_num(), + data=audio_data)) + + def skip_pending_audio(self): + """Skip current audio in playback queue.""" + self.playback_base = self._get_and_increase_seq_num() + + def shutdown(self): + """Clean up audio resources.""" + if self.input_stream: + self.input_stream.stop_stream() + self.input_stream.close() + self.input_stream = None + + logger.info("Stopped audio capture") + + # Inform thread to complete + if self.output_stream: + self.skip_pending_audio() + self.queue_audio(None) + self.output_stream.stop_stream() + self.output_stream.close() + self.output_stream = None + + logger.info("Stopped audio playback") + + if self.audio: + self.audio.terminate() + + logger.info("Audio processor cleaned up") + +class BasicVoiceAssistant: + """Basic voice assistant implementing the VoiceLive SDK patterns.""" + + def __init__( + self, + endpoint: str, + credential: Union[AzureKeyCredential, AsyncTokenCredential], + model: str, + voice: str, + instructions: str, + ): + + self.endpoint = endpoint + self.credential = credential + self.model = model + self.voice = voice + self.instructions = instructions + self.connection: Optional["VoiceLiveConnection"] = None + self.audio_processor: Optional[AudioProcessor] = None + self.session_ready = False + self.conversation_started = False + self._active_response = False + self._response_api_done = False + + async def start(self): + """Start the voice assistant session.""" + try: + logger.info("Connecting to VoiceLive API with model %s", self.model) + + # Connect to VoiceLive WebSocket API + async with connect( + endpoint=self.endpoint, + credential=self.credential, + model=self.model, + ) as connection: + conn = connection + self.connection = conn + + # Initialize audio processor + ap = AudioProcessor(conn) + self.audio_processor = ap + + # Configure session for voice conversation + await self._setup_session() + + # Start audio systems + ap.start_playback() + + logger.info("Voice assistant ready! 
Start speaking...") + print("\n" + "=" * 60) + print("🎤 VOICE ASSISTANT READY") + print("Start speaking to begin conversation") + print("Press Ctrl+C to exit") + print("=" * 60 + "\n") + + # Process events + await self._process_events() + finally: + if self.audio_processor: + self.audio_processor.shutdown() + + async def _setup_session(self): + """Configure the VoiceLive session for audio conversation.""" + logger.info("Setting up voice conversation session...") + + # Create strongly typed voice configuration + voice_config: Union[AzureStandardVoice, str] + if self.voice.startswith("en-US-") or self.voice.startswith("en-CA-") or "-" in self.voice: + # Azure voice + voice_config = AzureStandardVoice(name=self.voice) + else: + # OpenAI voice (alloy, echo, fable, onyx, nova, shimmer) + voice_config = self.voice + + # Create turn detection configuration + turn_detection_config = ServerVad( + threshold=0.5, + prefix_padding_ms=300, + silence_duration_ms=500) + + # Create session configuration + session_config = RequestSession( + modalities=[Modality.TEXT, Modality.AUDIO], + instructions=self.instructions, + voice=voice_config, + input_audio_format=InputAudioFormat.PCM16, + output_audio_format=OutputAudioFormat.PCM16, + turn_detection=turn_detection_config, + input_audio_echo_cancellation=AudioEchoCancellation(), + input_audio_noise_reduction=AudioNoiseReduction(type="azure_deep_noise_suppression"), + ) + + conn = self.connection + assert conn is not None, "Connection must be established before setting up session" + await conn.session.update(session=session_config) + + logger.info("Session configuration sent") + + async def _process_events(self): + """Process events from the VoiceLive connection.""" + try: + conn = self.connection + assert conn is not None, "Connection must be established before processing events" + async for event in conn: + await self._handle_event(event) + except Exception: + logger.exception("Error processing events") + raise + + async def _handle_event(self, event): + """Handle different types of events from VoiceLive.""" + logger.debug("Received event: %s", event.type) + ap = self.audio_processor + conn = self.connection + assert ap is not None, "AudioProcessor must be initialized" + assert conn is not None, "Connection must be established" + + if event.type == ServerEventType.SESSION_UPDATED: + logger.info("Session ready: %s", event.session.id) + self.session_ready = True + + # Proactive greeting option 1: pre-generated assistant message + # if not self.conversation_started: + # self.conversation_started = True + # # Send proactive greeting using raw response.create event with a pre-generated assistant message. + # proactive_greeting = "Welcome to Contoso Insurance. You are now connected to our AI assistant. How can I help you today?" 
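+            # # NOTE: with option 1 the scripted text above is spoken verbatim by the
+            # # assistant's voice; option 2 below instead lets the model compose its
+            # # own greeting.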
+            #     logger.info("Sending proactive greeting via response.create: %s", proactive_greeting)
+            #     try:
+            #         await conn.send({
+            #             "type": "response.create",
+            #             "response": {
+            #                 "pre_generated_assistant_message": {
+            #                     "type": "message",
+            #                     "role": "assistant",
+            #                     "content": [
+            #                         {"type": "text", "text": proactive_greeting}
+            #                     ],
+            #                 }
+            #             }
+            #         })
+            #     except Exception:
+            #         logger.exception("Failed to send proactive greeting event")
+
+            # Proactive greeting option 2: request an initial response so the model
+            # generates its own greeting
+            if not self.conversation_started:
+                self.conversation_started = True
+                logger.info("Sending proactive greeting request")
+                try:
+                    await conn.response.create()
+
+                except Exception:
+                    logger.exception("Failed to send proactive greeting request")
+
+            # Start audio capture once session is ready
+            ap.start_capture()
+
+        elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
+            logger.info("User started speaking - stopping playback")
+            print("🎤 Listening...")
+
+            ap.skip_pending_audio()
+
+            # Only cancel if response is active and not already done
+            if self._active_response and not self._response_api_done:
+                try:
+                    await conn.response.cancel()
+                    logger.debug("Cancelled in-progress response due to barge-in")
+                except Exception as e:
+                    if "no active response" in str(e).lower():
+                        logger.debug("Cancel ignored - response already completed")
+                    else:
+                        logger.warning("Cancel failed: %s", e)
+
+        elif event.type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED:
+            logger.info("🎤 User stopped speaking")
+            print("🤔 Processing...")
+
+        elif event.type == ServerEventType.RESPONSE_CREATED:
+            logger.info("🤖 Assistant response created")
+            self._active_response = True
+            self._response_api_done = False
+
+        elif event.type == ServerEventType.RESPONSE_AUDIO_DELTA:
+            logger.debug("Received audio delta")
+            ap.queue_audio(event.delta)
+
+        elif event.type == ServerEventType.RESPONSE_AUDIO_DONE:
+            logger.info("🤖 Assistant finished speaking")
+            print("🎤 Ready for next input...")
+
+        elif event.type == ServerEventType.RESPONSE_DONE:
+            logger.info("✅ Response complete")
+            self._active_response = False
+            self._response_api_done = True
+
+        elif event.type == ServerEventType.ERROR:
+            msg = event.error.message
+            if "Cancellation failed: no active response" in msg:
+                logger.debug("Benign cancellation error: %s", msg)
+            else:
+                logger.error("❌ VoiceLive error: %s", msg)
+                print(f"Error: {msg}")
+
+        elif event.type == ServerEventType.CONVERSATION_ITEM_CREATED:
+            logger.debug("Conversation item created: %s", event.item.id)
+
+        else:
+            logger.debug("Unhandled event type: %s", event.type)
+
+
+def parse_arguments():
+    """Parse command line arguments."""
+    parser = argparse.ArgumentParser(
+        description="Basic Voice Assistant using Azure VoiceLive SDK",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+
+    parser.add_argument(
+        "--api-key",
+        help="Azure VoiceLive API key. If not provided, will use AZURE_VOICELIVE_API_KEY environment variable.",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_API_KEY"),
+    )
+
+    parser.add_argument(
+        "--endpoint",
+        help="Azure VoiceLive endpoint",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_ENDPOINT", "https://your-resource-name.services.ai.azure.com/"),
+    )
+
+    parser.add_argument(
+        "--model",
+        help="VoiceLive model to use",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_MODEL", "gpt-realtime"),
+    )
+
+    parser.add_argument(
+        "--voice",
+        help="Voice to use for the assistant. E.g. 
alloy, echo, fable, en-US-AvaNeural, en-US-GuyNeural",
+        type=str,
+        default=os.environ.get("AZURE_VOICELIVE_VOICE", "en-US-Ava:DragonHDLatestNeural"),
+    )
+
+    parser.add_argument(
+        "--instructions",
+        help="System instructions for the AI assistant",
+        type=str,
+        default=os.environ.get(
+            "AZURE_VOICELIVE_INSTRUCTIONS",
+            "You are a helpful AI assistant. Respond naturally and conversationally. "
+            "Keep your responses concise but engaging. Always start the conversation in English.",
+        ),
+    )
+
+    parser.add_argument(
+        "--use-token-credential", help="Use Azure token credential instead of API key", action="store_true", default=False
+    )
+
+    parser.add_argument("--verbose", help="Enable verbose logging", action="store_true")
+
+    return parser.parse_args()
+
+
+def main():
+    """Main function."""
+    args = parse_arguments()
+
+    # Set logging level
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+
+    # Validate credentials
+    if not args.api_key and not args.use_token_credential:
+        print("❌ Error: No authentication provided")
+        print("Please provide an API key using --api-key or set AZURE_VOICELIVE_API_KEY environment variable,")
+        print("or use --use-token-credential for Azure authentication.")
+        sys.exit(1)
+
+    # Create client with appropriate credential
+    credential: Union[AzureKeyCredential, AsyncTokenCredential]
+    if args.use_token_credential:
+        credential = AzureCliCredential()  # or DefaultAzureCredential() if needed
+        logger.info("Using Azure token credential")
+    else:
+        credential = AzureKeyCredential(args.api_key)
+        logger.info("Using API key credential")
+
+    # Create and start voice assistant
+    assistant = BasicVoiceAssistant(
+        endpoint=args.endpoint,
+        credential=credential,
+        model=args.model,
+        voice=args.voice,
+        instructions=args.instructions,
+    )
+
+    # Set up signal handlers for graceful shutdown
+    def signal_handler(_sig, _frame):
+        logger.info("Received shutdown signal")
+        raise KeyboardInterrupt()
+
+    signal.signal(signal.SIGINT, signal_handler)
+    signal.signal(signal.SIGTERM, signal_handler)
+
+    # Start the assistant
+    try:
+        asyncio.run(assistant.start())
+    except KeyboardInterrupt:
+        print("\n👋 Voice assistant shut down. Goodbye!")
+    except Exception as e:
+        logger.exception("Fatal error")
+        print(f"Fatal Error: {e}")
+        sys.exit(1)
+
+if __name__ == "__main__":
+    # Check audio system
+    try:
+        p = pyaudio.PyAudio()
+        # Check for input devices
+        input_devices = [
+            i
+            for i in range(p.get_device_count())
+            if cast(Union[int, float], p.get_device_info_by_index(i).get("maxInputChannels", 0) or 0) > 0
+        ]
+        # Check for output devices
+        output_devices = [
+            i
+            for i in range(p.get_device_count())
+            if cast(Union[int, float], p.get_device_info_by_index(i).get("maxOutputChannels", 0) or 0) > 0
+        ]
+        p.terminate()
+
+        if not input_devices:
+            print("❌ No audio input devices found. Please check your microphone.")
+            sys.exit(1)
+        if not output_devices:
+            print("❌ No audio output devices found. Please check your speakers.")
+            sys.exit(1)
+
+    except Exception as e:
+        print(f"❌ Audio system check failed: {e}")
+        sys.exit(1)
+
+    print("🎙️ Basic Voice Assistant with Azure VoiceLive SDK")
+    print("=" * 50)
+
+    # Run the assistant
+    main()
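+
+# Example invocations (assuming the package and pyaudio are installed):
+#   python voice_assistant_w_proactive_greeting_async.py --verbose
+#   python voice_assistant_w_proactive_greeting_async.py --use-token-credential  # uses `az login` credentials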