Skip to content

Commit c605f81

Browse files
committed
Add optional queuingStrategy parameter to transform
1 parent 0a6c6d7 commit c605f81

File tree

1 file changed

+9
-2
lines changed

1 file changed

+9
-2
lines changed

lib/streams.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,12 +217,19 @@ function transformWithCancel(customCancel) {
217217

218218
/**
219219
* Transform a stream using helper functions which are called on each chunk, and on stream close, respectively.
220+
* Takes an optional queuing strategy for the resulting readable stream;
221+
* see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy.
222+
* By default, the queueing strategy is non-buffering. When the `process`
223+
* function is asynchronous, it may be useful to pass a buffering
224+
* queuing strategy to enable multiple chunks to be processed in parallel;
225+
* e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel.
220226
* @param {ReadableStream|Uint8array|String} input
221227
* @param {Function} process
222228
* @param {Function} finish
229+
* @param {Object} queuingStrategy
223230
* @returns {ReadableStream|Uint8array|String}
224231
*/
225-
function transform(input, process = () => undefined, finish = () => undefined) {
232+
function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) {
226233
if (isArrayStream(input)) {
227234
const output = new ArrayStream();
228235
(async () => {
@@ -277,7 +284,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
277284
async cancel(reason) {
278285
await reader.cancel(reason);
279286
}
280-
}, { highWaterMark: 0 });
287+
}, queuingStrategy);
281288
}
282289
const result1 = process(input);
283290
const result2 = finish();

0 commit comments

Comments
 (0)