Skip to content

Commit 94e8352

Browse files
committed
feat: implement streaming encode/decode pipelines #11
1 parent e2cd1a0 commit 94e8352

File tree

11 files changed

+1892
-266
lines changed

11 files changed

+1892
-266
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
"format": "prettier --check .",
4545
"typecheck": "tsup --dts-only --silent",
4646
"test": "pnpm run test:unit && pnpm run test:integration",
47-
"test:unit": "vitest run tests/unit --coverage",
48-
"test:integration": "vitest run tests/integration",
47+
"test:unit": "vitest run tests/unit --coverage --pool=threads",
48+
"test:integration": "vitest run tests/integration --pool=threads",
4949
"fix": "pnpm exec eslint . --ext .ts,.mjs --fix && pnpm exec prettier --write .",
5050
"postinstall": "node ./scripts/install-lame.mjs",
5151
"prepublishOnly": "pnpm run build",

src/core/lame-process.ts

Lines changed: 363 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,363 @@
1+
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
2+
3+
import type { LameProgressEmitter, LameStatus } from "../types";
4+
import { resolveLameBinary } from "../internal/binary/resolve-binary";
5+
import { LameOptions } from "./lame-options";
6+
7+
type ProgressKind = "encode" | "decode";
8+
9+
const LAME_TAG_MESSAGE = "Writing LAME Tag...done";
10+
11+
function createInitialStatus(): LameStatus {
12+
return {
13+
started: false,
14+
finished: false,
15+
progress: 0,
16+
eta: undefined,
17+
};
18+
}
19+
20+
function buildLameSpawnArgs(
21+
builder: LameOptions,
22+
kind: ProgressKind,
23+
input: string,
24+
output: string,
25+
): string[] {
26+
const args = builder.getArguments();
27+
const normalizedArgs = [...args];
28+
29+
if (
30+
builder.shouldUseDefaultDisptime() &&
31+
!normalizedArgs.includes("--disptime")
32+
) {
33+
normalizedArgs.push("--disptime", "1");
34+
}
35+
36+
if (kind === "decode") {
37+
normalizedArgs.push("--decode");
38+
}
39+
40+
return [input, output, ...normalizedArgs.map((value) => String(value))];
41+
}
42+
43+
function markProcessFinished(
44+
status: LameStatus,
45+
emitter: LameProgressEmitter,
46+
) {
47+
if (status.finished) {
48+
return;
49+
}
50+
51+
status.finished = true;
52+
status.progress = 100;
53+
status.eta = "00:00";
54+
55+
emitter.emit("finish");
56+
emitter.emit("progress", [status.progress, status.eta]);
57+
}
58+
59+
function parseEncodeProgressLine(content: string): {
60+
progress?: number;
61+
eta?: string;
62+
} | null {
63+
const progressMatch = content.match(/\(\s*((?:[0-9]{1,2})|100)%\)\|/);
64+
if (!progressMatch) {
65+
return null;
66+
}
67+
68+
const etaMatch = content.match(/[0-9]{1,2}:[0-9][0-9] /);
69+
70+
/* c8 ignore next */
71+
const progress = Number(progressMatch[1]);
72+
const eta = etaMatch ? etaMatch[0].trim() : undefined;
73+
74+
return { progress, eta };
75+
}
76+
77+
function parseDecodeProgressLine(content: string): number | null {
78+
const progressMatch = content.match(/[0-9]{1,10}\/[0-9]{1,10}/);
79+
if (!progressMatch) {
80+
return null;
81+
}
82+
83+
const [current, total] = progressMatch[0].split("/").map(Number);
84+
if (!Number.isFinite(current) || !Number.isFinite(total) || total === 0) {
85+
return NaN;
86+
}
87+
88+
return Math.floor((current / total) * 100);
89+
}
90+
91+
function normalizeCliMessage(content: string): string | null {
92+
if (
93+
content.startsWith("lame: ") ||
94+
content.startsWith("Warning: ") ||
95+
content.includes("Error ")
96+
) {
97+
return content.startsWith("lame: ") ? content : `lame: ${content}`;
98+
}
99+
100+
return null;
101+
}
102+
103+
interface ProgressProcessorOptions {
104+
kind: ProgressKind;
105+
status: LameStatus;
106+
emitter: LameProgressEmitter;
107+
completeOnTag?: boolean;
108+
}
109+
110+
interface ProgressProcessorResult {
111+
error?: Error;
112+
}
113+
114+
function processProgressChunk(
115+
payload: Buffer,
116+
options: ProgressProcessorOptions,
117+
): ProgressProcessorResult {
118+
const { kind, status, emitter, completeOnTag } = options;
119+
const content = payload.toString();
120+
const lines = content.split(/\r?\n/);
121+
122+
for (const rawLine of lines) {
123+
const line = rawLine.trim();
124+
if (line === "") {
125+
continue;
126+
}
127+
128+
if (completeOnTag && line.includes(LAME_TAG_MESSAGE)) {
129+
markProcessFinished(status, emitter);
130+
continue;
131+
}
132+
133+
if (kind === "encode") {
134+
const parsed = parseEncodeProgressLine(line);
135+
if (parsed) {
136+
if (
137+
parsed.progress !== undefined &&
138+
parsed.progress > status.progress
139+
) {
140+
status.progress = parsed.progress;
141+
}
142+
143+
if (parsed.eta) {
144+
status.eta = parsed.eta;
145+
}
146+
147+
emitter.emit("progress", [status.progress, status.eta]);
148+
continue;
149+
}
150+
} else {
151+
const parsed = parseDecodeProgressLine(line);
152+
if (parsed !== null) {
153+
if (!Number.isNaN(parsed)) {
154+
status.progress = parsed;
155+
}
156+
157+
emitter.emit("progress", [status.progress, status.eta]);
158+
continue;
159+
}
160+
}
161+
162+
const normalized = normalizeCliMessage(line);
163+
if (normalized) {
164+
return { error: new Error(normalized) };
165+
}
166+
}
167+
168+
return {};
169+
}
170+
171+
function getExitError(code: number | null): Error | null {
172+
if (code === 0) {
173+
return null;
174+
}
175+
176+
if (code === 255) {
177+
return new Error(
178+
"Unexpected termination of the process, possibly directly after the start. Please check if the input and/or output does not exist.",
179+
);
180+
}
181+
182+
if (code !== null) {
183+
return new Error(`lame: Process exited with code ${code}`);
184+
}
185+
186+
return new Error("lame: Process exited unexpectedly");
187+
}
188+
189+
interface SpawnLameProcessOptions {
190+
binaryPath?: string;
191+
spawnArgs: string[];
192+
kind: ProgressKind;
193+
status: LameStatus;
194+
emitter: LameProgressEmitter;
195+
onError: (error: Error) => void;
196+
progressSources: Array<"stdout" | "stderr">;
197+
completeOnTag?: boolean;
198+
onStdoutData?: (chunk: Buffer) => void;
199+
onStdoutEnd?: () => void;
200+
onStdoutError?: (error: Error) => void;
201+
onStderrData?: (chunk: Buffer) => void;
202+
onStderrError?: (error: Error) => void;
203+
onStdinError?: (error: Error) => void;
204+
onSuccess?: () => void;
205+
}
206+
207+
function spawnLameProcess(
208+
options: SpawnLameProcessOptions,
209+
): ChildProcessWithoutNullStreams {
210+
const {
211+
binaryPath,
212+
spawnArgs,
213+
kind,
214+
status,
215+
emitter,
216+
onError,
217+
progressSources,
218+
completeOnTag,
219+
onStdoutData,
220+
onStdoutEnd,
221+
onStdoutError,
222+
onStderrData,
223+
onStderrError,
224+
onStdinError,
225+
onSuccess,
226+
} = options;
227+
228+
status.started = true;
229+
status.finished = false;
230+
status.progress = 0;
231+
status.eta = undefined;
232+
233+
const executable = binaryPath ?? resolveLameBinary();
234+
const child = spawn(executable, spawnArgs);
235+
236+
const progressTargets = new Set(progressSources);
237+
let hasSeenCliError = false;
238+
let stderrBuffer = "";
239+
240+
const deliveredErrorMessages = new Set<string>();
241+
242+
const deliverError = (error: Error) => {
243+
const message = error.message ?? String(error);
244+
if (deliveredErrorMessages.has(message)) {
245+
return;
246+
}
247+
248+
deliveredErrorMessages.add(message);
249+
onError(error);
250+
};
251+
252+
const emitCliError = (error: Error) => {
253+
hasSeenCliError = true;
254+
deliverError(error);
255+
};
256+
257+
const emitExitError = (error: Error) => {
258+
if (hasSeenCliError) {
259+
return;
260+
}
261+
262+
deliverError(error);
263+
};
264+
265+
const handleStdout = (chunk: Buffer) => {
266+
if (progressTargets.has("stdout")) {
267+
const { error } = processProgressChunk(chunk, {
268+
kind,
269+
status,
270+
emitter,
271+
completeOnTag,
272+
});
273+
274+
if (error) {
275+
emitCliError(error);
276+
return;
277+
}
278+
}
279+
280+
onStdoutData?.(chunk);
281+
};
282+
283+
const handleStderr = (chunk: Buffer) => {
284+
stderrBuffer += chunk.toString();
285+
if (progressTargets.has("stderr")) {
286+
const { error } = processProgressChunk(chunk, {
287+
kind,
288+
status,
289+
emitter,
290+
completeOnTag,
291+
});
292+
293+
if (error) {
294+
emitCliError(error);
295+
return;
296+
}
297+
}
298+
299+
onStderrData?.(chunk);
300+
};
301+
302+
child.stdout.on("data", handleStdout);
303+
child.stderr.on("data", handleStderr);
304+
305+
if (onStdoutEnd) {
306+
child.stdout.on("end", onStdoutEnd);
307+
}
308+
309+
child.stdout.on("error", (error) => {
310+
onStdoutError?.(error);
311+
emitCliError(error);
312+
});
313+
314+
child.stderr.on("error", (error) => {
315+
onStderrError?.(error);
316+
emitCliError(error);
317+
});
318+
319+
child.stdin.on("error", (error) => {
320+
onStdinError?.(error);
321+
emitCliError(error);
322+
});
323+
324+
child.on("error", emitCliError);
325+
child.on("close", (code) => {
326+
const exitError = getExitError(code);
327+
if (exitError) {
328+
const bufferedLines = stderrBuffer
329+
.split(/\r?\n/)
330+
.map((value) => value.trim())
331+
.filter((value) => value.length > 0);
332+
333+
for (const line of bufferedLines) {
334+
const normalized = normalizeCliMessage(line);
335+
if (normalized) {
336+
emitCliError(new Error(normalized));
337+
return;
338+
}
339+
}
340+
341+
emitExitError(exitError);
342+
return;
343+
}
344+
345+
markProcessFinished(status, emitter);
346+
onSuccess?.();
347+
});
348+
349+
return child;
350+
}
351+
352+
export {
353+
buildLameSpawnArgs,
354+
createInitialStatus,
355+
getExitError,
356+
markProcessFinished,
357+
normalizeCliMessage,
358+
parseDecodeProgressLine,
359+
parseEncodeProgressLine,
360+
processProgressChunk,
361+
spawnLameProcess,
362+
};
363+
export type { ProgressKind, SpawnLameProcessOptions };

0 commit comments

Comments
 (0)