Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/wet-carpets-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@livekit/agents-plugins-test': patch
'@livekit/agents': patch
---

fix: handle VAD stream closed error during agent handover

- Fixed a race condition in `StreamAdapter` where `endInput()` could be called on an already-closed VAD stream during agent handover, causing an unrecoverable `stt_error`. This affected non-streaming STTs (like OpenAI STT) that use the StreamAdapter wrapper.
- Added `isStreamClosedError()` utility function for consistent error handling.
- Upgraded sharp from 0.34.3 to 0.34.5 to fix libvips version conflict (1.2.0 vs 1.2.4) that caused flaky agent behavior and ObjC class collision warnings on macOS.
- Fixed pre-existing build error in test plugin (Int16Array to ArrayBuffer conversion).
2 changes: 1 addition & 1 deletion agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"pidusage": "^4.0.1",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0",
"sharp": "0.34.3",
"sharp": "0.34.5",
"uuid": "^11.1.0",
"ws": "^8.18.0",
"zod-to-json-schema": "^3.24.6"
Expand Down
13 changes: 12 additions & 1 deletion agents/src/stt/stream_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import type { AudioFrame } from '@livekit/rtc-node';
import { log } from '../log.js';
import type { APIConnectOptions } from '../types.js';
import { isStreamClosedError } from '../utils.js';
import type { VAD, VADStream } from '../vad.js';
import { VADEventType } from '../vad.js';
import type { SpeechEvent } from './stt.js';
Expand Down Expand Up @@ -68,7 +69,17 @@ export class StreamAdapterWrapper extends SpeechStream {
this.#vadStream.pushFrame(input);
}
}
this.#vadStream.endInput();

// Guard against calling endInput() on already-closed stream
// This happens during handover when close() is called while forwardInput is running
try {
this.#vadStream.endInput();
} catch (e) {
if (isStreamClosedError(e)) {
return;
}
throw e;
}
};

const recognize = async () => {
Expand Down
14 changes: 14 additions & 0 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,20 @@ export class InvalidErrorType extends Error {
}
}

/**
* Check if an error is a stream closed error that can be safely ignored during cleanup.
* This happens during handover/cleanup when close() is called while operations are still running.
*
* @param error - The error to check.
* @returns True if the error is a stream closed error.
*/
export function isStreamClosedError(error: unknown): boolean {
return (
error instanceof Error &&
(error.message === 'Stream is closed' || error.message === 'Input is closed')
);
}

/**
* In JS an error can be any arbitrary value.
* This function converts an unknown error to an Error and stores the original value in the error object.
Expand Down
9 changes: 8 additions & 1 deletion plugins/test/src/stt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ const makeTestSpeech = (targetSampleRate: number, chunkDuration?: number): Audio

const chunkSize = (targetSampleRate * chunkDuration) / 1000;
const bstream = new AudioByteStream(targetSampleRate, channels, chunkSize);
frames = bstream.write(merged.data);

// Convert Int16Array to ArrayBuffer
const arrayBuffer = merged.data.buffer.slice(
merged.data.byteOffset,
merged.data.byteOffset + merged.data.byteLength,
) as ArrayBuffer;

frames = bstream.write(arrayBuffer);
frames.push(...bstream.flush());
return frames;
};
Loading