Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/hungry-schools-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Fix improper resource cleanup inside AgentActivity by no longer closing the global STT / TTS / VAD components
3 changes: 1 addition & 2 deletions agents/src/inference/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,7 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
try {
ws = await this.stt.connectWs(this.connOptions.timeoutMs);

// Wrap tasks for proper cancellation support using Task signals
const controller = new AbortController();
const controller = this.abortController; // Use base class abortController for proper cancellation
const sendTask = Task.from(({ signal }) => send(ws!, signal), controller);
const wsListenerTask = Task.from(({ signal }) => createWsListener(ws!, signal), controller);
const recvTask = Task.from(({ signal }) => recv(signal), controller);
Expand Down
2 changes: 1 addition & 1 deletion agents/src/ipc/job_proc_lazy_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ const startJob = (
let logger = log().child({ pid: proc.pid });

process.on('unhandledRejection', (reason) => {
logger.error(reason);
logger.debug({ error: reason }, 'Unhandled promise rejection');
});

logger.debug('initializing job runner');
Expand Down
17 changes: 13 additions & 4 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ export class StreamAdapter extends STT {
this.#stt.on('metrics_collected', (metrics) => {
this.emit('metrics_collected', metrics);
});

this.#stt.on('error', (error) => {
this.emit('error', error);
});
}

_recognize(frame: AudioFrame): Promise<SpeechEvent> {
return this.#stt.recognize(frame);
_recognize(frame: AudioFrame, abortSignal?: AbortSignal): Promise<SpeechEvent> {
return this.#stt.recognize(frame, abortSignal);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plugins can now attach this abortSignal to any external request so that the request can be cancelled.

}

stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
Expand All @@ -46,6 +50,11 @@ export class StreamAdapterWrapper extends SpeechStream {
this.label = `stt.StreamAdapterWrapper<${this.#stt.label}>`;
}

/**
 * Closes this wrapper: runs the base-class close logic first, then closes
 * the VAD stream held in `#vadStream`.
 */
close() {
super.close();
this.#vadStream.close();
}

/** Intentionally a no-op: returns immediately without doing any work. */
async monitorMetrics() {
return; // do nothing
}
Expand All @@ -72,7 +81,7 @@ export class StreamAdapterWrapper extends SpeechStream {
this.output.put({ type: SpeechEventType.END_OF_SPEECH });

try {
const event = await this.#stt.recognize(ev.frames);
const event = await this.#stt.recognize(ev.frames, this.abortSignal);
if (!event.alternatives![0].text) {
continue;
}
Expand All @@ -93,6 +102,6 @@ export class StreamAdapterWrapper extends SpeechStream {
}
};

Promise.all([forwardInput(), recognize()]);
await Promise.all([forwardInput(), recognize()]);
}
}
17 changes: 14 additions & 3 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
}

/** Receives an audio buffer and returns transcription in the form of a {@link SpeechEvent} */
async recognize(frame: AudioBuffer): Promise<SpeechEvent> {
async recognize(frame: AudioBuffer, abortSignal?: AbortSignal): Promise<SpeechEvent> {
const startTime = process.hrtime.bigint();
const event = await this._recognize(frame);
const event = await this._recognize(frame, abortSignal);
const durationMs = Number((process.hrtime.bigint() - startTime) / BigInt(1000000));
this.emit('metrics_collected', {
type: 'stt_metrics',
Expand All @@ -128,7 +128,11 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
});
return event;
}
protected abstract _recognize(frame: AudioBuffer): Promise<SpeechEvent>;

protected abstract _recognize(
frame: AudioBuffer,
abortSignal?: AbortSignal,
): Promise<SpeechEvent>;

/**
* Returns a {@link SpeechStream} that can be used to push audio frames and receive
Expand Down Expand Up @@ -173,6 +177,8 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
private logger = log();
private _connOptions: APIConnectOptions;

protected abortController = new AbortController();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an abortController to the base STT stream class (SpeechStream), which mainly handles cancellation of external requests such as fetch. The abortSignal is aborted whenever the underlying STT stream is closed.


constructor(
stt: STT,
sampleRate?: number,
Expand Down Expand Up @@ -290,6 +296,10 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>

protected abstract run(): Promise<void>;

/** The `AbortSignal` belonging to this stream's `abortController`. */
protected get abortSignal(): AbortSignal {
return this.abortController.signal;
}

/**
 * Sets `audioStream` as the source of `deferredInputStream`.
 *
 * @param audioStream - Readable stream of audio frames to read from.
 */
updateInputStream(audioStream: ReadableStream<AudioFrame>) {
this.deferredInputStream.setSource(audioStream);
}
Expand Down Expand Up @@ -354,6 +364,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
if (!this.input.closed) this.input.close();
if (!this.queue.closed) this.queue.close();
if (!this.output.closed) this.output.close();
if (!this.abortController.signal.aborted) this.abortController.abort();
this.closed = true;
}

