Skip to content

Commit 78a0d21

Browse files
authored
Improve TTS resource cleanup (#893)
1 parent 48923ab commit 78a0d21

File tree

14 files changed

+309
-75
lines changed

14 files changed

+309
-75
lines changed

.changeset/hungry-schools-think.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/agents': patch
3+
---
4+
5+
Fix improper resource cleanup inside AgentActivity by not closing global STT / TTS / VAD components

.changeset/twenty-houses-smash.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
'@livekit/agents-plugin-neuphonic': patch
3+
'@livekit/agents-plugin-cartesia': patch
4+
'@livekit/agents-plugin-deepgram': patch
5+
'@livekit/agents-plugin-resemble': patch
6+
'@livekit/agents-plugin-inworld': patch
7+
'@livekit/agents-plugin-google': patch
8+
'@livekit/agents-plugin-openai': patch
9+
'@livekit/agents-plugin-rime': patch
10+
'@livekit/agents': patch
11+
---
12+
13+
Improve TTS resource cleanup

agents/src/inference/tts.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,15 +238,13 @@ export class TTS<TModel extends TTSModels> extends BaseTTS {
238238
export class SynthesizeStream<TModel extends TTSModels> extends BaseSynthesizeStream {
239239
private opts: InferenceTTSOptions<TModel>;
240240
private tts: TTS<TModel>;
241-
private connOptions: APIConnectOptions;
242241

243242
#logger = log();
244243

245244
constructor(tts: TTS<TModel>, opts: InferenceTTSOptions<TModel>, connOptions: APIConnectOptions) {
246245
super(tts, connOptions);
247246
this.opts = opts;
248247
this.tts = tts;
249-
this.connOptions = connOptions;
250248
}
251249

252250
get label() {

agents/src/tts/stream_adapter.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,17 @@ export class StreamAdapter extends TTS {
2222
this.#tts.on('metrics_collected', (metrics) => {
2323
this.emit('metrics_collected', metrics);
2424
});
25+
this.#tts.on('error', (error) => {
26+
this.emit('error', error);
27+
});
2528
}
2629

