Skip to content
5 changes: 5 additions & 0 deletions .changeset/hungry-schools-think.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Fix improper resource cleanup inside AgentActivity by not closing global STT / TTS / VAD components
13 changes: 13 additions & 0 deletions .changeset/twenty-houses-smash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
'@livekit/agents-plugin-neuphonic': patch
'@livekit/agents-plugin-cartesia': patch
'@livekit/agents-plugin-deepgram': patch
'@livekit/agents-plugin-resemble': patch
'@livekit/agents-plugin-inworld': patch
'@livekit/agents-plugin-google': patch
'@livekit/agents-plugin-openai': patch
'@livekit/agents-plugin-rime': patch
'@livekit/agents': patch
---

Improve TTS resource cleanup
2 changes: 0 additions & 2 deletions agents/src/inference/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,13 @@ export class TTS<TModel extends TTSModels> extends BaseTTS {
export class SynthesizeStream<TModel extends TTSModels> extends BaseSynthesizeStream {
private opts: InferenceTTSOptions<TModel>;
private tts: TTS<TModel>;
private connOptions: APIConnectOptions;

#logger = log();

constructor(tts: TTS<TModel>, opts: InferenceTTSOptions<TModel>, connOptions: APIConnectOptions) {
super(tts, connOptions);
this.opts = opts;
this.tts = tts;
this.connOptions = connOptions;
}

get label() {
Expand Down
13 changes: 10 additions & 3 deletions agents/src/tts/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,17 @@ export class StreamAdapter extends TTS {
this.#tts.on('metrics_collected', (metrics) => {
this.emit('metrics_collected', metrics);
});
this.#tts.on('error', (error) => {
this.emit('error', error);
});
}