Expand Down
70 changes: 53 additions & 17 deletions agents/src/voice/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,28 +260,41 @@ export class Agent<UserData = any> {
let wrapped_stt = activity.stt;

if (!wrapped_stt.capabilities.streaming) {
if (!agent.vad) {
const vad = agent.vad || activity.vad;
if (!vad) {
throw new Error(
'STT does not support streaming, add a VAD to the AgentTask/VoiceAgent to enable streaming',
);
}
wrapped_stt = new STTStreamAdapter(wrapped_stt, agent.vad);
wrapped_stt = new STTStreamAdapter(wrapped_stt, vad);
}

const connOptions = activity.agentSession.connOptions.sttConnOptions;
const stream = wrapped_stt.stream({ connOptions });
stream.updateInputStream(audio);

let cleaned = false;
const cleanup = () => {
if (cleaned) return;
cleaned = true;
stream.detachInputStream();
stream.close();
};

return new ReadableStream({
async start(controller) {
for await (const event of stream) {
controller.enqueue(event);
try {
for await (const event of stream) {
controller.enqueue(event);
}
controller.close();
} finally {
// Always clean up the STT stream, whether it ends naturally or is cancelled
cleanup();
}
controller.close();
},
cancel() {
stream.detachInputStream();
stream.close();
cleanup();
},
});
},
Expand Down Expand Up @@ -314,15 +327,27 @@ export class Agent<UserData = any> {
connOptions,
parallelToolCalls: true,
});

let cleaned = false;
const cleanup = () => {
if (cleaned) return;
cleaned = true;
stream.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ stream.close() on the STT / TTS / LLM stream should be the method that plugins use for proper resource cancellation. @simllll @KrishnaShuk

};

return new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
controller.enqueue(chunk);
try {
for await (const chunk of stream) {
controller.enqueue(chunk);
}
controller.close();
} finally {
cleanup();
}
controller.close();
},
cancel() {
stream.close();
cleanup();
},
});
},
Expand All @@ -347,18 +372,29 @@ export class Agent<UserData = any> {
const stream = wrapped_tts.stream({ connOptions });
stream.updateInputStream(text);

let cleaned = false;
const cleanup = () => {
if (cleaned) return;
cleaned = true;
stream.close();
};

return new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
if (chunk === SynthesizeStream.END_OF_STREAM) {
break;
try {
for await (const chunk of stream) {
if (chunk === SynthesizeStream.END_OF_STREAM) {
break;
}
controller.enqueue(chunk.frame);
}
controller.enqueue(chunk.frame);
controller.close();
} finally {
cleanup();
}
controller.close();
},
cancel() {
stream.close();
cleanup();
},
});
},
Expand Down
3 changes: 0 additions & 3 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2259,15 +2259,12 @@ export class AgentActivity implements RecognitionHooks {
}
if (this.stt instanceof STT) {
this.stt.off('metrics_collected', this.onMetricsCollected);
await this.stt.close();
}
if (this.tts instanceof TTS) {
this.tts.off('metrics_collected', this.onMetricsCollected);
await this.tts.close();
}
if (this.vad instanceof VAD) {
this.vad.off('metrics_collected', this.onMetricsCollected);
await this.vad.close();
}

this.detachAudioInput();
Expand Down
9 changes: 0 additions & 9 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,6 @@ export class AgentSession<
);
}

this.logger.info(
{
input: this.input.audio,
output: this.output.audio,
enableRecording: this._enableRecording,
},
'Recording audio input and output',
);

if (this.input.audio && this.output.audio && this._enableRecording) {
this._recorderIO = new RecorderIO({ agentSession: this });
this.input.audio = this._recorderIO.recordInput(this.input.audio);
Expand Down
2 changes: 1 addition & 1 deletion agents/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ export class AgentServer {

const req = new JobRequest(msg.job!, onReject, onAccept);
this.#logger
.child({ job: msg.job, resuming: msg.resuming, agentName: this.#opts.agentName })
.child({ jobId: msg.job?.id, resuming: msg.resuming, agentName: this.#opts.agentName })
.info('received job request');

const jobRequestTask = async () => {
Expand Down
8 changes: 6 additions & 2 deletions examples/src/basic_tool_call_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import {
type JobContext,
type JobProcess,
WorkerOptions,
ServerOptions,
cli,
defineAgent,
llm,
voice,
} from '@livekit/agents';
import * as livekit from '@livekit/agents-plugin-livekit';
import * as silero from '@livekit/agents-plugin-silero';
import { BackgroundVoiceCancellation } from '@livekit/noise-cancellation-node';
import { fileURLToPath } from 'node:url';
import { z } from 'zod';

Expand Down Expand Up @@ -148,8 +149,11 @@ export default defineAgent({
await session.start({
agent: routerAgent,
room: ctx.room,
inputOptions: {
noiseCancellation: BackgroundVoiceCancellation(),
},
});
},
});

cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));
cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) }));
Loading