From bccb07e6f0450363e2ebe30d474d2db336ae9a85 Mon Sep 17 00:00:00 2001
From: Daniel Huigens <d.huigens@protonmail.com>
Date: Mon, 1 Sep 2025 15:01:34 +0200
Subject: [PATCH 1/4] Eliminate buffering in `transform`

A `TransformStream` currently always requires an internal queue (until
https://github.com/whatwg/streams/issues/1158 is resolved). Therefore,
don't use `TransformStream` in `transform` anymore, but create a new
`ReadableStream` with the transformed chunks directly, and with a
`highWaterMark` of 0, such that the internal queue is always empty.
---
 lib/streams.js | 47 ++++++++++++++++++++++++++++++++++-------------
 1 file changed, 34 insertions(+), 13 deletions(-)

diff --git a/lib/streams.js b/lib/streams.js
index 591eee2..18a16e7 100644
--- a/lib/streams.js
+++ b/lib/streams.js
@@ -243,24 +243,45 @@ function transform(input, process = () => undefined, finish = () => undefined) {
     return output;
   }
   if (isStream(input)) {
-    return transformRaw(input, {
-      async transform(value, controller) {
-        try {
-          const result = await process(value);
-          if (result !== undefined) controller.enqueue(result);
-        } catch(e) {
-          controller.error(e);
-        }
+    let reader;
+    let allDone = false;
+    return new ReadableStream({
+      start() {
+        reader = input.getReader();
       },
-      async flush(controller) {
+      async pull(controller) {
+        if (allDone) {
+          controller.close();
+          return;
+        }
         try {
-          const result = await finish();
-          if (result !== undefined) controller.enqueue(result);
-        } catch(e) {
+          // Read repeatedly until we have a chunk to enqueue or until
+          // we can close the stream, as `pull` won't get called again
+          // until we call `enqueue` or `close`.
+          while (true) {
+            const { value, done } = await reader.read();
+            allDone = done;
+            const result = await (done ? finish : process)(value);
+            if (result !== undefined) {
+              controller.enqueue(result);
+              return; // `pull` will get called again
+            }
+            if (done) {
+              // If `finish` didn't return a chunk to enqueue, call
+              // `close` here. Otherwise, it will get called in the
+              // next call to `pull`, above (since `allDone == true`).
+              controller.close();
+              return;
+            }
+          }
+        } catch (e) {
           controller.error(e);
         }
+      },
+      async cancel(reason) {
+        await reader.cancel(reason);
       }
-    });
+    }, { highWaterMark: 0 });
   }
   const result1 = process(input);
   const result2 = finish();

From 3a2724be09f3503eb31e202e2242e88a198ce582 Mon Sep 17 00:00:00 2001
From: Daniel Huigens <d.huigens@protonmail.com>
Date: Mon, 1 Sep 2025 18:05:06 +0200
Subject: [PATCH 2/4] Unlock input reader after transforming stream

---
 lib/streams.js | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/streams.js b/lib/streams.js
index 18a16e7..cb01301 100644
--- a/lib/streams.js
+++ b/lib/streams.js
@@ -252,6 +252,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
       async pull(controller) {
         if (allDone) {
           controller.close();
+          reader.releaseLock();
           return;
         }
         try {
@@ -271,6 +272,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
               // `close` here. Otherwise, it will get called in the
               // next call to `pull`, above (since `allDone == true`).
               controller.close();
+              reader.releaseLock();
               return;
             }
           }

From abb26d91efda6962dad97e73b6a8783353287526 Mon Sep 17 00:00:00 2001
From: Daniel Huigens <d.huigens@protonmail.com>
Date: Wed, 3 Sep 2025 15:27:12 +0200
Subject: [PATCH 3/4] Add optional `queuingStrategy` parameter to `transform`

---
 lib/streams.js | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/lib/streams.js b/lib/streams.js
index cb01301..fd265f4 100644
--- a/lib/streams.js
+++ b/lib/streams.js
@@ -217,12 +217,19 @@ function transformWithCancel(customCancel) {
 
 /**
  * Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
+ * Takes an optional queuing strategy for the resulting readable stream;
+ * see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
+ * By default, the queuing strategy is non-buffering. When the `process`
+ * function is asynchronous, it may be useful to pass a buffering
+ * queuing strategy to enable multiple chunks to be processed in parallel;
+ * e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel.
  * @param {ReadableStream|Uint8array|String} input
  * @param {Function} process
  * @param {Function} finish
+ * @param {Object} queuingStrategy
  * @returns {ReadableStream|Uint8array|String}
  */
-function transform(input, process = () => undefined, finish = () => undefined) {
+function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) {
   if (isArrayStream(input)) {
     const output = new ArrayStream();
     (async () => {
@@ -283,7 +290,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
       async cancel(reason) {
         await reader.cancel(reason);
       }
-    }, { highWaterMark: 0 });
+    }, queuingStrategy);
   }
   const result1 = process(input);
   const result2 = finish();

From 17885597cbb679fd6fd49887f09ece7042d19713 Mon Sep 17 00:00:00 2001
From: Daniel Huigens <d.huigens@protonmail.com>
Date: Wed, 3 Sep 2025 15:37:00 +0200
Subject: [PATCH 4/4] Eliminate buffering in `slice`

---
 lib/streams.js | 47 ++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 38 insertions(+), 9 deletions(-)

diff --git a/lib/streams.js b/lib/streams.js
index fd265f4..235a07a 100644
--- a/lib/streams.js
+++ b/lib/streams.js
@@ -468,19 +468,48 @@ function slice(input, begin=0, end=Infinity) {
   }
   if (isStream(input)) {
     if (begin >= 0 && end >= 0) {
+      let reader;
       let bytesRead = 0;
-      return transformRaw(input, {
-        transform(value, controller) {
-          if (bytesRead < end) {
-            if (bytesRead + value.length >= begin) {
-              controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
+      return new ReadableStream({
+        start() {
+          reader = input.getReader();
+        },
+        async pull(controller) {
+          try {
+            // Read repeatedly until we have a chunk to enqueue or until
+            // we can close the stream, as `pull` won't get called again
+            // until we call `enqueue` or `close`.
+            while (true) {
+              if (bytesRead < end) {
+                const { value, done } = await reader.read();
+                if (done) {
+                  controller.close();
+                  reader.releaseLock();
+                  return;
+                }
+                let valueToEnqueue;
+                if (bytesRead + value.length >= begin) {
+                  valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead);
+                }
+                bytesRead += value.length;
+                if (valueToEnqueue) {
+                  controller.enqueue(valueToEnqueue);
+                  return; // `pull` will get called again
+                }
+              } else {
+                controller.close();
+                reader.releaseLock();
+                return;
+              }
             }
-            bytesRead += value.length;
-          } else {
-            controller.terminate();
+          } catch (e) {
+            controller.error(e);
           }
+        },
+        async cancel(reason) {
+          await reader.cancel(reason);
         }
-      });
+      }, { highWaterMark: 0 });
     }
     if (begin < 0 && (end < 0 || end === Infinity)) {
       let lastBytes = [];