synthesize(text: string): ChunkedStream {
return this.#tts.synthesize(text);
synthesize(
text: string,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
): ChunkedStream {
return this.#tts.synthesize(text, connOptions, abortSignal);
}

stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
Expand Down Expand Up @@ -85,7 +92,7 @@ export class StreamAdapterWrapper extends SynthesizeStream {
prevTask: Task<void> | undefined,
controller: AbortController,
) => {
const audioStream = this.#tts.synthesize(token);
const audioStream = this.#tts.synthesize(token, this.connOptions, this.abortSignal);

// wait for the previous audio transcription to complete before
// queuing audio frames of the current token
Expand Down
59 changes: 41 additions & 18 deletions agents/src/tts/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter<TTSCal
/**
* Receives text and returns synthesis in the form of a {@link ChunkedStream}
*/
abstract synthesize(text: string): ChunkedStream;
abstract synthesize(
text: string,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
): ChunkedStream;

/**
* Returns a {@link SynthesizeStream} that can be used to push text and receive audio data
Expand Down Expand Up @@ -131,30 +135,33 @@ export abstract class SynthesizeStream
SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM
>();
protected closed = false;
abstract label: string;
#tts: TTS;
#metricsPendingTexts: string[] = [];
#metricsText = '';
#monitorMetricsTask?: Promise<void>;
private _connOptions: APIConnectOptions;
protected connOptions: APIConnectOptions;
protected abortController = new AbortController();
#ttsRequestSpan?: Span;

private deferredInputStream: DeferredReadableStream<
string | typeof SynthesizeStream.FLUSH_SENTINEL
>;
private logger = log();

abstract label: string;

#tts: TTS;
#metricsPendingTexts: string[] = [];
#metricsText = '';
#monitorMetricsTask?: Promise<void>;
#ttsRequestSpan?: Span;

constructor(tts: TTS, connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS) {
this.#tts = tts;
this._connOptions = connOptions;
this.connOptions = connOptions;
this.deferredInputStream = new DeferredReadableStream();
this.pumpInput();

this.abortController.signal.addEventListener('abort', () => {
this.deferredInputStream.detachSource();
// TODO (AJS-36) clean this up when we refactor with streams
this.input.close();
this.output.close();
if (!this.input.closed) this.input.close();
if (!this.output.closed) this.output.close();
this.closed = true;
});

Expand All @@ -172,7 +179,7 @@ export abstract class SynthesizeStream
[traceTypes.ATTR_TTS_LABEL]: this.#tts.label,
});

for (let i = 0; i < this._connOptions.maxRetry + 1; i++) {
for (let i = 0; i < this.connOptions.maxRetry + 1; i++) {
try {
return await tracer.startActiveSpan(
async (attemptSpan) => {
Expand All @@ -188,15 +195,15 @@ export abstract class SynthesizeStream
);
} catch (error) {
if (error instanceof APIError) {
const retryInterval = intervalForRetry(this._connOptions, i);
const retryInterval = intervalForRetry(this.connOptions, i);

if (this._connOptions.maxRetry === 0 || !error.retryable) {
if (this.connOptions.maxRetry === 0 || !error.retryable) {
this.emitError({ error, recoverable: false });
throw error;
} else if (i === this._connOptions.maxRetry) {
} else if (i === this.connOptions.maxRetry) {
this.emitError({ error, recoverable: false });
throw new APIConnectionError({
message: `failed to generate TTS completion after ${this._connOptions.maxRetry + 1} attempts`,
message: `failed to generate TTS completion after ${this.connOptions.maxRetry + 1} attempts`,
options: { retryable: false },
});
} else {
Expand Down Expand Up @@ -380,6 +387,10 @@ export abstract class SynthesizeStream
return this.output.next();
}

/**
 * The {@link AbortSignal} of this stream's internal abort controller.
 *
 * The signal is aborted when `close()` calls `abortController.abort()`.
 */
get abortSignal(): AbortSignal {
return this.abortController.signal;
}

/** Close both the input and output of the TTS stream */
close() {
this.abortController.abort();
Expand Down Expand Up @@ -415,15 +426,22 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
private _connOptions: APIConnectOptions;
private logger = log();

protected abortController = new AbortController();

constructor(
text: string,
tts: TTS,
connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
abortSignal?: AbortSignal,
) {
this.#text = text;
this.#tts = tts;
this._connOptions = connOptions;

if (abortSignal) {
abortSignal.addEventListener('abort', () => this.abortController.abort(), { once: true });
}

this.monitorMetrics();

// this is a hack to imitate asyncio.create_task so that mainTask
Expand Down Expand Up @@ -510,6 +528,10 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
return this.#text;
}

/**
 * The {@link AbortSignal} of this stream's internal abort controller.
 *
 * Subclasses can hand this signal to an underlying request so that it is
 * cancelled when the stream is aborted.
 */
get abortSignal(): AbortSignal {
return this.abortController.signal;
}

protected async monitorMetrics() {
const startTime = process.hrtime.bigint();
let audioDurationMs = 0;
Expand Down Expand Up @@ -564,8 +586,9 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized

/** Close both the input and output of the TTS stream */
close() {
this.queue.close();
this.output.close();
if (!this.queue.closed) this.queue.close();
if (!this.output.closed) this.output.close();
if (!this.abortController.signal.aborted) this.abortController.abort();
this.closed = true;
}

Expand Down
2 changes: 2 additions & 0 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export class AgentSession<
ctx = getJobContext();
} catch (error) {
// JobContext is not available in evals
this.logger.warn('JobContext is not available');
}

if (ctx) {
Expand Down Expand Up @@ -393,6 +394,7 @@ export class AgentSession<
}
} catch (error) {
// JobContext is not available in evals
this.logger.warn('JobContext is not available');
}

this.sessionSpan = tracer.startSpan({
Expand Down
45 changes: 39 additions & 6 deletions plugins/cartesia/src/tts.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { AudioByteStream, log, shortuuid, tokenize, tts } from '@livekit/agents';
import {
type APIConnectOptions,
AudioByteStream,
Future,
log,
shortuuid,
tokenize,
tts,
} from '@livekit/agents';
import type { AudioFrame } from '@livekit/rtc-node';
import { request } from 'node:https';
import { type RawData, WebSocket } from 'ws';
Expand Down Expand Up @@ -88,8 +96,12 @@ export class TTS extends tts.TTS {
}
}

synthesize(text: string): tts.ChunkedStream {
return new ChunkedStream(this, text, this.#opts);
synthesize(
text: string,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
): tts.ChunkedStream {
return new ChunkedStream(this, text, this.#opts, connOptions, abortSignal);
}

stream(): SynthesizeStream {
Expand All @@ -99,12 +111,18 @@ export class TTS extends tts.TTS {

export class ChunkedStream extends tts.ChunkedStream {
label = 'cartesia.ChunkedStream';
#logger = log();
#opts: TTSOptions;
#text: string;

// set Promise<T> to any because OpenAI returns an annoying Response type
constructor(tts: TTS, text: string, opts: TTSOptions) {
super(text, tts);
constructor(
tts: TTS,
text: string,
opts: TTSOptions,
connOptions?: APIConnectOptions,
abortSignal?: AbortSignal,
) {
super(text, tts, connOptions, abortSignal);
this.#text = text;
this.#opts = opts;
}
Expand All @@ -116,6 +134,8 @@ export class ChunkedStream extends tts.ChunkedStream {
json.transcript = this.#text;

const baseUrl = new URL(this.#opts.baseUrl);
const doneFut = new Future<void>();

const req = request(
{
hostname: baseUrl.hostname,
Expand All @@ -126,6 +146,7 @@ export class ChunkedStream extends tts.ChunkedStream {
[AUTHORIZATION_HEADER]: this.#opts.apiKey!,
[VERSION_HEADER]: VERSION,
},
signal: this.abortSignal,
},
(res) => {
res.on('data', (chunk) => {
Expand All @@ -148,12 +169,24 @@ export class ChunkedStream extends tts.ChunkedStream {
});
}
this.queue.close();
doneFut.resolve();
});
res.on('error', (err) => {
if (err.message === 'aborted') return;
this.#logger.error({ err }, 'Cartesia TTS response error');
});
},
);

req.on('error', (err) => {
if (err.name === 'AbortError') return;
this.#logger.error({ err }, 'Cartesia TTS request error');
});
req.on('close', () => doneFut.resolve());
req.write(JSON.stringify(json));
req.end();

await doneFut.await;
}
}

Expand Down
Loading