7 changes: 7 additions & 0 deletions .changeset/sour-mugs-lay.md
@@ -0,0 +1,7 @@
---
'@livekit/agents-plugin-google': patch
'@livekit/agents-plugin-openai': patch
'@livekit/agents': patch
---

Support openai half-duplex mode (audio in -> text out -> custom TTS model)
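
For orientation, here is a minimal sketch of the half-duplex wiring this changeset describes: a realtime model configured for text-only output, paired with a separate TTS plugin. It mirrors the full example added in examples/src/realtime_with_tts.ts below; the ElevenLabs TTS is just one possible choice.

```ts
import { voice } from '@livekit/agents';
import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';
import * as openai from '@livekit/agents-plugin-openai';

// Audio in -> text out from the realtime model; a separate TTS model
// converts the text responses back into audio for playback.
const session = new voice.AgentSession({
  llm: new openai.realtime.RealtimeModel({ modalities: ['text'] }),
  tts: new elevenlabs.TTS(),
});
```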
9 changes: 8 additions & 1 deletion agents/src/llm/realtime.ts
@@ -19,6 +19,7 @@ export interface MessageGeneration {
messageId: string;
textStream: ReadableStream<string>;
audioStream: ReadableStream<AudioFrame>;
modalities?: Promise<('text' | 'audio')[]>;
}

export interface GenerationCreatedEvent {
@@ -40,6 +41,7 @@ export interface RealtimeCapabilities {
turnDetection: boolean;
userTranscription: boolean;
autoToolReplyGeneration: boolean;
audioOutput: boolean;
}

export interface InputTranscriptionCompleted {
@@ -121,7 +123,12 @@ export abstract class RealtimeSession extends EventEmitter {
/**
* Truncate the message at the given audio end time
*/
abstract truncate(options: { messageId: string; audioEndMs: number }): Promise<void>;
abstract truncate(options: {
messageId: string;
audioEndMs: number;
modalities?: ('text' | 'audio')[];
audioTranscript?: string;
}): Promise<void>;

async close(): Promise<void> {
this._mainTask.cancel();
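
The widened `truncate` signature above is consumed in agent_activity.ts further down in this diff. A sketch of the call shape, with variable names borrowed from that change; both new fields are optional, so implementations that ignore them keep working:

```ts
// Sketch only; msgId, playbackPosition, msgModalities and forwardedText are
// the names used by the caller in agent_activity.ts below.
await realtimeSession.truncate({
  messageId: msgId,
  audioEndMs: Math.floor(playbackPosition),
  modalities: msgModalities, // e.g. ['text'] when the model runs half-duplex
  audioTranscript: forwardedText, // lets a text-only session reconcile what was actually spoken
});
```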
83 changes: 67 additions & 16 deletions agents/src/voice/agent_activity.ts
@@ -235,6 +235,14 @@ export class AgentActivity implements RecognitionHooks {
} catch (error) {
this.logger.error(error, 'failed to update the tools');
}

if (!this.llm.capabilities.audioOutput && !this.tts && this.agentSession.output.audio) {
this.logger.error(
'audio output is enabled but RealtimeModel has no audio modality ' +
'and no TTS is set. Either enable audio modality in the RealtimeModel ' +
'or set a TTS model.',
);
}
} else if (this.llm instanceof LLM) {
try {
updateInstructions({
@@ -1612,7 +1620,7 @@ export class AgentActivity implements RecognitionHooks {

const readMessages = async (
abortController: AbortController,
outputs: Array<[string, _TextOut | null, _AudioOut | null]>,
outputs: Array<[string, _TextOut | null, _AudioOut | null, ('text' | 'audio')[] | undefined]>,
) => {
replyAbortController.signal.addEventListener('abort', () => abortController.abort(), {
once: true,
@@ -1627,7 +1635,25 @@
);
break;
}
const trNodeResult = await this.agent.transcriptionNode(msg.textStream, modelSettings);

const msgModalities = msg.modalities ? await msg.modalities : undefined;
let ttsTextInput: ReadableStream<string> | null = null;
let trTextInput: ReadableStream<string>;

if (msgModalities && !msgModalities.includes('audio') && this.tts) {
if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
this.logger.warn(
'text response received from realtime API, falling back to use a TTS model.',
);
}
const [_ttsTextInput, _trTextInput] = msg.textStream.tee();
ttsTextInput = _ttsTextInput;
trTextInput = _trTextInput;
} else {
trTextInput = msg.textStream;
}

const trNodeResult = await this.agent.transcriptionNode(trTextInput, modelSettings);
let textOut: _TextOut | null = null;
if (trNodeResult) {
const [textForwardTask, _textOut] = performTextForwarding(
@@ -1638,30 +1664,51 @@
forwardTasks.push(textForwardTask);
textOut = _textOut;
}

let audioOut: _AudioOut | null = null;
if (audioOutput) {
const realtimeAudio = await this.agent.realtimeAudioOutputNode(
msg.audioStream,
modelSettings,
);
if (realtimeAudio) {
let realtimeAudioResult: ReadableStream<AudioFrame> | null = null;

if (ttsTextInput) {
const [ttsTask, ttsStream] = performTTSInference(
(...args) => this.agent.ttsNode(...args),
ttsTextInput,
modelSettings,
abortController,
);
tasks.push(ttsTask);
realtimeAudioResult = ttsStream;
} else if (msgModalities && msgModalities.includes('audio')) {
realtimeAudioResult = await this.agent.realtimeAudioOutputNode(
msg.audioStream,
modelSettings,
);
} else if (this.llm instanceof RealtimeModel && this.llm.capabilities.audioOutput) {
this.logger.error(
'Text message received from Realtime API with audio modality. ' +
'This usually happens when text chat context is synced to the API. ' +
'Try to add a TTS model as fallback or use text modality with TTS instead.',
);
} else {
this.logger.warn(
'audio output is enabled but neither tts nor realtime audio is available',
);
}

if (realtimeAudioResult) {
const [forwardTask, _audioOut] = performAudioForwarding(
realtimeAudio,
realtimeAudioResult,
audioOutput,
abortController,
);
forwardTasks.push(forwardTask);
audioOut = _audioOut;
audioOut.firstFrameFut.await.finally(onFirstFrame);
} else {
this.logger.warn(
'audio output is enabled but neither tts nor realtime audio is available',
);
}
} else if (textOut) {
textOut.firstTextFut.await.finally(onFirstFrame);
}
outputs.push([msg.messageId, textOut, audioOut]);
outputs.push([msg.messageId, textOut, audioOut, msgModalities]);
}
await waitFor(forwardTasks);
} catch (error) {
@@ -1671,7 +1718,9 @@
}
};

const messageOutputs: Array<[string, _TextOut | null, _AudioOut | null]> = [];
const messageOutputs: Array<
[string, _TextOut | null, _AudioOut | null, ('text' | 'audio')[] | undefined]
> = [];
const tasks = [
Task.from(
(controller) => readMessages(controller, messageOutputs),
@@ -1750,7 +1799,7 @@

if (messageOutputs.length > 0) {
// there should be only one message
const [msgId, textOut, audioOut] = messageOutputs[0]!;
const [msgId, textOut, audioOut, msgModalities] = messageOutputs[0]!;
let forwardedText = textOut?.text || '';

if (audioOutput) {
@@ -1775,6 +1824,8 @@
this.realtimeSession.truncate({
messageId: msgId,
audioEndMs: Math.floor(playbackPosition),
modalities: msgModalities,
audioTranscript: forwardedText,
});
}

@@ -1805,7 +1856,7 @@

if (messageOutputs.length > 0) {
// there should be only one message
const [msgId, textOut, _] = messageOutputs[0]!;
const [msgId, textOut, _, __] = messageOutputs[0]!;
const message = ChatMessage.create({
role: 'assistant',
content: textOut?.text || '',
77 changes: 77 additions & 0 deletions examples/src/realtime_with_tts.ts
@@ -0,0 +1,77 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import {
type JobContext,
type JobProcess,
ServerOptions,
cli,
defineAgent,
llm,
log,
voice,
} from '@livekit/agents';
import * as elevenlabs from '@livekit/agents-plugin-elevenlabs';
import * as openai from '@livekit/agents-plugin-openai';
import * as silero from '@livekit/agents-plugin-silero';
import { BackgroundVoiceCancellation } from '@livekit/noise-cancellation-node';
import { fileURLToPath } from 'node:url';
import { z } from 'zod';

export default defineAgent({
prewarm: async (proc: JobProcess) => {
proc.userData.vad = await silero.VAD.load();
},
entry: async (ctx: JobContext) => {
const logger = log();

const getWeather = llm.tool({
description: 'Called when the user asks about the weather.',
parameters: z.object({
location: z.string().describe('The location to get the weather for'),
}),
execute: async ({ location }) => {
logger.info(`getting weather for ${location}`);
return `The weather in ${location} is sunny, and the temperature is 20 degrees Celsius.`;
},
});

const agent = new voice.Agent({
instructions: 'You are a helpful assistant. Always speak in English.',
tools: {
getWeather,
},
});

const session = new voice.AgentSession({
// Use RealtimeModel with text-only modality + separate TTS
llm: new openai.realtime.RealtimeModel({
modalities: ['text'],
}),
tts: new elevenlabs.TTS(),
voiceOptions: {
maxToolSteps: 5,
},
});

await session.start({
agent,
room: ctx.room,
inputOptions: {
noiseCancellation: BackgroundVoiceCancellation(),
},
outputOptions: {
transcriptionEnabled: true,
audioEnabled: true, // You can also disable audio output to use text modality only
},
});

session.say('Hello, how can I help you today?');

session.on(voice.AgentSessionEventTypes.MetricsCollected, (ev) => {
logger.debug('metrics_collected', ev);
});
},
});

cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) }));
3 changes: 2 additions & 1 deletion plugins/google/src/beta/realtime/realtime_api.ts
@@ -290,6 +290,7 @@ export class RealtimeModel extends llm.RealtimeModel {
turnDetection: serverTurnDetection,
userTranscription: inputAudioTranscription !== null,
autoToolReplyGeneration: true,
audioOutput: options.modalities?.includes(Modality.AUDIO) ?? true,
});

// Environment variable fallbacks
@@ -600,7 +601,7 @@ export class RealtimeSession extends llm.RealtimeSession {
this.hasReceivedAudioInput = true;

for (const f of this.resampleAudio(frame)) {
for (const nf of this.bstream.write(f.data.buffer)) {
for (const nf of this.bstream.write(f.data.buffer as ArrayBuffer)) {
const realtimeInput: types.LiveClientRealtimeInput = {
mediaChunks: [
{
8 changes: 5 additions & 3 deletions plugins/openai/src/realtime/api_proto.ts
@@ -190,7 +190,7 @@ export interface SessionResource {
id: string;
object: 'realtime.session';
model: string;
modalities: ['text', 'audio'] | ['text']; // default: ["text", "audio"]
modalities: Modality[]; // default: ["text", "audio"]
instructions: string;
voice: Voice; // default: "alloy"
input_audio_format: AudioFormat; // default: "pcm16"
@@ -267,7 +267,7 @@ export interface SessionUpdateEvent extends BaseClientEvent {
type: 'session.update';
session: Partial<{
model: Model;
modalities: ['text', 'audio'] | ['text'];
modalities: Modality[];
instructions: string;
voice: Voice;
input_audio_format: AudioFormat;
@@ -350,7 +350,7 @@ export interface ConversationItemDeleteEvent extends BaseClientEvent {
export interface ResponseCreateEvent extends BaseClientEvent {
type: 'response.create';
response?: Partial<{
modalities: ['text', 'audio'] | ['text'];
modalities: Modality[];
instructions: string;
voice: Voice;
output_audio_format: AudioFormat;
@@ -511,6 +511,7 @@ export interface ResponseContentPartDoneEvent extends BaseServerEvent {
export interface ResponseTextDeltaEvent extends BaseServerEvent {
type: 'response.text.delta';
response_id: string;
item_id: string;
output_index: number;
content_index: number;
delta: string;
@@ -519,6 +520,7 @@ export interface ResponseTextDeltaEvent extends BaseServerEvent {
export interface ResponseTextDoneEvent extends BaseServerEvent {
type: 'response.text.done';
response_id: string;
item_id: string;
output_index: number;
content_index: number;
text: string;
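
Loosening the modalities fields from the literal tuple types to `Modality[]` (presumably the `'text' | 'audio'` union used elsewhere in api_proto.ts) means values the API may return in a different order, such as `['audio', 'text']`, still type-check, while text-only requests keep working as before. A sketch under that assumption:

```ts
// Assumes: export type Modality = 'text' | 'audio'; (defined in api_proto.ts)
// A server-reported session may list modalities in any order:
const reported: Modality[] = ['audio', 'text'];

// A client can still request a text-only response the same way as before:
const responseCreate: ResponseCreateEvent = {
  type: 'response.create',
  response: { modalities: ['text'] },
};
```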