I am using the Gemini Live API with FastRTC. It works fine with the AsyncStreamHandler class, but without interruptions. To get interruptions I switched to the ReplyOnPause class, but the agent is not interrupting at all. I want the agent to stop speaking as soon as I start talking while it is replying.
Any help will be appreciated.
import asyncio
import base64
import json
import os
import pathlib
from typing import AsyncGenerator, Literal
import gradio as gr
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastrtc import (
    AlgoOptions,
    ReplyOnPause,
    SileroVadOptions,
    Stream,
    get_cloudflare_turn_credentials_async,
)
from google import genai
from google.genai.types import (
    LiveConnectConfig,
    PrebuiltVoiceConfig,
    SpeechConfig,
    VoiceConfig,
)
from gradio.utils import get_space
from pydantic import BaseModel
current_dir = pathlib.Path(__file__).parent
load_dotenv()
def encode_audio(data: np.ndarray) -> str:
"""Encode Audio data to send to the server"""
return base64.b64encode(data.tobytes()).decode("UTF-8")
# Gemini client singleton to maintain session across calls
gemini_sessions = {}
async def audio_generator(audio_array: np.ndarray) -> AsyncGenerator[str, None]:
    """Yield the audio as a single base64-encoded chunk for Gemini."""
    audio_message = encode_audio(audio_array)
    yield audio_message
async def get_gemini_response(
    audio: tuple[int, np.ndarray],
    state: dict,
    api_key: str | None = None,
    voice_name: str = "Puck",
) -> AsyncGenerator[tuple[int, np.ndarray], None]:
    """
    Generator function for ReplyOnPause that sends user audio to Gemini
    and yields back the AI response audio chunks.

    This function handles pause detection and allows interruption.
    When interrupted (the user starts speaking), GeneratorExit is raised
    and we clean up gracefully.
    """
    sample_rate, audio_array = audio

    # Initialize the Gemini client
    client = genai.Client(
        api_key=api_key or os.getenv("GEMINI_API_KEY"),
        http_options={"api_version": "v1alpha"},
    )

    config = LiveConnectConfig(
        response_modalities=["AUDIO"],  # type: ignore
        speech_config=SpeechConfig(
            voice_config=VoiceConfig(
                prebuilt_voice_config=PrebuiltVoiceConfig(
                    voice_name=voice_name,
                )
            )
        ),
    )

    # Flatten the input audio before sending it to Gemini
    audio_array = audio_array.squeeze()

    try:
        async with client.aio.live.connect(
            model="gemini-2.0-flash-exp", config=config
        ) as session:
            async for audio_response in session.start_stream(
                stream=audio_generator(audio_array),
                mime_type="audio/pcm",
            ):
                if audio_response.data:
                    array = np.frombuffer(audio_response.data, dtype=np.int16)
                    yield (24000, array)  # Gemini outputs audio at 24 kHz
    except GeneratorExit:
        print("Response interrupted by user")
    except Exception as e:
        print(f"Error in Gemini response: {e}")
stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=ReplyOnPause(
        get_gemini_response,
        can_interrupt=True,
        algo_options=AlgoOptions(
            audio_chunk_duration=0.6,  # Process audio in 600 ms chunks
            started_talking_threshold=0.2,  # Need 200 ms of speech to start
            speech_threshold=0.1,  # Less than 100 ms of speech indicates a pause
            max_continuous_speech_s=30.0,  # Max 30 s of speech before forcing a response
        ),
        model_options=SileroVadOptions(
            threshold=0.5,  # Confidence threshold for speech detection
            min_speech_duration_ms=250,  # Minimum speech duration
            min_silence_duration_ms=100,  # Minimum silence to detect a pause
        ),
        input_sample_rate=16000,  # Input audio at 16 kHz
        output_sample_rate=24000,  # Output audio at 24 kHz
    ),
    rtc_configuration=get_cloudflare_turn_credentials_async if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=90 if get_space() else None,
    additional_inputs=[
        gr.Textbox(
            label="API Key",
            type="password",
            value=os.getenv("GEMINI_API_KEY") if not get_space() else "",
        ),
        gr.Dropdown(
            label="Voice",
            choices=[
                "Puck",
                "Charon",
                "Kore",
                "Fenrir",
                "Aoede",
            ],
            value="Puck",
        ),
    ],
)
class InputData(BaseModel):
    webrtc_id: str
    voice_name: str
    api_key: str
app = FastAPI()
stream.mount(app)
@app.post("/input_hook")
async def _(body: InputData):
    stream.set_input(body.webrtc_id, body.api_key, body.voice_name)
    return {"status": "ok"}
@app.get("/")
async def index():
    rtc_config = await get_cloudflare_turn_credentials_async() if get_space() else None
    html_content = (current_dir / "index.html").read_text()
    html_content = html_content.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
    return HTMLResponse(content=html_content)
if __name__ == "__main__":
    if (mode := os.getenv("MODE")) == "UI":
        stream.ui.launch(server_port=7860)
    elif mode == "PHONE":
        stream.fastphone(host="0.0.0.0", port=7860)
    else:
        import uvicorn

        uvicorn.run(app, host="0.0.0.0", port=7860)
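
For reference, this is roughly the minimal ReplyOnPause setup I would expect interruption to work with, independent of the Gemini session (a sketch along the lines of the basic fastrtc echo example; the echo handler and the chunk size are placeholders, not my actual code):

import numpy as np
from fastrtc import ReplyOnPause, Stream


def echo(audio: tuple[int, np.ndarray]):
    """Echo the caller's audio back in short chunks.

    Yielding many small chunks should give ReplyOnPause frequent
    opportunities to close this generator (raising GeneratorExit)
    as soon as the caller starts speaking again.
    """
    sample_rate, audio_array = audio
    audio_array = audio_array.squeeze()
    chunk = sample_rate // 10  # ~100 ms per yielded chunk (placeholder size)
    for start in range(0, len(audio_array), chunk):
        yield (sample_rate, audio_array[start : start + chunk])


stream = Stream(
    modality="audio",
    mode="send-receive",
    handler=ReplyOnPause(echo, can_interrupt=True),
)

if __name__ == "__main__":
    stream.ui.launch(server_port=7860)

If interruption works with a plain echo handler like this but not with the Gemini handler above, that would point at the Gemini session/streaming part rather than the ReplyOnPause configuration.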