Skip to content
5 changes: 5 additions & 0 deletions .changeset/hungry-schools-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Fix improper resource cleanup inside AgentActivity by not closing global STT / TTS / VAD components
13 changes: 13 additions & 0 deletions .changeset/twenty-houses-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@livekit/agents-plugin-neuphonic': patch
'@livekit/agents-plugin-cartesia': patch
'@livekit/agents-plugin-deepgram': patch
'@livekit/agents-plugin-resemble': patch
'@livekit/agents-plugin-inworld': patch
'@livekit/agents-plugin-google': patch
'@livekit/agents-plugin-openai': patch
'@livekit/agents-plugin-rime': patch
'@livekit/agents': patch
---

Improve TTS resource cleanup
3 changes: 1 addition & 2 deletions agents/src/inference/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,7 @@ export class SpeechStream<TModel extends STTModels> extends BaseSpeechStream {
try {
ws = await this.stt.connectWs(this.connOptions.timeoutMs);

// Wrap tasks for proper cancellation support using Task signals
const controller = new AbortController();
const controller = this.abortController; // Use base class abortController for proper cancellation
const sendTask = Task.from(({ signal }) => send(ws!, signal), controller);
const wsListenerTask = Task.from(({ signal }) => createWsListener(ws!, signal), controller);
const recvTask = Task.from(({ signal }) => recv(signal), controller);
Expand Down
2 changes: 0 additions & 2 deletions agents/src/inference/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,13 @@ export class TTS<TModel extends TTSModels> extends BaseTTS {
export class SynthesizeStream<TModel extends TTSModels> extends BaseSynthesizeStream {
private opts: InferenceTTSOptions<TModel>;
private tts: TTS<TModel>;
private connOptions: APIConnectOptions;

#logger = log();

constructor(tts: TTS<TModel>, opts: InferenceTTSOptions<TModel>, connOptions: APIConnectOptions) {
super(tts, connOptions);
this.opts = opts;
this.tts = tts;
this.connOptions = connOptions;
}

get label() {
Expand Down
2 changes: 1 addition & 1 deletion agents/src/ipc/job_proc_lazy_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ const startJob = (
let logger = log().child({ pid: proc.pid });

process.on('unhandledRejection', (reason) => {
logger.error(reason);
logger.debug({ error: reason }, 'Unhandled promise rejection');
});

logger.debug('initializing job runner');
Expand Down
17 changes: 13 additions & 4 deletions agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ export class StreamAdapter extends STT {
this.#stt.on('metrics_collected', (metrics) => {
this.emit('metrics_collected', metrics);
});

this.#stt.on('error', (error) => {
this.emit('error', error);
});
}

_recognize(frame: AudioFrame): Promise<SpeechEvent> {
return this.#stt.recognize(frame);
_recognize(frame: AudioFrame, abortSignal?: AbortSignal): Promise<SpeechEvent> {
return this.#stt.recognize(frame, abortSignal);
}

stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
Expand All @@ -46,6 +50,11 @@ export class StreamAdapterWrapper extends SpeechStream {
this.label = `stt.StreamAdapterWrapper<${this.#stt.label}>`;
}

/**
 * Closes this wrapper stream and then the VAD stream it holds.
 *
 * The base-class `close()` runs first; `#vadStream` is closed afterwards so
 * the VAD stream created for this wrapper does not outlive it.
 */
close() {
super.close();
this.#vadStream.close();
}

/**
 * Intentionally a no-op: resolves immediately without inspecting any events.
 *
 * NOTE(review): presumably this overrides the base stream's metrics monitor
 * because the wrapped STT already emits metrics from `recognize()` — confirm.
 */
async monitorMetrics() {
return; // do nothing
}
Expand All @@ -72,7 +81,7 @@ export class StreamAdapterWrapper extends SpeechStream {
this.output.put({ type: SpeechEventType.END_OF_SPEECH });

try {
const event = await this.#stt.recognize(ev.frames);
const event = await this.#stt.recognize(ev.frames, this.abortSignal);
if (!event.alternatives![0].text) {
continue;
}
Expand All @@ -93,6 +102,6 @@ export class StreamAdapterWrapper extends SpeechStream {
}
};

Promise.all([forwardInput(), recognize()]);
await Promise.all([forwardInput(), recognize()]);
}
}
17 changes: 14 additions & 3 deletions agents/src/stt/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
}

/** Receives an audio buffer and returns transcription in the form of a {@link SpeechEvent} */
async recognize(frame: AudioBuffer): Promise<SpeechEvent> {
async recognize(frame: AudioBuffer, abortSignal?: AbortSignal): Promise<SpeechEvent> {
const startTime = process.hrtime.bigint();
const event = await this._recognize(frame);
const event = await this._recognize(frame, abortSignal);
const durationMs = Number((process.hrtime.bigint() - startTime) / BigInt(1000000));
this.emit('metrics_collected', {
type: 'stt_metrics',
Expand All @@ -128,7 +128,11 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter<STTCal
});
return event;
}
protected abstract _recognize(frame: AudioBuffer): Promise<SpeechEvent>;

protected abstract _recognize(
frame: AudioBuffer,
abortSignal?: AbortSignal,
): Promise<SpeechEvent>;

/**
* Returns a {@link SpeechStream} that can be used to push audio frames and receive
Expand Down Expand Up @@ -173,6 +177,8 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
private logger = log();
private _connOptions: APIConnectOptions;

protected abortController = new AbortController();

constructor(
stt: STT,
sampleRate?: number,
Expand Down Expand Up @@ -290,6 +296,10 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>

protected abstract run(): Promise<void>;

/**
 * The `AbortSignal` of this stream's `abortController`.
 *
 * @returns the signal belonging to `this.abortController`; hand it to
 * in-flight work that should stop when the controller is aborted.
 */
get abortSignal(): AbortSignal {
return this.abortController.signal;
}

/**
 * Sets `audioStream` as the source of this stream's deferred input stream.
 *
 * @param audioStream - readable stream of audio frames to read input from
 */
updateInputStream(audioStream: ReadableStream<AudioFrame>) {
this.deferredInputStream.setSource(audioStream);
}
Expand Down Expand Up @@ -354,6 +364,7 @@ export abstract class SpeechStream implements AsyncIterableIterator<SpeechEvent>
if (!this.input.closed) this.input.close();
if (!this.queue.closed) this.queue.close();
if (!this.output.closed) this.output.close();
if (!this.abortController.signal.aborted) this.abortController.abort();
this.closed = true;
}

Expand Down
13 changes: 10 additions & 3 deletions agents/src/tts/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ export class StreamAdapter extends TTS {
this.#tts.on('metrics_collected', (metrics) => {
this.emit('metrics_collected', metrics);
});
this.#tts.on('error', (error) => {
this.emit('error', error);
});
}

