Skip to content

Commit f3497c9

Browse files
committed
refactor: stream API to be similar to non-stream API; created docs for streams
1 parent 7cdac31 commit f3497c9

File tree

10 files changed

+211
-94
lines changed

10 files changed

+211
-94
lines changed

README.md

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,61 @@ await encoder.encode();
331331

332332
Setting `disptime` changes how often progress is emitted by the LAME CLI, while `silent`, `quiet`, and `verbose` let you align terminal verbosity with your logging needs.
333333

334+
### Encode via Node.js streams
335+
336+
When you already operate on `Readable`/`Writable` streams (for example when piping uploaded audio to disk), use the encoder stream helper to avoid buffering everything in memory.
337+
338+
```js
339+
import { createReadStream, createWriteStream } from "node:fs";
340+
import { pipeline } from "node:stream/promises";
341+
import { LameStream } from "node-lame";
342+
343+
const encoderStream = new LameStream({
344+
mode: "encode",
345+
bitrate: 192,
346+
});
347+
348+
encoderStream.getEmitter().on("progress", ([progress, eta]) => {
349+
process.stdout.write(
350+
`Streaming progress: ${progress}%${eta ? ` – ETA ${eta}` : ""}\r`,
351+
);
352+
});
353+
354+
await pipeline(
355+
createReadStream("./audio-files/example.wav"),
356+
encoderStream,
357+
createWriteStream("./audio-files/example.stream.mp3"),
358+
);
359+
```
360+
361+
### Decode via Node.js streams
362+
363+
Decoding an MP3 back to WAV (or raw PCM) works the same way—swap in the decoder helper and wire it into your existing stream graph.
364+
365+
```js
366+
import { createReadStream, createWriteStream } from "node:fs";
367+
import { pipeline } from "node:stream/promises";
368+
import { LameStream } from "node-lame";
369+
370+
const decoderStream = new LameStream({
371+
mode: "decode",
372+
});
373+
374+
decoderStream.getEmitter().on("progress", ([progress, eta]) => {
375+
process.stdout.write(
376+
`Decoding progress: ${progress}%${eta ? ` – ETA ${eta}` : ""}\r`,
377+
);
378+
});
379+
380+
await pipeline(
381+
createReadStream("./audio-files/example.stream.mp3"),
382+
decoderStream,
383+
createWriteStream("./audio-files/example.stream.wav"),
384+
);
385+
```
386+
387+
`LameStream` exposes `getEmitter()` and `getStatus()` so you receive live progress regardless of whether you encode or decode. Choose the direction up front via `mode: "encode"` or `"decode"` when constructing the stream, then treat it like any other duplex pipeline component.
388+
334389
## All options
335390

336391
| Option | Description | Values | Default |

examples/convert-stream-to-mp3.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { resolve } from "node:path";
33
import { pipeline } from "node:stream/promises";
44
import { fileURLToPath } from "node:url";
55

6-
import { createLameEncoderStream } from "./helpers/load-node-lame.js";
6+
import { LameStream } from "./helpers/load-node-lame.js";
77
import { removeIfExists } from "./helpers/remove-if-exists.js";
88

