Skip to content

Commit a21eb72

Browse files
authored
fix: silence stream cleanup errors during teardown (#884)
1 parent e590012 commit a21eb72

File tree

4 files changed

+36
-9
lines changed

4 files changed

+36
-9
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@livekit/agents": patch
3+
---
4+
5+
Fix error spam during stream cleanup. Gracefully handle edge cases when detaching audio streams that were never initialized.
6+

agents/src/stream/deferred_stream.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -522,11 +522,11 @@ describe('DeferredReadableStream', () => {
522522
expect(() => deferred.setSource(source2)).toThrow('Stream source already set');
523523
});
524524

525-
it('should throw error when trying to detach source before setting it', async () => {
525+
it('should safely detach source before setting it (no-op)', async () => {
526526
const deferred = new DeferredReadableStream<string>();
527527

528-
// Attempting to detach source before setting it should throw
529-
await expect(deferred.detachSource()).rejects.toThrow('Source not set');
528+
// Attempting to detach source before setting it should be a no-op, not throw
529+
await expect(deferred.detachSource()).resolves.toBeUndefined();
530530
});
531531

532532
it('read returns undefined as soon as reader is cancelled', async () => {

agents/src/stream/deferred_stream.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,22 @@ import type {
99
import { IdentityTransform } from './identity_transform.js';
1010

1111
/**
12-
* Check if error is related to reader.read after release lock
12+
* Check if error is related to stream cleanup operations.
13+
*
14+
* These errors are expected when calling reader.read() after releaseLock()
15+
* or when writing to already closed streams during cleanup:
1316
*
1417
* Invalid state: Releasing reader
1518
* Invalid state: The reader is not attached to a stream
19+
* Invalid state: Controller is already closed
20+
* Invalid state: WritableStream is closed
1621
*/
1722
export function isStreamReaderReleaseError(e: unknown) {
1823
const allowedMessages = [
1924
'Invalid state: Releasing reader',
2025
'Invalid state: The reader is not attached to a stream',
26+
'Controller is already closed',
27+
'WritableStream is closed',
2128
];
2229

2330
if (e instanceof TypeError) {
@@ -66,18 +73,27 @@ export class DeferredReadableStream<T> {
6673
await this.writer.write(value);
6774
}
6875
} catch (e) {
69-
// skip source detach related errors
76+
// skip stream cleanup related errors
7077
if (isStreamReaderReleaseError(e)) return;
78+
7179
sourceError = e;
7280
} finally {
7381
// any other error from source will be propagated to the consumer
7482
if (sourceError) {
75-
this.writer.abort(sourceError);
83+
try {
84+
this.writer.abort(sourceError);
85+
} catch (e) {
86+
// ignore if writer is already closed
87+
}
7688
return;
7789
}
7890

7991
// release lock so this.stream.getReader().read() will terminate with done: true
80-
this.writer.releaseLock();
92+
try {
93+
this.writer.releaseLock();
94+
} catch (e) {
95+
// ignore if writer lock is already released
96+
}
8197

8298
// we only close the writable stream after done
8399
try {
@@ -98,7 +114,8 @@ export class DeferredReadableStream<T> {
98114
*/
99115
async detachSource() {
100116
if (!this.isSourceSet) {
101-
throw new Error('Source not set');
117+
// No-op if source was never set - this is a common case during cleanup
118+
return;
102119
}
103120

104121
// release lock will make any pending read() throw TypeError

agents/src/voice/agent_session.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,11 @@ export class AgentSession<
788788
await this.activity.drain();
789789
// wait any uninterruptible speech to finish
790790
await this.activity.currentSpeech?.waitForPlayout();
791-
this.activity.detachAudioInput();
791+
try {
792+
this.activity.detachAudioInput();
793+
} catch (error) {
794+
// Ignore detach errors during cleanup - source may not have been set
795+
}
792796
}
793797

794798
// Close recorder before detaching inputs/outputs (keep reference for session report)

0 commit comments

Comments
 (0)