27-
synthesize(text: string): ChunkedStream {
28-
return this.#tts.synthesize(text);
30+
synthesize(
31+
text: string,
32+
connOptions?: APIConnectOptions,
33+
abortSignal?: AbortSignal,
34+
): ChunkedStream {
35+
return this.#tts.synthesize(text, connOptions, abortSignal);
2936
}
3037

3138
stream(options?: { connOptions?: APIConnectOptions }): StreamAdapterWrapper {
@@ -85,7 +92,7 @@ export class StreamAdapterWrapper extends SynthesizeStream {
8592
prevTask: Task<void> | undefined,
8693
controller: AbortController,
8794
) => {
88-
const audioStream = this.#tts.synthesize(token);
95+
const audioStream = this.#tts.synthesize(token, this.connOptions, this.abortSignal);
8996

9097
// wait for previous audio transcription to complete before starting
9198
// to queue audio frames of the current token

agents/src/tts/tts.ts

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ export abstract class TTS extends (EventEmitter as new () => TypedEmitter<TTSCal
9090
/**
9191
* Receives text and returns synthesis in the form of a {@link ChunkedStream}
9292
*/
93-
abstract synthesize(text: string): ChunkedStream;
93+
abstract synthesize(
94+
text: string,
95+
connOptions?: APIConnectOptions,
96+
abortSignal?: AbortSignal,
97+
): ChunkedStream;
9498

9599
/**
96100
* Returns a {@link SynthesizeStream} that can be used to push text and receive audio data
@@ -131,30 +135,33 @@ export abstract class SynthesizeStream
131135
SynthesizedAudio | typeof SynthesizeStream.END_OF_STREAM
132136
>();
133137
protected closed = false;
134-
abstract label: string;
135-
#tts: TTS;
136-
#metricsPendingTexts: string[] = [];
137-
#metricsText = '';
138-
#monitorMetricsTask?: Promise<void>;
139-
private _connOptions: APIConnectOptions;
138+
protected connOptions: APIConnectOptions;
140139
protected abortController = new AbortController();
141-
#ttsRequestSpan?: Span;
142140

143141
private deferredInputStream: DeferredReadableStream<
144142
string | typeof SynthesizeStream.FLUSH_SENTINEL
145143
>;
146144
private logger = log();
147145

146+
abstract label: string;
147+
148+
#tts: TTS;
149+
#metricsPendingTexts: string[] = [];
150+
#metricsText = '';
151+
#monitorMetricsTask?: Promise<void>;
152+
#ttsRequestSpan?: Span;
153+
148154
constructor(tts: TTS, connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS) {
149155
this.#tts = tts;
150-
this._connOptions = connOptions;
156+
this.connOptions = connOptions;
151157
this.deferredInputStream = new DeferredReadableStream();
152158
this.pumpInput();
159+
153160
this.abortController.signal.addEventListener('abort', () => {
154161
this.deferredInputStream.detachSource();
155162
// TODO (AJS-36) clean this up when we refactor with streams
156-
this.input.close();
157-
this.output.close();
163+
if (!this.input.closed) this.input.close();
164+
if (!this.output.closed) this.output.close();
158165
this.closed = true;
159166
});
160167

@@ -172,7 +179,7 @@ export abstract class SynthesizeStream
172179
[traceTypes.ATTR_TTS_LABEL]: this.#tts.label,
173180
});
174181

175-
for (let i = 0; i < this._connOptions.maxRetry + 1; i++) {
182+
for (let i = 0; i < this.connOptions.maxRetry + 1; i++) {
176183
try {
177184
return await tracer.startActiveSpan(
178185
async (attemptSpan) => {
@@ -188,15 +195,15 @@ export abstract class SynthesizeStream
188195
);
189196
} catch (error) {
190197
if (error instanceof APIError) {
191-
const retryInterval = intervalForRetry(this._connOptions, i);
198+
const retryInterval = intervalForRetry(this.connOptions, i);
192199

193-
if (this._connOptions.maxRetry === 0 || !error.retryable) {
200+
if (this.connOptions.maxRetry === 0 || !error.retryable) {
194201
this.emitError({ error, recoverable: false });
195202
throw error;
196-
} else if (i === this._connOptions.maxRetry) {
203+
} else if (i === this.connOptions.maxRetry) {
197204
this.emitError({ error, recoverable: false });
198205
throw new APIConnectionError({
199-
message: `failed to generate TTS completion after ${this._connOptions.maxRetry + 1} attempts`,
206+
message: `failed to generate TTS completion after ${this.connOptions.maxRetry + 1} attempts`,
200207
options: { retryable: false },
201208
});
202209
} else {
@@ -380,6 +387,10 @@ export abstract class SynthesizeStream
380387
return this.output.next();
381388
}
382389

390+
get abortSignal(): AbortSignal {
391+
return this.abortController.signal;
392+
}
393+
383394
/** Close both the input and output of the TTS stream */
384395
close() {
385396
this.abortController.abort();
@@ -415,15 +426,22 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
415426
private _connOptions: APIConnectOptions;
416427
private logger = log();
417428

429+
protected abortController = new AbortController();
430+
418431
constructor(
419432
text: string,
420433
tts: TTS,
421434
connOptions: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
435+
abortSignal?: AbortSignal,
422436
) {
423437
this.#text = text;
424438
this.#tts = tts;
425439
this._connOptions = connOptions;
426440

441+
if (abortSignal) {
442+
abortSignal.addEventListener('abort', () => this.abortController.abort(), { once: true });
443+
}
444+
427445
this.monitorMetrics();
428446

429447
// this is a hack to imitate asyncio.create_task so that mainTask
@@ -510,6 +528,10 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
510528
return this.#text;
511529
}
512530

531+
get abortSignal(): AbortSignal {
532+
return this.abortController.signal;
533+
}
534+
513535
protected async monitorMetrics() {
514536
const startTime = process.hrtime.bigint();
515537
let audioDurationMs = 0;
@@ -564,8 +586,9 @@ export abstract class ChunkedStream implements AsyncIterableIterator<Synthesized
564586

565587
/** Close both the input and output of the TTS stream */
566588
close() {
567-
this.queue.close();
568-
this.output.close();
589+
if (!this.queue.closed) this.queue.close();
590+
if (!this.output.closed) this.output.close();
591+
if (!this.abortController.signal.aborted) this.abortController.abort();
569592
this.closed = true;
570593
}
571594

agents/src/voice/agent_session.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,7 @@ export class AgentSession<
313313
ctx = getJobContext();
314314
} catch (error) {
315315
// JobContext is not available in evals
316+
this.logger.warn('JobContext is not available');
316317
}
317318

318319
if (ctx) {
@@ -393,6 +394,7 @@ export class AgentSession<
393394
}
394395
} catch (error) {
395396
// JobContext is not available in evals
397+
this.logger.warn('JobContext is not available');
396398
}
397399

398400
this.sessionSpan = tracer.startSpan({

plugins/cartesia/src/tts.ts

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
4-
import { AudioByteStream, log, shortuuid, tokenize, tts } from '@livekit/agents';
4+
import {
5+
type APIConnectOptions,
6+
AudioByteStream,
7+
Future,
8+
log,
9+
shortuuid,
10+
tokenize,
11+
tts,
12+
} from '@livekit/agents';
513
import type { AudioFrame } from '@livekit/rtc-node';
614
import { request } from 'node:https';
715
import { type RawData, WebSocket } from 'ws';
@@ -88,8 +96,12 @@ export class TTS extends tts.TTS {
8896
}
8997
}
9098

91-
synthesize(text: string): tts.ChunkedStream {
92-
return new ChunkedStream(this, text, this.#opts);
99+
synthesize(
100+
text: string,
101+
connOptions?: APIConnectOptions,
102+
abortSignal?: AbortSignal,
103+
): tts.ChunkedStream {
104+
return new ChunkedStream(this, text, this.#opts, connOptions, abortSignal);
93105
}
94106

95107
stream(): SynthesizeStream {
@@ -99,12 +111,18 @@ export class TTS extends tts.TTS {
99111

100112
export class ChunkedStream extends tts.ChunkedStream {
101113
label = 'cartesia.ChunkedStream';
114+
#logger = log();
102115
#opts: TTSOptions;
103116
#text: string;
104117

105-
// set Promise<T> to any because OpenAI returns an annoying Response type
106-
constructor(tts: TTS, text: string, opts: TTSOptions) {
107-
super(text, tts);
118+
constructor(
119+
tts: TTS,
120+
text: string,
121+
opts: TTSOptions,
122+
connOptions?: APIConnectOptions,
123+
abortSignal?: AbortSignal,
124+
) {
125+
super(text, tts, connOptions, abortSignal);
108126
this.#text = text;
109127
this.#opts = opts;
110128
}
@@ -116,6 +134,8 @@ export class ChunkedStream extends tts.ChunkedStream {
116134
json.transcript = this.#text;
117135

118136
const baseUrl = new URL(this.#opts.baseUrl);
137+
const doneFut = new Future<void>();
138+
119139
const req = request(
120140
{
121141
hostname: baseUrl.hostname,
@@ -126,6 +146,7 @@ export class ChunkedStream extends tts.ChunkedStream {
126146
[AUTHORIZATION_HEADER]: this.#opts.apiKey!,
127147
[VERSION_HEADER]: VERSION,
128148
},
149+
signal: this.abortSignal,
129150
},
130151
(res) => {
131152
res.on('data', (chunk) => {
@@ -148,12 +169,24 @@ export class ChunkedStream extends tts.ChunkedStream {
148169
});
149170
}
150171
this.queue.close();
172+
doneFut.resolve();
173+
});
174+
res.on('error', (err) => {
175+
if (err.message === 'aborted') return;
176+
this.#logger.error({ err }, 'Cartesia TTS response error');
151177
});
152178
},
153179
);
154180

181+
req.on('error', (err) => {
182+
if (err.name === 'AbortError') return;
183+
this.#logger.error({ err }, 'Cartesia TTS request error');
184+
});
185+
req.on('close', () => doneFut.resolve());
155186
req.write(JSON.stringify(json));
156187
req.end();
188+
189+
await doneFut.await;
157190
}
158191
}
159192

0 commit comments

Comments
 (0)