Skip to content

Commit 31147d6

Browse files
samohovetsVanilagy
andauthored
fix: surface StreamTarget write errors instead of swallowing them (#305)
* fix: await StreamTarget writes to prevent overlapping OPFS operations * no need for flush * preserve fire and forget writes * Remove pendingWrites, add missing mutex acquire to ADTS muxer finalize method --------- Co-authored-by: Vanilagy <1696106+Vanilagy@users.noreply.github.com>
1 parent 48f9bda commit 31147d6

File tree

3 files changed

+151
-3
lines changed

3 files changed

+151
-3
lines changed

src/adts/adts-muxer.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,5 +111,8 @@ export class AdtsMuxer extends Muxer {
111111
throw new Error('ADTS does not support subtitles.');
112112
}
113113

114-
async finalize() {}
114+
async finalize() {
115+
const release = await this.mutex.acquire(); // Required so that finalize() can't resolve before other calls
116+
release();
117+
}
115118
}

src/writer.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ export class StreamTargetWriter extends Writer {
215215
private lastWriteEnd = 0;
216216
private lastFlushEnd = 0;
217217
private writer: WritableStreamDefaultWriter<StreamTargetChunk> | null = null;
218+
private writeError: unknown = null;
218219

219220
// These variables regard chunked mode:
220221
private chunked: boolean;
@@ -267,6 +268,11 @@ export class StreamTargetWriter extends Writer {
267268
}
268269

269270
async flush() {
271+
if (this.writeError !== null) {
272+
// eslint-disable-next-line @typescript-eslint/only-throw-error
273+
throw this.writeError;
274+
}
275+
270276
if (this.pos > this.lastWriteEnd) {
271277
// There's a "void" between the last written byte and the next byte we're about to write. Let's pad that
272278
// void with zeroes explicitly.
@@ -329,11 +335,12 @@ export class StreamTargetWriter extends Writer {
329335
throw new Error('Internal error: Monotonicity violation.');
330336
}
331337

332-
// Write out the data immediately
333338
void this.writer.write({
334339
type: 'write',
335340
data: chunk.data,
336341
position: chunk.start,
342+
}).catch((error) => {
343+
this.writeError ??= error;
337344
});
338345

339346
this.lastFlushEnd = chunk.start + chunk.data.byteLength;
@@ -440,6 +447,8 @@ export class StreamTargetWriter extends Writer {
440447
type: 'write',
441448
data: chunk.data.subarray(section.start, section.end),
442449
position,
450+
}).catch((error) => {
451+
this.writeError ??= error;
443452
});
444453

445454
this.lastFlushEnd = chunk.start + section.end;
@@ -449,12 +458,18 @@ export class StreamTargetWriter extends Writer {
449458
}
450459
}
451460

452-
finalize() {
461+
async finalize() {
453462
if (this.chunked) {
454463
this.tryToFlushChunks(true);
455464
}
456465

466+
if (this.writeError !== null) {
467+
// eslint-disable-next-line @typescript-eslint/only-throw-error
468+
throw this.writeError;
469+
}
470+
457471
assert(this.writer);
472+
await this.writer.ready;
458473
return this.writer.close();
459474
}
460475

test/browser/adts-muxing.test.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import { expect, test } from 'vitest';
2+
import { Input } from '../../src/input.js';
3+
import { BufferSource, UrlSource } from '../../src/source.js';
4+
import { ALL_FORMATS } from '../../src/input-format.js';
5+
import { EncodedPacketSink } from '../../src/media-sink.js';
6+
import { EncodedAudioPacketSource } from '../../src/media-source.js';
7+
import { Output } from '../../src/output.js';
8+
import { StreamTarget, type StreamTargetChunk } from '../../src/target.js';
9+
import { AdtsOutputFormat } from '../../src/output-format.js';
10+
import { assert } from '../../src/misc.js';
11+
12+
const createBufferingStreamTarget = () => {
13+
const written = new Map<number, Uint8Array>();
14+
15+
const stream = new WritableStream<StreamTargetChunk>({
16+
async write(chunk: StreamTargetChunk) {
17+
written.set(chunk.position, chunk.data.slice());
18+
},
19+
});
20+
21+
const toBuffer = () => {
22+
let maxEnd = 0;
23+
for (const [offset, data] of written) {
24+
maxEnd = Math.max(maxEnd, offset + data.byteLength);
25+
}
26+
const buffer = new Uint8Array(maxEnd);
27+
for (const [offset, data] of written) {
28+
buffer.set(data, offset);
29+
}
30+
return buffer;
31+
};
32+
33+
return { stream, toBuffer };
34+
};
35+
36+
test('ADTS with metadata over StreamTarget', async () => {
37+
const target = createBufferingStreamTarget();
38+
39+
const output = new Output({
40+
format: new AdtsOutputFormat(),
41+
target: new StreamTarget(target.stream),
42+
});
43+
44+
output.setMetadataTags({ comment: 'Remotion' });
45+
46+
const audioSource = new EncodedAudioPacketSource('aac');
47+
output.addAudioTrack(audioSource);
48+
49+
await output.start();
50+
51+
using input = new Input({
52+
source: new UrlSource('/sample3.aac'),
53+
formats: ALL_FORMATS,
54+
});
55+
56+
const audioTrack = await input.getPrimaryAudioTrack();
57+
assert(audioTrack);
58+
59+
const sink = new EncodedPacketSink(audioTrack);
60+
61+
let isFirst = true;
62+
for await (const packet of sink.packets()) {
63+
await audioSource.add(packet, {
64+
decoderConfig: isFirst ? (await audioTrack.getDecoderConfig())! : undefined,
65+
});
66+
isFirst = false;
67+
}
68+
69+
await output.finalize();
70+
71+
const buffer = target.toBuffer();
72+
using outputAsInput = new Input({
73+
source: new BufferSource(buffer.buffer),
74+
formats: ALL_FORMATS,
75+
});
76+
77+
const readTags = await outputAsInput.getMetadataTags();
78+
expect(readTags.comment).toBe('Remotion');
79+
80+
const outputAudioTrack = await outputAsInput.getPrimaryAudioTrack();
81+
assert(outputAudioTrack);
82+
expect(outputAudioTrack.codec).toBe('aac');
83+
});
84+
85+
// Previously, write handler rejections were silently swallowed and surfaced as
86+
// "Cannot write to a closing writable stream" instead of the actual error.
87+
test('StreamTarget write errors surface directly', async () => {
88+
let writeCount = 0;
89+
const stream = new WritableStream<StreamTargetChunk>({
90+
async write() {
91+
writeCount++;
92+
if (writeCount === 2) {
93+
throw new Error('OPFS write failed');
94+
}
95+
},
96+
});
97+
98+
const output = new Output({
99+
format: new AdtsOutputFormat(),
100+
target: new StreamTarget(stream),
101+
});
102+
103+
const audioSource = new EncodedAudioPacketSource('aac');
104+
output.addAudioTrack(audioSource);
105+
106+
await output.start();
107+
108+
using input = new Input({
109+
source: new UrlSource('/sample3.aac'),
110+
formats: ALL_FORMATS,
111+
});
112+
113+
const audioTrack = await input.getPrimaryAudioTrack();
114+
assert(audioTrack);
115+
116+
const sink = new EncodedPacketSink(audioTrack);
117+
118+
const run = async () => {
119+
let isFirst = true;
120+
for await (const packet of sink.packets()) {
121+
await audioSource.add(packet, {
122+
decoderConfig: isFirst ? (await audioTrack.getDecoderConfig())! : undefined,
123+
});
124+
isFirst = false;
125+
}
126+
await output.finalize();
127+
};
128+
129+
await expect(run()).rejects.toThrow('OPFS write failed');
130+
});

0 commit comments

Comments
 (0)