synthesize(text: string): ChunkedStream {
return this.#tts.synthesize(text);
synthesize(
text: string,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
): ChunkedStream {
return this.#tts.synthesize(text, connOptions, abortSignal);
}

stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
Expand Down Expand Up @@ -85,7 +92,7 @@ export class StreamAdapterWrapper extends SynthesizeStream {
prevTask: Task<void> | undefined,
controller: AbortController,
) => {
const audioStream = this.#tts.synthesize(token);
const audioStream = this.#tts.synthesize(token, this.connOptions, this.abortSignal);

// wait for previous audio transcription to complete before starting
// to queue audio frames of the current token
Expand Down
59 changes: 41 additions & 18 deletions agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter<TTSCal
/**
* Receives text and returns synthesis in the form of a {@link ChunkedStream}
*/
abstract synthesize(text: string): ChunkedStream;
abstract synthesize(
text: string,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
): ChunkedStream;

/**
* Returns a {@link SynthesizeStream} that can be used to push text and receive audio data
Expand Down Expand Up @@ -131,30 +135,33 @@ export abstract class SynthesizeStream
SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM
>();
protected closed = false;
abstract label: string;
#tts: TTS;
#metricsPendingTexts: string[] = [];
#metricsText = '';
#monitorMetricsTask?: Promise<void>;
private _connOptions: APIConnectOptions;
protected connOptions: APIConnectOptions;
protected abortController = new AbortController();
#ttsRequestSpan?: Span;

private deferredInputStream: DeferredReadableStream<
string | typeof SynthesizeStream.FLUSH_SENTINEL
>;
private logger = log();

abstract label: string;

#tts: TTS;
#metricsPendingTexts: string[] = [];
#metricsText = '';
#monitorMetricsTask?: Promise<void>;
#ttsRequestSpan?: Span;

constructor(tts: TTS, connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS) {
this.#tts = tts;
this._connOptions = connOptions;
this.connOptions = connOptions;
this.deferredInputStream = new DeferredReadableStream();
this.pumpInput();

this.abortController.signal.addEventListener('abort', () => {
this.deferredInputStream.detachSource();
// TODO (AJS-36) clean this up when we refactor with streams
this.input.close();
this.output.close();
if (!this.input.closed) this.input.close();
if (!this.output.closed) this.output.close();
this.closed = true;
});

Expand All @@ -172,7 +179,7 @@ export abstract class SynthesizeStream
[traceTypes.ATTR_TTS_LABEL]: this.#tts.label,
});

for (let i = 0; i < this._connOptions.maxRetry + 1; i++) {
for (let i = 0; i < this.connOptions.maxRetry + 1; i++) {
try {
return await tracer.startActiveSpan(
async (attemptSpan) => {
Expand All @@ -188,15 +195,15 @@ export abstract class SynthesizeStream
);
} catch (error) {
if (error instanceof APIError) {
const retryInterval = intervalForRetry(this._connOptions, i);
const retryInterval = intervalForRetry(this.connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
if (this.connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
throw error;
} else if (i === this._connOptions.maxRetry) {
} else if (i === this.connOptions.maxRetry) {
this.emitError({ error, recoverable: false });
throw new APIConnectionError({
message: `failed to generate TTS completion after ${this._connOptions.maxRetry + 1} attempts`,
message: `failed to generate TTS completion after ${this.connOptions.maxRetry + 1} attempts`,
options: { retryable: false },
});
} else {
Expand Down Expand Up @@ -380,6 +387,10 @@ export abstract class SynthesizeStream
return this.output.next();
}

/**
 * The `AbortSignal` of this stream's `abortController`.
 *
 * @returns the signal belonging to `this.abortController`; it can be passed
 * to downstream work that should be cancelled together with this stream.
 */
get abortSignal(): AbortSignal {
return this.abortController.signal;
}

/** Close both the input and output of the TTS stream */
close() {
this.abortController.abort();
Expand Down Expand Up @@ -415,15 +426,22 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
private _connOptions: APIConnectOptions;
private logger = log();

protected abortController = new AbortController();

constructor(
text: string,
tts: TTS,
connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
abortSignal?: AbortSignal,
) {
this.#text = text;
this.#tts = tts;
this._connOptions = connOptions;

if (abortSignal) {
abortSignal.addEventListener('abort', () => this.abortController.abort(), { once: true });
}

this.monitorMetrics();

// this is a hack to imitate asyncio.create_task so that mainTask
Expand Down Expand Up @@ -510,6 +528,10 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
return this.#text;
}

/**
 * The `AbortSignal` of this chunked stream's own `abortController`.
 *
 * @returns the signal belonging to `this.abortController`. This is the
 * stream's internal signal, not the optional external signal passed to the
 * constructor.
 */
get abortSignal(): AbortSignal {
return this.abortController.signal;
}

protected async monitorMetrics() {
const startTime = process.hrtime.bigint();
let audioDurationMs = 0;
Expand Down Expand Up @@ -564,8 +586,9 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized

/** Close both the input and output of the TTS stream */
close() {
this.queue.close();
this.output.close();
if (!this.queue.closed) this.queue.close();
if (!this.output.closed) this.output.close();
if (!this.abortController.signal.aborted) this.abortController.abort();
this.closed = true;
}

Expand Down
Loading