Skip to content

Commit 1e45ae0

Browse files
committed
Eliminate buffering in stream.transform()
A `TransformStream` currently always requires an internal queue (until whatwg/streams#1158 is resolved). Therefore, don't use `TransformStream` in stream.transform(), but create a new `ReadableStream` with the transformed chunks directly, and with a `highWaterMark` of 0, such that the internal queue is always empty.
1 parent 4fb76d5 commit 1e45ae0

File tree

1 file changed

+28
-13
lines changed

1 file changed

+28
-13
lines changed

lib/streams.js

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,24 +243,39 @@ function transform(input, process = () => undefined, finish = () => undefined) {
243243
return output;
244244
}
245245
if (isStream(input)) {
246-
return transformRaw(input, {
247-
async transform(value, controller) {
248-
try {
249-
const result = await process(value);
250-
if (result !== undefined) controller.enqueue(result);
251-
} catch(e) {
252-
controller.error(e);
253-
}
246+
let reader;
247+
let allDone = false;
248+
return new ReadableStream({
249+
start() {
250+
reader = input.getReader();
254251
},
255-
async flush(controller) {
252+
async pull(controller) {
253+
if (allDone) {
254+
controller.close();
255+
return;
256+
}
256257
try {
257-
const result = await finish();
258-
if (result !== undefined) controller.enqueue(result);
259-
} catch(e) {
258+
while (true) {
259+
const { value, done } = await reader.read();
260+
allDone = done;
261+
const result = await (done ? finish : process)(value);
262+
if (result !== undefined) {
263+
controller.enqueue(result);
264+
return;
265+
}
266+
if (done) {
267+
controller.close();
268+
return;
269+
}
270+
}
271+
} catch (e) {
260272
controller.error(e);
261273
}
274+
},
275+
async cancel(reason) {
276+
await reader.cancel(reason);
262277
}
263-
});
278+
}, { highWaterMark: 0 });
264279
}
265280
const result1 = process(input);
266281
const result2 = finish();

0 commit comments

Comments
 (0)