diff --git a/examples/src/elevenlabs_agent.ts b/examples/src/elevenlabs_agent.ts
new file mode 100644
index 000000000..0eab7dc5b
--- /dev/null
+++ b/examples/src/elevenlabs_agent.ts
@@ -0,0 +1,54 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import type { llm } from '@livekit/agents';
+import { type JobContext, WorkerOptions, cli, defineAgent, multimodal } from '@livekit/agents';
+import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';
+import { fileURLToPath } from 'node:url';
+import { z } from 'zod';
+
+export default defineAgent({
+  entry: async (ctx: JobContext) => {
+    await ctx.connect();
+
+    console.log('waiting for participant');
+    const participant = await ctx.waitForParticipant();
+    console.log(`starting assistant example agent for ${participant.identity}`);
+
+    const model = new elevenlabs.realtime.RealtimeModel({
+      agentId: 'oYxMlLkXbNtZDS3zCikc', // Placeholder for a public agent; no API key required
+      audioOptions: {
+        sampleRate: 22050,
+        inFrameSize: 2205,
+        outFrameSize: 1102,
+      },
+    });
+
+    // This function will not execute; it is only an example.
+    // To define client tools, you need to configure them on your agent first.
+    // see: https://elevenlabs.io/docs/conversational-ai/customization/tools/client-tools
+    const fncCtx: llm.FunctionContext = {
+      logMessage: {
+        description: 'logMessage',
+        parameters: z.object({
+          message: z.string(),
+        }),
+        execute: async ({ message }) => {
+          console.debug(`executing logMessage function for ${message}`);
+          return `Logged message`;
+        },
+      },
+    };
+
+    const agent = new multimodal.MultimodalAgent({
+      model,
+      fncCtx,
+    });
+
+    await agent
+      .start(ctx.room, participant)
+      .then((session) => session as elevenlabs.realtime.RealtimeSession);
+  },
+});
+
+cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));
diff --git a/plugins/elevenlabs/src/index.ts b/plugins/elevenlabs/src/index.ts
index 6b176049a..0e6690c88 100644
--- a/plugins/elevenlabs/src/index.ts
+++ b/plugins/elevenlabs/src/index.ts
@@ -2,4 +2,6 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
-export * from './tts.js';
+export * as realtime from './realtime/index.js';
+export * from './models.js';
+export { type TTSOptions, TTS, SynthesizeStream } from './tts.js';
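Reviewer note on the `audioOptions` used in the example above: frame sizes are sample counts, so they track the sample rate. At the plugin defaults of 16 kHz (defined in api_proto.ts below), `DEFAULT_IN_FRAME_SIZE = 1600` is 100 ms and `DEFAULT_OUT_FRAME_SIZE = 800` is 50 ms; the example's 22.05 kHz agent uses 2205 and 1102 for the same durations. A hypothetical helper (not part of this PR) could derive the values instead of hard-coding them:

```ts
// Hypothetical helper: derive ModelAudioOptions for a given sample rate,
// keeping the plugin's default frame durations (100 ms in, 50 ms out).
function audioOptionsFor(sampleRate: number) {
  return {
    sampleRate,
    inFrameSize: Math.floor(sampleRate * 0.1), // 100 ms of samples
    outFrameSize: Math.floor(sampleRate * 0.05), // 50 ms of samples
  };
}

// audioOptionsFor(16000) -> { sampleRate: 16000, inFrameSize: 1600, outFrameSize: 800 }
// audioOptionsFor(22050) -> { sampleRate: 22050, inFrameSize: 2205, outFrameSize: 1102 }
```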
diff --git a/plugins/elevenlabs/src/realtime/api_proto.ts b/plugins/elevenlabs/src/realtime/api_proto.ts
new file mode 100644
index 000000000..e84477da0
--- /dev/null
+++ b/plugins/elevenlabs/src/realtime/api_proto.ts
@@ -0,0 +1,187 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+export const BASE_URL = 'wss://api.elevenlabs.io/v1/convai/conversation';
+export const AUTHORIZATION_HEADER = 'xi-api-key';
+
+export const DEFAULT_SAMPLE_RATE = 16000;
+export const NUM_CHANNELS = 1;
+export const DEFAULT_IN_FRAME_SIZE = 1600; // 100ms
+export const DEFAULT_OUT_FRAME_SIZE = 800; // 50ms
+
+export type AudioFormat = string; // 'pcm_16000' | 'pcm_22050' | 'pcm_44100'
+export type ContentType = 'audio';
+
+export type ClientEventType =
+  | 'user_audio_chunk'
+  | 'pong'
+  | 'conversation_initiation_client_data'
+  | 'client_tool_result' // TODO: Add support
+  | 'contextual_update'; // TODO: Add support
+export type ServerEventType =
+  | 'conversation_initiation_metadata'
+  | 'user_transcript'
+  | 'agent_response'
+  | 'agent_response_correction'
+  | 'audio'
+  | 'interruption'
+  | 'ping'
+  | 'client_tool_call' // TODO: Add support
+  | 'contextual_update' // TODO: Add support
+  | 'vad_score'
+  | 'internal_tentative_agent_response';
+
+export type AudioBase64Bytes = string;
+
+interface BaseClientEvent {
+  type: ClientEventType;
+}
+
+export interface UserAudioChunkEvent extends BaseClientEvent {
+  type: 'user_audio_chunk';
+  audio: AudioBase64Bytes;
+}
+
+export interface PongEvent extends BaseClientEvent {
+  type: 'pong';
+  event_id: number; // The id of the ping event being responded to
+}
+
+export interface ConversationInitiationClientDataEvent extends BaseClientEvent {
+  type: 'conversation_initiation_client_data';
+  conversation_config_override?: {
+    agent?: {
+      prompt?: {
+        prompt?: string;
+      };
+      first_message?: string;
+      language?: string;
+    };
+    tts?: {
+      voice_id?: string;
+    };
+  };
+  custom_llm_extra_body?: {
+    temperature?: number;
+    max_tokens?: number;
+  };
+  dynamic_variables?: Record<string, string | number | boolean>;
+}
+
+// This is only used for client-side tools
+// see: https://elevenlabs.io/docs/conversational-ai/customization/tools/client-tools
+export interface ClientToolResultEvent extends BaseClientEvent {
+  type: 'client_tool_result';
+  tool_call_id?: string; // Id of the tool call being responded to
+  result?: string;
+  is_error: boolean;
+}
+
+export interface ContextualUpdateEvent extends BaseClientEvent {
+  type: 'contextual_update';
+  text: string; // Contextual information to be added to the conversation state
+}
+
+export type ClientEvent =
+  | UserAudioChunkEvent
+  | PongEvent
+  | ConversationInitiationClientDataEvent
+  | ClientToolResultEvent
+  | ContextualUpdateEvent;
+
+interface BaseServerEvent {
+  type: ServerEventType;
+}
+
+export interface ConversationInitiationMetadataEvent extends BaseServerEvent {
+  type: 'conversation_initiation_metadata';
+  conversation_id: string;
+  agent_output_audio_format: AudioFormat;
+  user_input_audio_format: AudioFormat;
+}
+
+export interface UserTranscriptEvent extends BaseServerEvent {
+  type: 'user_transcript';
+  user_transcription_event: {
+    user_transcript?: string;
+  };
+}
+
+export interface AgentResponseEvent extends BaseServerEvent {
+  type: 'agent_response';
+  agent_response_event: {
+    agent_response: string;
+  };
+}
+
+export interface AgentResponseCorrectionEvent extends BaseServerEvent {
+  type: 'agent_response_correction';
+  correction_event: {
+    corrected_response: string;
+  };
+}
+
+export interface AudioResponseEvent extends BaseServerEvent {
+  type: 'audio';
+  audio_event: {
+    audio_base_64: AudioBase64Bytes;
+    event_id: number; // Sequential identifier for the audio chunk
+  };
+}
+
+export interface InterruptionEvent extends BaseServerEvent {
+  type: 'interruption';
+  interruption_event?: {
+    event_id: number; // Id of the event that was interrupted
+  };
+}
+
+export interface PingEvent extends BaseServerEvent {
+  type: 'ping';
+  ping_event: {
+    event_id: number;
+    ping_ms?: number; // Measured round-trip latency in ms
+  };
+}
+
+export interface ClientToolCallEvent extends BaseServerEvent {
+  type: 'client_tool_call';
+  client_tool_call: {
+    tool_name: string;
+    tool_call_id: string;
+    parameters?: Record<string, unknown>;
+  };
+}
+
+export interface ContextualUpdateServerEvent extends BaseServerEvent {
+  type: 'contextual_update';
+  text: string; // Contextual information to be added to the conversation state
+}
+
+export interface VadScoreEvent extends BaseServerEvent {
+  type: 'vad_score';
+  vad_score_event: {
+    vad_score: number; // VAD confidence score between 0 and 1
+  };
+}
+
+export interface InternalTentativeAgentResponseEvent extends BaseServerEvent {
+  type: 'internal_tentative_agent_response';
+  internal_tentative_agent_response_event: {
+    tentative_agent_response: string; // Preliminary text from the agent
+  };
+}
+
+export type ServerEvent =
+  | ConversationInitiationMetadataEvent
+  | UserTranscriptEvent
+  | AgentResponseEvent
+  | AgentResponseCorrectionEvent
+  | AudioResponseEvent
+  | InterruptionEvent
+  | PingEvent
+  | ClientToolCallEvent
+  | ContextualUpdateServerEvent
+  | VadScoreEvent
+  | InternalTentativeAgentResponseEvent;
diff --git a/plugins/elevenlabs/src/realtime/index.ts b/plugins/elevenlabs/src/realtime/index.ts
new file mode 100644
index 000000000..29a91473a
--- /dev/null
+++ b/plugins/elevenlabs/src/realtime/index.ts
@@ -0,0 +1,5 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+export * from './api_proto.js';
+export * from './realtime_model.js';
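One wire-format quirk to keep in mind when reading realtime_model.ts below: every client event except `user_audio_chunk` is serialized as a tagged object (`{ type: ..., ... }`), while audio chunks go out as a bare `{ user_audio_chunk: '<base64>' }` object (see the ElevenLabs WebSocket docs linked in the send loop). The plugin models all events uniformly with the `ClientEvent` union above and converts on send; a minimal sketch of that conversion:

```ts
import type { ClientEvent } from './api_proto.js';

// Sketch of the send-side conversion realtime_model.ts performs: all events
// keep their `type` tag on the wire except `user_audio_chunk`, which ElevenLabs
// expects as a bare { user_audio_chunk: '<base64 pcm>' } object.
function toWireFormat(event: ClientEvent): unknown {
  return event.type === 'user_audio_chunk' ? { user_audio_chunk: event.audio } : event;
}
```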
diff --git a/plugins/elevenlabs/src/realtime/realtime_model.ts b/plugins/elevenlabs/src/realtime/realtime_model.ts
new file mode 100644
index 000000000..20d124409
--- /dev/null
+++ b/plugins/elevenlabs/src/realtime/realtime_model.ts
@@ -0,0 +1,444 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { AsyncIterableQueue, Queue, type llm, log, multimodal } from '@livekit/agents';
+import { AudioFrame } from '@livekit/rtc-node';
+import { randomUUID } from 'node:crypto';
+import { once } from 'node:events';
+import { WebSocket } from 'ws';
+import * as api_proto from './api_proto.js';
+
+interface ModelAudioOptions {
+  sampleRate: number;
+  inFrameSize: number;
+  outFrameSize: number;
+}
+
+interface ModelOptions {
+  agentId: string;
+  configOverride?: Omit<api_proto.ConversationInitiationClientDataEvent, 'type'>;
+  apiKey?: string;
+  audioOptions?: ModelAudioOptions;
+}
+
+class InputAudioBuffer {
+  #session: RealtimeSession;
+
+  constructor(session: RealtimeSession) {
+    this.#session = session;
+  }
+
+  append(frame: AudioFrame) {
+    this.#session.queueMsg({
+      type: 'user_audio_chunk',
+      audio: Buffer.from(frame.data.buffer).toString('base64'),
+    });
+  }
+}
+
+export interface RealtimeContent {
+  itemId: string;
+  contentIndex: number;
+  audio: AudioFrame[];
+  text: string;
+  textStream: AsyncIterableQueue<string>;
+  audioStream: AsyncIterableQueue<AudioFrame>;
+  contentType: api_proto.ContentType;
+  locked: boolean;
+}
+
+export class RealtimeModel extends multimodal.RealtimeModel {
+  numChannels = api_proto.NUM_CHANNELS;
+  sampleRate: number;
+  inFrameSize: number;
+  outFrameSize: number;
+
+  #defaultOpts: ModelOptions;
+  #sessions: RealtimeSession[] = [];
+
+  constructor({
+    agentId,
+    configOverride = {},
+    audioOptions = {
+      sampleRate: api_proto.DEFAULT_SAMPLE_RATE,
+      inFrameSize: api_proto.DEFAULT_IN_FRAME_SIZE,
+      outFrameSize: api_proto.DEFAULT_OUT_FRAME_SIZE,
+    },
+    apiKey = process.env.ELEVEN_API_KEY,
+  }: ModelOptions) {
+    super();
+
+    // ElevenLabs agents might use different sample rates,
+    // most commonly 16000 and 22050.
+    this.sampleRate = audioOptions.sampleRate;
+    this.inFrameSize = audioOptions.inFrameSize;
+    this.outFrameSize = audioOptions.outFrameSize;
+
+    if (!apiKey) {
+      console.debug(
+        'An ElevenLabs API key is required in order to use private agents. Access is restricted to public agents.',
+      );
+    }
+
+    this.#defaultOpts = {
+      agentId,
+      configOverride,
+      audioOptions,
+      apiKey,
+    };
+  }
+
+  get sessions(): RealtimeSession[] {
+    return this.#sessions;
+  }
+
+  session({ fncCtx }: { fncCtx?: llm.FunctionContext }): multimodal.RealtimeSession {
+    const newSession = new RealtimeSession(this.#defaultOpts, { fncCtx });
+    this.#sessions.push(newSession);
+    return newSession;
+  }
+
+  async close(): Promise<void> {
+    await Promise.allSettled(this.#sessions.map((session) => session.close()));
+  }
+}
+
+export class RealtimeSession extends multimodal.RealtimeSession {
+  #pendingContent: { [id: number]: RealtimeContent } = {};
+  #fncCtx: llm.FunctionContext | undefined = undefined;
+  #opts: ModelOptions;
+  #ws: WebSocket | null = null;
+  #task: Promise<void>;
+  #logger = log();
+  #sendQueue = new Queue<api_proto.ClientEvent>();
+
+  constructor(opts: ModelOptions, { fncCtx }: { fncCtx?: llm.FunctionContext }) {
+    super();
+    this.#opts = opts;
+    this.#fncCtx = fncCtx;
+
+    this.#task = this.#start();
+  }
+
+  get fncCtx(): llm.FunctionContext | undefined {
+    return this.#fncCtx;
+  }
+
+  set fncCtx(ctx: llm.FunctionContext | undefined) {
+    this.#fncCtx = ctx;
+  }
+
+  get conversation() {
+    return {
+      item: {
+        truncate: () => {},
+      },
+    };
+  }
+
+  get inputAudioBuffer(): InputAudioBuffer {
+    return new InputAudioBuffer(this);
+  }
+
+  queueMsg(command: api_proto.ClientEvent): void {
+    this.#sendQueue.put(command);
+  }
+
+  #getContent(id?: number): RealtimeContent | undefined {
+    if (id !== undefined) return this.#pendingContent[id];
+    // If no id is provided, return the unlocked content with the lowest index.
+    const freeContents = Object.values(this.#pendingContent).filter((a) => !a.locked);
+    if (freeContents.length === 0) return undefined;
+    return freeContents.reduce((a, b) => (a.contentIndex < b.contentIndex ? a : b));
+  }
+
+  /**
+   * Truncates the data field of the event to the specified maxLength to avoid overwhelming logs
+   * with large amounts of base64 audio data.
+   */
+  #loggableEvent(
+    event: api_proto.ClientEvent | api_proto.ServerEvent,
+    maxLength: number = 30,
+  ): Record<string, unknown> {
+    const truncateString = (str: string, maxLength: number) =>
+      str.slice(0, maxLength) + (str.length > maxLength ? '…' : '');
+
+    if (event.type === 'user_audio_chunk') {
+      return { ...event, audio: truncateString(event.audio as string, maxLength) };
+    }
+
+    if (event.type === 'audio') {
+      return {
+        ...event,
+        audio_event: {
+          ...event.audio_event,
+          audio_base_64: truncateString(event.audio_event.audio_base_64 as string, maxLength),
+        },
+      };
+    }
+
+    return { ...event };
+  }
+
+  /**
+   * Required method in the LiveKit API.
+   *
+   * @remarks
+   * This method is part of the LiveKit API contract but has no equivalent functionality
+   * in the ElevenLabs implementation. It is not supported and will throw an error if called.
+   */
+  recoverFromTextResponse() {
+    throw new Error('Recovery from text is not supported on this model');
+  }
+
+  async #getSignedURL(): Promise<string> {
+    const url = new URL(api_proto.BASE_URL + '/get_signed_url');
+    url.protocol = 'https:';
+    url.searchParams.set('agent_id', this.#opts.agentId);
+
+    const response = await fetch(url.toString(), {
+      headers: {
+        [api_proto.AUTHORIZATION_HEADER]: this.#opts.apiKey || '',
+      },
+    });
+
+    if (!response.ok) {
+      throw new Error(`Failed to fetch signed URL: ${response.statusText}`);
+    }
+
+    const data = await response.json();
+    return data.signed_url;
+  }
+
+  #getPublicURL(): string {
+    const url = new URL(api_proto.BASE_URL);
+    url.searchParams.set('agent_id', this.#opts.agentId);
+    return url.toString();
+  }
+
+  #start(): Promise<void> {
+    return new Promise(async (resolve, reject) => {
+      const headers: Record<string, string> = {
+        'User-Agent': 'LiveKit-Agents-JS',
+      };
+
+      const url = this.#opts.apiKey ? await this.#getSignedURL() : this.#getPublicURL();
+
+      console.debug('Connecting to ElevenLabs Conversational AI API at', url);
+      this.#ws = new WebSocket(url, {
+        headers,
+      });
+
+      this.#ws.onerror = (error) => {
+        reject(new Error('ElevenLabs Conversational AI WebSocket error: ' + error.message));
+      };
+
+      await once(this.#ws, 'open');
+
+      // First, we send the conversation initiation event to the server.
+      this.#ws.send(
+        JSON.stringify({
+          type: 'conversation_initiation_client_data',
+          ...this.#opts.configOverride,
+        } satisfies api_proto.ConversationInitiationClientDataEvent),
+      );
+
+      this.#ws.onmessage = (message) => {
+        const event: api_proto.ServerEvent = JSON.parse(message.data.toString());
+        if (event.type !== 'ping') {
+          this.#logger.debug(`<- ${JSON.stringify(this.#loggableEvent(event))}`);
+        }
+        switch (event.type) {
+          case 'conversation_initiation_metadata':
+            this.#handleConversationInitialization(event);
+            break;
+          case 'user_transcript':
+            this.#handleUserTranscript(event);
+            break;
+          case 'agent_response':
+            this.#handleAgentTextResponse(event);
+            break;
+          case 'audio':
+            this.#handleIncomingAudio(event);
+            break;
+          case 'interruption':
+            this.#handleInterruption(event);
+            break;
+          case 'ping':
+            this.#handlePing(event);
+            break;
+          case 'client_tool_call':
+            this.#handleClientToolCall(event);
+            break;
+          // NOTE: Not supported yet
+          case 'contextual_update':
+          case 'vad_score':
+          // NOTE: debug events
+          case 'agent_response_correction':
+          case 'internal_tentative_agent_response':
+            break;
+        }
+      };
+
+      const sendTask = async () => {
+        while (this.#ws && this.#ws.readyState === WebSocket.OPEN) {
+          try {
+            const event = await this.#sendQueue.get();
+            this.#logger.debug(`-> ${JSON.stringify(this.#loggableEvent(event))}`);
+
+            // The ElevenLabs API defines all events in a consistent manner, except for `user_audio_chunk`.
+            // To address this inconsistency, we standardize the event to match the common structure.
+            // However, when sending the event, it must be transformed back to align with the ElevenLabs specification.
+            // see: https://elevenlabs.io/docs/conversational-ai/api-reference/conversational-ai/websocket#send.User-Audio-Chunk
+            const dataToSend =
+              event.type === 'user_audio_chunk' ? { user_audio_chunk: event.audio } : event;
+            this.#ws.send(JSON.stringify(dataToSend));
+          } catch (error) {
+            this.#logger.error('Error sending event:', error);
+          }
+        }
+      };
+
+      sendTask();
+
+      this.#ws.onclose = () => {
+        this.#ws = null;
+        this.#logger.debug('WebSocket connection closed by ElevenLabs');
+        resolve();
+      };
+    });
+  }
+
+  async close() {
+    if (!this.#ws) return;
+    this.#ws.close();
+    await this.#task;
+  }
+
+  #handlePing(event: api_proto.PingEvent): void {
+    this.queueMsg({
+      type: 'pong',
+      event_id: event.ping_event.event_id,
+    });
+  }
+
+  #handleIncomingAudio(event: api_proto.AudioResponseEvent): void {
+    const data = Buffer.from(event.audio_event.audio_base_64, 'base64');
+    const audio = new AudioFrame(
+      new Int16Array(data.buffer),
+      this.#opts.audioOptions?.sampleRate || api_proto.DEFAULT_SAMPLE_RATE,
+      api_proto.NUM_CHANNELS,
+      data.length / 2,
+    );
+    const content = this.#getContent(event.audio_event.event_id);
+    if (content) {
+      // If we already received an event with the same id, we can just append the audio to the existing content.
+      content.audio.push(audio);
+      content.audioStream.put(audio);
+      this.emit('response_content_updated', content);
+    } else {
+      // If we haven't received an event with the same id, we need to create a new content object.
+      const textStream = new AsyncIterableQueue<string>();
+      const audioStream = new AsyncIterableQueue<AudioFrame>();
+      audioStream.put(audio);
+
+      const newContent: RealtimeContent = {
+        itemId: randomUUID(),
+        contentIndex: event.audio_event.event_id,
+        text: '',
+        audio: [audio],
+        textStream,
+        audioStream,
+        contentType: 'audio',
+        locked: false,
+      };
+      this.#pendingContent[event.audio_event.event_id] = newContent;
+      this.emit('response_content_added', newContent);
+    }
+  }
+
+  #handleAgentTextResponse(event: api_proto.AgentResponseEvent): void {
+    const content = this.#getContent(); // get the first remaining content
+
+    // Ignore: this is a silence that has no audio content,
+    // for which ElevenLabs still sends '...' as the agent response.
+    if (!content) return;
+
+    // Lock the content so it can't be modified anymore.
+    content.locked = true;
+
+    const transcript = event.agent_response_event.agent_response;
+    content.text = transcript;
+    content.textStream.put(transcript);
+
+    // Close the streams because the AgentResponseEvent is the last event for this content.
+    content.textStream.close();
+    content.audioStream.close();
+
+    // TODO: figure out how to clean the content from the pendingContent map
+
+    this.emit('response_content_done', content);
+  }
+
+  #handleUserTranscript(event: api_proto.UserTranscriptEvent): void {
+    const transcript = event.user_transcription_event.user_transcript || '';
+    this.emit('input_speech_transcription_completed', {
+      itemId: randomUUID(),
+      transcript,
+    });
+  }
+
+  /**
+   * Handles client-side tools for ElevenLabs Conversational AI.
+   * ElevenLabs supports multiple types of tools, most of which are executed server-side.
+   * This method is specifically for handling client-side tools.
+   * For more details, see:
+   * https://elevenlabs.io/docs/conversational-ai/customization/tools/client-tools
+   */
+  #handleClientToolCall(event: api_proto.ClientToolCallEvent) {
+    if (!this.#fncCtx) {
+      this.#logger.error('function call received but no fncCtx is available');
+      return;
+    }
+
+    const toolName = event.client_tool_call.tool_name;
+    const callId = event.client_tool_call.tool_call_id;
+    const params = event.client_tool_call.parameters || {};
+
+    const func = this.#fncCtx[toolName];
+    if (!func) {
+      this.#logger.error(`no function with name ${toolName} in fncCtx`);
+      return;
+    }
+    this.#logger.debug(
+      `[Function Call ${callId}] Executing ${toolName} with arguments ${JSON.stringify(params)}`,
+    );
+
+    func.execute(params).then(
+      (content) => {
+        this.#logger.debug(`[Function Call ${callId}] ${toolName} returned ${content}`);
+        this.queueMsg({
+          type: 'client_tool_result',
+          tool_call_id: callId,
+          result: content || '',
+          is_error: false,
+        });
+      },
+      (error) => {
+        this.#logger.error(`[Function Call ${callId}] ${toolName} failed with ${error}`);
+        this.queueMsg({
+          type: 'client_tool_result',
+          tool_call_id: callId,
+          is_error: true,
+        });
+      },
+    );
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  #handleConversationInitialization(event: api_proto.ConversationInitiationMetadataEvent): void {}
+
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  #handleInterruption(event: api_proto.InterruptionEvent) {
+    // NOTE: We don't need to close the content here; the agent_response event will do that.
+    this.emit('input_speech_started', {
+      itemId: randomUUID(),
+    });
+  }
+}
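For reviewers who want to exercise the session without MultimodalAgent: a rough usage sketch, with the caveat that the event names and payloads are the ones emitted above, and the EventEmitter-style `.on()` is assumed from the `multimodal.RealtimeSession` base class:

```ts
import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';

const model = new elevenlabs.realtime.RealtimeModel({
  agentId: 'oYxMlLkXbNtZDS3zCikc', // public example agent from examples/src/elevenlabs_agent.ts
});
const session = model.session({}) as elevenlabs.realtime.RealtimeSession;

// Each agent reply surfaces as a RealtimeContent whose audio arrives as a stream
// of AudioFrames; both streams are closed once the final agent_response arrives.
session.on('response_content_added', async (content: elevenlabs.realtime.RealtimeContent) => {
  for await (const frame of content.audioStream) {
    // e.g. hand `frame` to an audio source / LiveKit track
  }
});

// Microphone input is forwarded as base64 user_audio_chunk events:
// session.inputAudioBuffer.append(frame);

// Later: await model.close();
```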