99
const inputPath = resolve(
@@ -16,7 +16,8 @@ const outputPath = resolve(
1616
async function main(): Promise<void> {
1717
await removeIfExists(outputPath);
1818

19-
const encoderStream = createLameEncoderStream({
19+
const encoderStream = new LameStream({
20+
mode: "encode",
2021
bitrate: 192,
2122
});
2223

examples/helpers/load-node-lame.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
type NodeLameModule = typeof import("../../src/index");
22

33
const MODULE_NOT_FOUND_CODE = "ERR_MODULE_NOT_FOUND";
4-
const LOCAL_FALLBACKS = ["../../dist/index.cjs", "../../src/index.ts"] as const;
4+
const LOCAL_FALLBACKS = ["../../src/index.ts", "../../dist/index.cjs"] as const;
55

66
type ErrorWithCode = {
77
code?: string;
@@ -53,6 +53,5 @@ const loadNodeLame = async (): Promise<NodeLameModule> => {
5353

5454
const nodeLame = await loadNodeLame();
5555

56-
export const { Lame, createLameDecoderStream, createLameEncoderStream } =
57-
nodeLame;
56+
export const { Lame, LameStream } = nodeLame;
5857
export default nodeLame;

src/core/lame-process.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
2-
import type { ProcessEnv } from "node:process";
32
import { delimiter } from "node:path";
43

54
import type {
@@ -14,6 +13,7 @@ import {
1413
import { LameOptions } from "./lame-options";
1514

1615
type ProgressKind = LameStreamMode;
16+
type ProcessEnv = Record<string, string | undefined>;
1717

1818
const LAME_TAG_MESSAGE = "Writing LAME Tag...done";
1919

src/core/lame-stream.ts

Lines changed: 89 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
LameOptionsBag,
99
LameProgressEmitter,
1010
LameStatus,
11+
LameStreamMode,
1112
LameStreamOptions,
1213
} from "../types";
1314
import { LameOptions } from "./lame-options";
@@ -20,53 +21,45 @@ import type { ProgressKind } from "./lame-process";
2021

2122
type StreamKind = ProgressKind;
2223

23-
interface LameStreamConfig extends LameStreamOptions {
24+
type LameStreamConfig = Omit<LameStreamOptions, "output"> & {
2425
binaryPath?: string;
25-
}
26+
};
2627

27-
class LameCodecStream extends Duplex {
28+
class LameStream extends Duplex {
2829
private readonly emitter: LameProgressEmitter;
2930
private readonly status: LameStatus;
30-
private readonly kind: StreamKind;
31-
private readonly child: ChildProcessWithoutNullStreams;
3231
private readonly builder: LameOptions;
32+
private readonly binaryPath?: string;
33+
private readonly kind: StreamKind;
34+
35+
private child?: ChildProcessWithoutNullStreams;
3336

3437
private isStdoutPaused = false;
3538
private hasErrored = false;
3639
private finished = false;
3740

38-
constructor(kind: StreamKind, options: LameStreamConfig) {
41+
constructor(options: LameStreamConfig) {
3942
super({ allowHalfOpen: false });
4043

41-
const { binaryPath, ...cliOptions } = options;
44+
const { binaryPath, mode, ...cliOptions } = options;
45+
if (!isValidStreamMode(mode)) {
46+
throw new Error(
47+
'lame: LameStream requires a mode of either "encode" or "decode"',
48+
);
49+
}
50+
4251
const normalizedOptions: LameOptionsBag = {
43-
...(cliOptions as Omit<LameStreamOptions, "output">),
52+
...(cliOptions as Omit<LameStreamOptions, "output" | "mode">),
4453
output: "stream",
4554
} as LameOptionsBag;
4655

47-
this.kind = kind;
56+
this.binaryPath = binaryPath;
4857
this.status = createInitialStatus();
4958
this.emitter = new EventEmitter() as LameProgressEmitter;
5059
this.builder = new LameOptions(normalizedOptions);
60+
this.kind = mode;
5161

52-
const spawnArgs = buildLameSpawnArgs(this.builder, this.kind, "-", "-");
53-
54-
this.child = spawnLameProcess({
55-
binaryPath,
56-
spawnArgs,
57-
kind: this.kind,
58-
status: this.status,
59-
emitter: this.emitter,
60-
progressSources: ["stderr"],
61-
completeOnTag: true,
62-
onError: (error) => this.emitStreamError(error),
63-
onStdoutData: (chunk) => this.forwardStdout(chunk),
64-
onStdoutEnd: () => this.push(null),
65-
onStdoutError: (error) => this.emitStreamError(error),
66-
onStderrError: (error) => this.emitStreamError(error),
67-
onStdinError: (error) => this.emitStreamError(error),
68-
onSuccess: () => this.handleSuccessfulClose(),
69-
});
62+
this.initialize(this.kind);
7063
}
7164

7265
public getEmitter(): LameProgressEmitter {
@@ -78,6 +71,10 @@ class LameCodecStream extends Duplex {
7871
}
7972

8073
public override _read(): void {
74+
if (!this.child) {
75+
return;
76+
}
77+
8178
if (this.isStdoutPaused && !this.child.stdout.readableEnded) {
8279
this.isStdoutPaused = false;
8380
this.child.stdout.resume();
@@ -89,18 +86,24 @@ class LameCodecStream extends Duplex {
8986
encoding: BufferEncoding,
9087
callback: (error?: Error | null) => void,
9188
): void {
92-
if (this.finished || this.child.stdin.destroyed) {
89+
const child = this.child;
90+
if (!child) {
91+
callback(new Error("lame: Stream mode is not initialized yet"));
92+
return;
93+
}
94+
95+
if (this.finished || child.stdin.destroyed) {
9396
callback(new Error("lame: Stream has already finished"));
9497
return;
9598
}
9699

97100
try {
98-
const flushed = this.child.stdin.write(chunk, encoding);
101+
const flushed = child.stdin.write(chunk, encoding);
99102
if (!flushed) {
100103
const cleanup = () => {
101-
this.child.stdin.off("drain", onDrain);
102-
this.child.stdin.off("error", onError);
103-
this.child.stdin.off("close", onClose);
104+
child.stdin.off("drain", onDrain);
105+
child.stdin.off("error", onError);
106+
child.stdin.off("close", onClose);
104107
};
105108

106109
const onDrain = () => {
@@ -116,9 +119,9 @@ class LameCodecStream extends Duplex {
116119
callback(new Error("lame: Input stream closed before drain"));
117120
};
118121

119-
this.child.stdin.once("drain", onDrain);
120-
this.child.stdin.once("error", onError);
121-
this.child.stdin.once("close", onClose);
122+
child.stdin.once("drain", onDrain);
123+
child.stdin.once("error", onError);
124+
child.stdin.once("close", onClose);
122125
return;
123126
}
124127
} catch (error) {
@@ -130,8 +133,14 @@ class LameCodecStream extends Duplex {
130133
}
131134

132135
public override _final(callback: (error?: Error | null) => void): void {
136+
const child = this.child;
137+
if (!child) {
138+
callback(new Error("lame: Stream mode is not initialized yet"));
139+
return;
140+
}
141+
133142
try {
134-
this.child.stdin.end();
143+
child.stdin.end();
135144
} catch (error) {
136145
callback(error as Error);
137146
return;
@@ -144,9 +153,16 @@ class LameCodecStream extends Duplex {
144153
error: Error | null,
145154
callback: (error?: Error | null) => void,
146155
): void {
156+
const child = this.child;
157+
158+
if (!child) {
159+
callback(error ?? null);
160+
return;
161+
}
162+
147163
try {
148-
if (!this.child.killed) {
149-
this.child.kill();
164+
if (!child.killed) {
165+
child.kill();
150166
}
151167
} catch (killError) {
152168
callback(killError as Error);
@@ -157,13 +173,38 @@ class LameCodecStream extends Duplex {
157173
callback(error ?? null);
158174
}
159175

176+
private initialize(kind: StreamKind): void {
177+
if (this.child) {
178+
return;
179+
}
180+
181+
const spawnArgs = buildLameSpawnArgs(this.builder, kind, "-", "-");
182+
183+
this.child = spawnLameProcess({
184+
binaryPath: this.binaryPath,
185+
spawnArgs,
186+
kind,
187+
status: this.status,
188+
emitter: this.emitter,
189+
progressSources: ["stderr"],
190+
completeOnTag: true,
191+
onError: (error) => this.emitStreamError(error),
192+
onStdoutData: (chunk) => this.forwardStdout(chunk),
193+
onStdoutEnd: () => this.push(null),
194+
onStdoutError: (error) => this.emitStreamError(error),
195+
onStderrError: (error) => this.emitStreamError(error),
196+
onStdinError: (error) => this.emitStreamError(error),
197+
onSuccess: () => this.handleSuccessfulClose(),
198+
});
199+
}
200+
160201
private forwardStdout(chunk: Buffer) {
161202
if (this.hasErrored || this.finished) {
162203
return;
163204
}
164205

165206
const shouldContinue = this.push(chunk);
166-
if (!shouldContinue) {
207+
if (!shouldContinue && this.child) {
167208
this.isStdoutPaused = true;
168209
this.child.stdout.pause();
169210
}
@@ -188,8 +229,9 @@ class LameCodecStream extends Duplex {
188229
this.status.finished = true;
189230
this.cleanupChildListeners();
190231
this.emitter.emit("error", error);
232+
191233
try {
192-
if (!this.child.killed) {
234+
if (this.child && !this.child.killed) {
193235
this.child.kill();
194236
}
195237
} catch {
@@ -200,24 +242,20 @@ class LameCodecStream extends Duplex {
200242
}
201243

202244
private cleanupChildListeners() {
245+
if (!this.child) {
246+
return;
247+
}
248+
203249
this.child.stdout.removeAllListeners();
204250
this.child.stderr.removeAllListeners();
205251
this.child.stdin.removeAllListeners();
206252
this.child.removeAllListeners();
207253
}
208254
}
209255

210-
const createLameEncoderStream = (options: LameStreamConfig) => {
211-
return new LameCodecStream("encode", options);
256+
const isValidStreamMode = (value: unknown): value is LameStreamMode => {
257+
return value === "encode" || value === "decode";
212258
};
213259

214-
const createLameDecoderStream = (options: LameStreamConfig) => {
215-
return new LameCodecStream("decode", options);
216-
};
217-
218-
export {
219-
LameCodecStream,
220-
createLameDecoderStream,
221-
createLameEncoderStream,
222-
};
260+
export { LameStream };
223261
export type { LameStreamConfig };

src/core/lame.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class Lame {
4848
constructor(options: LameOptionsBag) {
4949
if (options.output === "stream") {
5050
throw new Error(
51-
"lame: The streaming output mode requires createLameEncoderStream or createLameDecoderStream",
51+
'lame: The streaming output mode requires LameStream with mode set to "encode" or "decode"',
5252
);
5353
}
5454

src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,11 @@ interface LameOptionsBag {
8686
meta?: MetaOptions;
8787
}
8888

89+
type LameStreamMode = "encode" | "decode";
90+
8991
type LameStreamOptions = Omit<LameOptionsBag, "output"> & {
9092
output?: "stream";
93+
mode: LameStreamMode;
9194
};
9295

9396
type SampleFrequency = 8 | 11.025 | 12 | 16 | 22.05 | 24 | 32 | 44.1 | 48;
@@ -186,4 +189,5 @@ export type {
186189
HelpTopic,
187190
CustomFrameRecord,
188191
LameStreamOptions,
192+
LameStreamMode,
189193
};

0 commit comments

Comments
 (0)