diff --git a/.changeset/hungry-schools-think.md b/.changeset/hungry-schools-think.md new file mode 100644 index 000000000..8f0b1b30a --- /dev/null +++ b/.changeset/hungry-schools-think.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +Fix improper resource cleanup inside AgentActivity by not close global STT / TTS / VAD components diff --git a/.changeset/twenty-houses-smash.md b/.changeset/twenty-houses-smash.md new file mode 100644 index 000000000..0bbf77399 --- /dev/null +++ b/.changeset/twenty-houses-smash.md @@ -0,0 +1,13 @@ +--- +'@livekit/agents-plugin-neuphonic': patch +'@livekit/agents-plugin-cartesia': patch +'@livekit/agents-plugin-deepgram': patch +'@livekit/agents-plugin-resemble': patch +'@livekit/agents-plugin-inworld': patch +'@livekit/agents-plugin-google': patch +'@livekit/agents-plugin-openai': patch +'@livekit/agents-plugin-rime': patch +'@livekit/agents': patch +--- + +Improve TTS resource cleanup diff --git a/agents/src/inference/tts.ts b/agents/src/inference/tts.ts index c9432b097..3c6541764 100644 --- a/agents/src/inference/tts.ts +++ b/agents/src/inference/tts.ts @@ -238,7 +238,6 @@ export class TTS extends BaseTTS { export class SynthesizeStream extends BaseSynthesizeStream { private opts: InferenceTTSOptions; private tts: TTS; - private connOptions: APIConnectOptions; #logger = log(); @@ -246,7 +245,6 @@ export class SynthesizeStream extends BaseSynthesizeSt super(tts, connOptions); this.opts = opts; this.tts = tts; - this.connOptions = connOptions; } get label() { diff --git a/agents/src/tts/stream_adapter.ts b/agents/src/tts/stream_adapter.ts index 10a1e59a2..5b4bcef7e 100644 --- a/agents/src/tts/stream_adapter.ts +++ b/agents/src/tts/stream_adapter.ts @@ -22,10 +22,17 @@ export class StreamAdapter extends TTS { this.#tts.on('metrics_collected', (metrics) => { this.emit('metrics_collected', metrics); }); + this.#tts.on('error', (error) => { + this.emit('error', error); + }); } - synthesize(text: string): ChunkedStream { - return this.#tts.synthesize(text); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): ChunkedStream { + return this.#tts.synthesize(text, connOptions, abortSignal); } stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper { @@ -85,7 +92,7 @@ export class StreamAdapterWrapper extends SynthesizeStream { prevTask: Task | undefined, controller: AbortController, ) => { - const audioStream = this.#tts.synthesize(token); + const audioStream = this.#tts.synthesize(token, this.connOptions, this.abortSignal); // wait for previous audio transcription to complete before starting // to queuing audio frames of the current token diff --git a/agents/src/tts/tts.ts b/agents/src/tts/tts.ts index a4ced77ad..f53e32449 100644 --- a/agents/src/tts/tts.ts +++ b/agents/src/tts/tts.ts @@ -90,7 +90,11 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter(); protected closed = false; - abstract label: string; - #tts: TTS; - #metricsPendingTexts: string[] = []; - #metricsText = ''; - #monitorMetricsTask?: Promise; - private _connOptions: APIConnectOptions; + protected connOptions: APIConnectOptions; protected abortController = new AbortController(); - #ttsRequestSpan?: Span; private deferredInputStream: DeferredReadableStream< string | typeof SynthesizeStream.FLUSH_SENTINEL >; private logger = log(); + abstract label: string; + + #tts: TTS; + #metricsPendingTexts: string[] = []; + #metricsText = ''; + #monitorMetricsTask?: Promise; + #ttsRequestSpan?: Span; + constructor(tts: TTS, connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS) { this.#tts = tts; - this._connOptions = connOptions; + this.connOptions = connOptions; this.deferredInputStream = new DeferredReadableStream(); this.pumpInput(); + this.abortController.signal.addEventListener('abort', () => { this.deferredInputStream.detachSource(); // TODO (AJS-36) clean this up when we refactor with streams - this.input.close(); - this.output.close(); + if (!this.input.closed) this.input.close(); + if (!this.output.closed) this.output.close(); this.closed = true; }); @@ -172,7 +179,7 @@ export abstract class SynthesizeStream [traceTypes.ATTR_TTS_LABEL]: this.#tts.label, }); - for (let i = 0; i < this._connOptions.maxRetry + 1; i++) { + for (let i = 0; i < this.connOptions.maxRetry + 1; i++) { try { return await tracer.startActiveSpan( async (attemptSpan) => { @@ -188,15 +195,15 @@ export abstract class SynthesizeStream ); } catch (error) { if (error instanceof APIError) { - const retryInterval = intervalForRetry(this._connOptions, i); + const retryInterval = intervalForRetry(this.connOptions, i); - if (this._connOptions.maxRetry === 0 || !error.retryable) { + if (this.connOptions.maxRetry === 0 || !error.retryable) { this.emitError({ error, recoverable: false }); throw error; - } else if (i === this._connOptions.maxRetry) { + } else if (i === this.connOptions.maxRetry) { this.emitError({ error, recoverable: false }); throw new APIConnectionError({ - message: `failed to generate TTS completion after ${this._connOptions.maxRetry + 1} attempts`, + message: `failed to generate TTS completion after ${this.connOptions.maxRetry + 1} attempts`, options: { retryable: false }, }); } else { @@ -380,6 +387,10 @@ export abstract class SynthesizeStream return this.output.next(); } + get abortSignal(): AbortSignal { + return this.abortController.signal; + } + /** Close both the input and output of the TTS stream */ close() { this.abortController.abort(); @@ -415,15 +426,22 @@ export abstract class ChunkedStream implements AsyncIterableIterator this.abortController.abort(), { once: true }); + } + this.monitorMetrics(); // this is a hack to immitate asyncio.create_task so that mainTask @@ -510,6 +528,10 @@ export abstract class ChunkedStream implements AsyncIterableIterator to any because OpenAI returns an annoying Response type - constructor(tts: TTS, text: string, opts: TTSOptions) { - super(text, tts); + constructor( + tts: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.#text = text; this.#opts = opts; } @@ -116,6 +134,8 @@ export class ChunkedStream extends tts.ChunkedStream { json.transcript = this.#text; const baseUrl = new URL(this.#opts.baseUrl); + const doneFut = new Future(); + const req = request( { hostname: baseUrl.hostname, @@ -126,6 +146,7 @@ export class ChunkedStream extends tts.ChunkedStream { [AUTHORIZATION_HEADER]: this.#opts.apiKey!, [VERSION_HEADER]: VERSION, }, + signal: this.abortSignal, }, (res) => { res.on('data', (chunk) => { @@ -148,12 +169,24 @@ export class ChunkedStream extends tts.ChunkedStream { }); } this.queue.close(); + doneFut.resolve(); + }); + res.on('error', (err) => { + if (err.message === 'aborted') return; + this.#logger.error({ err }, 'Cartesia TTS response error'); }); }, ); + req.on('error', (err) => { + if (err.name === 'AbortError') return; + this.#logger.error({ err }, 'Cartesia TTS request error'); + }); + req.on('close', () => doneFut.resolve()); req.write(JSON.stringify(json)); req.end(); + + await doneFut.await; } } diff --git a/plugins/deepgram/src/tts.ts b/plugins/deepgram/src/tts.ts index 9b638225c..5e9aceb30 100644 --- a/plugins/deepgram/src/tts.ts +++ b/plugins/deepgram/src/tts.ts @@ -1,7 +1,14 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, shortuuid, tokenize, tts } from '@livekit/agents'; +import { + type APIConnectOptions, + AudioByteStream, + log, + shortuuid, + tokenize, + tts, +} from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; import { type RawData, WebSocket } from 'ws'; @@ -56,8 +63,12 @@ export class TTS extends tts.TTS { } } - synthesize(text: string): tts.ChunkedStream { - return new ChunkedStream(this, text, this.opts); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): tts.ChunkedStream { + return new ChunkedStream(this, text, this.opts, connOptions, abortSignal); } stream(): tts.SynthesizeStream { @@ -67,11 +78,18 @@ export class TTS extends tts.TTS { export class ChunkedStream extends tts.ChunkedStream { label = 'deepgram.ChunkedStream'; + #logger = log(); private opts: TTSOptions; private text: string; - constructor(tts: TTS, text: string, opts: TTSOptions) { - super(text, tts); + constructor( + tts: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.text = text; this.opts = opts; } @@ -96,6 +114,7 @@ export class ChunkedStream extends tts.ChunkedStream { [AUTHORIZATION_HEADER]: `Token ${this.opts.apiKey!}`, 'Content-Type': 'application/json', }, + signal: this.abortSignal, }, (res) => { if (res.statusCode !== 200) { @@ -119,7 +138,8 @@ export class ChunkedStream extends tts.ChunkedStream { }); res.on('error', (err) => { - reject(err); + if (err.message === 'aborted') return; + this.#logger.error({ err }, 'Deepgram TTS response error'); }); res.on('close', () => { @@ -142,9 +162,11 @@ export class ChunkedStream extends tts.ChunkedStream { ); req.on('error', (err) => { - reject(err); + if (err.name === 'AbortError') return; + this.#logger.error({ err }, 'Deepgram TTS request error'); }); + req.on('close', () => resolve()); req.write(JSON.stringify(json)); req.end(); }); diff --git a/plugins/google/src/beta/gemini_tts.ts b/plugins/google/src/beta/gemini_tts.ts index 0d46914d7..9fb6331da 100644 --- a/plugins/google/src/beta/gemini_tts.ts +++ b/plugins/google/src/beta/gemini_tts.ts @@ -4,6 +4,7 @@ import type * as types from '@google/genai'; import { GoogleGenAI } from '@google/genai'; import { + type APIConnectOptions, APIConnectionError, APIStatusError, AudioByteStream, @@ -138,8 +139,12 @@ export class TTS extends tts.TTS { this.#client = new GoogleGenAI(clientOptions); } - synthesize(text: string): ChunkedStream { - return new ChunkedStream(text, this); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): ChunkedStream { + return new ChunkedStream(text, this, connOptions, abortSignal); } /** @@ -170,8 +175,13 @@ export class ChunkedStream extends tts.ChunkedStream { #tts: TTS; label = 'google.gemini.ChunkedStream'; - constructor(inputText: string, tts: TTS) { - super(inputText, tts); + constructor( + inputText: string, + tts: TTS, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(inputText, tts, connOptions, abortSignal); this.#tts = tts; } @@ -188,6 +198,7 @@ export class ChunkedStream extends tts.ChunkedStream { }, }, }, + abortSignal: this.abortSignal, }; let inputText = this.inputText; @@ -213,6 +224,9 @@ export class ChunkedStream extends tts.ChunkedStream { await this.#processResponse(response, bstream, requestId); } } catch (error: unknown) { + if (error instanceof Error && error.name === 'AbortError') { + return; + } if (isAPIError(error)) throw error; const err = error as { diff --git a/plugins/inworld/src/tts.ts b/plugins/inworld/src/tts.ts index c924b64f9..f139cb0eb 100644 --- a/plugins/inworld/src/tts.ts +++ b/plugins/inworld/src/tts.ts @@ -1,7 +1,14 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, log, shortuuid, tokenize, tts } from '@livekit/agents'; +import { + type APIConnectOptions, + AudioByteStream, + log, + shortuuid, + tokenize, + tts, +} from '@livekit/agents'; import { type RawData, WebSocket } from 'ws'; const DEFAULT_BIT_RATE = 64000; @@ -313,8 +320,12 @@ export class TTS extends tts.TTS { return data.voices; } - synthesize(text: string): tts.ChunkedStream { - return new ChunkedStream(this, text, this.#opts); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): tts.ChunkedStream { + return new ChunkedStream(this, text, this.#opts, connOptions, abortSignal); } stream(): tts.SynthesizeStream { @@ -340,8 +351,14 @@ class ChunkedStream extends tts.ChunkedStream { #tts: TTS; label = 'inworld.ChunkedStream'; - constructor(ttsInstance: TTS, text: string, opts: TTSOptions) { - super(text, ttsInstance); + constructor( + ttsInstance: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, ttsInstance, connOptions, abortSignal); this.#tts = ttsInstance; this.#opts = opts; } @@ -367,14 +384,23 @@ class ChunkedStream extends tts.ChunkedStream { const url = new URL('tts/v1/voice:stream', this.#opts.baseURL); - const response = await fetch(url.toString(), { - method: 'POST', - headers: { - Authorization: this.#tts.authorization, - 'Content-Type': 'application/json', - }, - body: JSON.stringify(bodyParams), - }); + let response: Response; + try { + response = await fetch(url.toString(), { + method: 'POST', + headers: { + Authorization: this.#tts.authorization, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(bodyParams), + signal: this.abortSignal, + }); + } catch (e) { + if (e instanceof Error && e.name === 'AbortError') { + return; + } + throw e; + } if (!response.ok) { throw new Error(`Inworld API error: ${response.status} ${response.statusText}`); diff --git a/plugins/neuphonic/src/tts.ts b/plugins/neuphonic/src/tts.ts index a74d0ad89..76177bfb0 100644 --- a/plugins/neuphonic/src/tts.ts +++ b/plugins/neuphonic/src/tts.ts @@ -1,7 +1,14 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, log, shortuuid, tts } from '@livekit/agents'; +import { + type APIConnectOptions, + AudioByteStream, + Future, + log, + shortuuid, + tts, +} from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; import { WebSocket } from 'ws'; @@ -51,8 +58,12 @@ export class TTS extends tts.TTS { } } - synthesize(text: string): tts.ChunkedStream { - return new ChunkedStream(this, text, this.#opts); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): tts.ChunkedStream { + return new ChunkedStream(this, text, this.#opts, connOptions, abortSignal); } stream(): tts.SynthesizeStream { @@ -62,11 +73,18 @@ export class TTS extends tts.TTS { export class ChunkedStream extends tts.ChunkedStream { label = 'neuphonic.ChunkedStream'; + #logger = log(); #opts: TTSOptions; #text: string; - constructor(tts: TTS, text: string, opts: TTSOptions) { - super(text, tts); + constructor( + tts: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.#text = text; this.#opts = opts; } @@ -80,6 +98,7 @@ export class ChunkedStream extends tts.ChunkedStream { }; let buffer = ''; + const doneFut = new Future(); const req = request( { @@ -92,6 +111,7 @@ export class ChunkedStream extends tts.ChunkedStream { 'Content-Type': 'application/json', Accept: 'text/event-stream', }, + signal: this.abortSignal, }, (res) => { res.on('data', (chunk) => { @@ -129,12 +149,24 @@ export class ChunkedStream extends tts.ChunkedStream { }); } this.queue.close(); + doneFut.resolve(); + }); + res.on('error', (err) => { + if (err.message === 'aborted') return; + this.#logger.error({ err }, 'Neuphonic TTS response error'); }); }, ); + req.on('error', (err) => { + if (err.name === 'AbortError') return; + this.#logger.error({ err }, 'Neuphonic TTS request error'); + }); + req.on('close', () => doneFut.resolve()); req.write(JSON.stringify(json)); req.end(); + + await doneFut.await; } } diff --git a/plugins/openai/src/tts.ts b/plugins/openai/src/tts.ts index 5c8466df5..25991ee37 100644 --- a/plugins/openai/src/tts.ts +++ b/plugins/openai/src/tts.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, shortuuid, tts } from '@livekit/agents'; +import { type APIConnectOptions, AudioByteStream, shortuuid, tts } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { OpenAI } from 'openai'; import type { TTSModels, TTSVoices } from './models.js'; @@ -59,7 +59,11 @@ export class TTS extends tts.TTS { this.#opts = { ...this.#opts, ...opts }; } - synthesize(text: string): ChunkedStream { + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): ChunkedStream { return new ChunkedStream( this, text, @@ -72,8 +76,10 @@ export class TTS extends tts.TTS { response_format: 'pcm', speed: this.#opts.speed, }, - { signal: this.abortController.signal }, + { signal: abortSignal }, ), + connOptions, + abortSignal, ); } @@ -91,8 +97,14 @@ export class ChunkedStream extends tts.ChunkedStream { private stream: Promise; // set Promise to any because OpenAI returns an annoying Response type - constructor(tts: TTS, text: string, stream: Promise) { - super(text, tts); + constructor( + tts: TTS, + text: string, + stream: Promise, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.stream = stream; } diff --git a/plugins/resemble/src/tts.ts b/plugins/resemble/src/tts.ts index cf576a81b..b967c5026 100644 --- a/plugins/resemble/src/tts.ts +++ b/plugins/resemble/src/tts.ts @@ -1,7 +1,15 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, log, shortuuid, tokenize, tts } from '@livekit/agents'; +import { + type APIConnectOptions, + AudioByteStream, + Future, + log, + shortuuid, + tokenize, + tts, +} from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; import { WebSocket } from 'ws'; @@ -49,8 +57,12 @@ export class TTS extends tts.TTS { } } - synthesize(text: string): tts.ChunkedStream { - return new ChunkedStream(this, text, this.#opts); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): tts.ChunkedStream { + return new ChunkedStream(this, text, this.#opts, connOptions, abortSignal); } stream(): tts.SynthesizeStream { @@ -64,8 +76,14 @@ export class ChunkedStream extends tts.ChunkedStream { #opts: TTSOptions; #text: string; - constructor(tts: TTS, text: string, opts: TTSOptions) { - super(text, tts); + constructor( + tts: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.#text = text; this.#opts = opts; } @@ -76,6 +94,7 @@ export class ChunkedStream extends tts.ChunkedStream { const json = toResembleOptions(this.#opts); json.data = this.#text; + const doneFut = new Future(); const req = request( { @@ -88,6 +107,7 @@ export class ChunkedStream extends tts.ChunkedStream { 'Content-Type': 'application/json', Accept: 'application/json', }, + signal: this.abortSignal, }, (res) => { let data = ''; @@ -132,21 +152,35 @@ export class ChunkedStream extends tts.ChunkedStream { } this.queue.close(); + doneFut.resolve(); } catch (error) { this.#logger.error('Error processing Resemble API response:', error); this.queue.close(); + doneFut.resolve(); } }); - res.on('error', (error) => { - this.#logger.error('Resemble API error:', error); + res.on('close', () => { this.queue.close(); + doneFut.resolve(); + }); + + res.on('error', (err) => { + if (err.message === 'aborted') return; + this.#logger.error({ err }, 'Resemble TTS response error'); }); }, ); + req.on('error', (err) => { + if (err.name === 'AbortError') return; + this.#logger.error({ err }, 'Resemble TTS request error'); + }); + req.on('close', () => doneFut.resolve()); req.write(JSON.stringify(json)); req.end(); + + await doneFut.await; } } diff --git a/plugins/rime/src/tts.ts b/plugins/rime/src/tts.ts index 190cefdc2..333378408 100644 --- a/plugins/rime/src/tts.ts +++ b/plugins/rime/src/tts.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import { AudioByteStream, shortuuid, tts } from '@livekit/agents'; +import { type APIConnectOptions, AudioByteStream, shortuuid, tts } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import type { DefaultLanguages, TTSModels } from './models.js'; @@ -101,8 +101,12 @@ export class TTS extends tts.TTS { * @param text - Text to synthesize * @returns A chunked stream of synthesized audio */ - synthesize(text: string): ChunkedStream { - return new ChunkedStream(this, text, this.opts); + synthesize( + text: string, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ): ChunkedStream { + return new ChunkedStream(this, text, this.opts, connOptions, abortSignal); } stream(): tts.SynthesizeStream { @@ -121,9 +125,17 @@ export class ChunkedStream extends tts.ChunkedStream { * @param tts - The parent TTS instance * @param text - Text to synthesize * @param opts - TTS configuration options + * @param connOptions - API connection options + * @param abortSignal - Abort signal for cancellation */ - constructor(tts: TTS, text: string, opts: TTSOptions) { - super(text, tts); + constructor( + tts: TTS, + text: string, + opts: TTSOptions, + connOptions?: APIConnectOptions, + abortSignal?: AbortSignal, + ) { + super(text, tts, connOptions, abortSignal); this.text = text; this.opts = opts; } @@ -143,6 +155,7 @@ export class ChunkedStream extends tts.ChunkedStream { ), text: this.text, }), + signal: this.abortSignal, }); if (!response.ok) {