diff --git a/lib/streams.js b/lib/streams.js index 591eee2..235a07a 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -217,12 +217,19 @@ function transformWithCancel(customCancel) { /** * Transform a stream using helper functions which are called on each chunk, and on stream close, respectively. + * Takes an optional queuing strategy for the resulting readable stream; + * see https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream#queuingstrategy. + * By default, the queueing strategy is non-buffering. When the `process` + * function is asynchronous, it may be useful to pass a buffering + * queuing strategy to enable multiple chunks to be processed in parallel; + * e.g. pass `{ highWaterMark: 4 }` to process up to 4 chunks in parallel. * @param {ReadableStream|Uint8array|String} input * @param {Function} process * @param {Function} finish + * @param {Object} queuingStrategy * @returns {ReadableStream|Uint8array|String} */ -function transform(input, process = () => undefined, finish = () => undefined) { +function transform(input, process = () => undefined, finish = () => undefined, queuingStrategy = { highWaterMark: 0 }) { if (isArrayStream(input)) { const output = new ArrayStream(); (async () => { @@ -243,24 +250,47 @@ function transform(input, process = () => undefined, finish = () => undefined) { return output; } if (isStream(input)) { - return transformRaw(input, { - async transform(value, controller) { - try { - const result = await process(value); - if (result !== undefined) controller.enqueue(result); - } catch(e) { - controller.error(e); - } + let reader; + let allDone = false; + return new ReadableStream({ + start() { + reader = input.getReader(); }, - async flush(controller) { + async pull(controller) { + if (allDone) { + controller.close(); + input.releaseLock(); + return; + } try { - const result = await finish(); - if (result !== undefined) controller.enqueue(result); - } catch(e) { + // Read repeatedly until we have a chunk to enqueue or until + // we can close the stream, as `pull` won't get called again + // until we call `enqueue` or `close`. + while (true) { + const { value, done } = await reader.read(); + allDone = done; + const result = await (done ? finish : process)(value); + if (result !== undefined) { + controller.enqueue(result); + return; // `pull` will get called again + } + if (done) { + // If `finish` didn't return a chunk to enqueue, call + // `close` here. Otherwise, it will get called in the + // next call to `pull`, above (since `allDone == true`). + controller.close(); + input.releaseLock(); + return; + } + } + } catch (e) { controller.error(e); } + }, + async cancel(reason) { + await reader.cancel(reason); } - }); + }, queuingStrategy); } const result1 = process(input); const result2 = finish(); @@ -438,19 +468,48 @@ function slice(input, begin=0, end=Infinity) { } if (isStream(input)) { if (begin >= 0 && end >= 0) { + let reader; let bytesRead = 0; - return transformRaw(input, { - transform(value, controller) { - if (bytesRead < end) { - if (bytesRead + value.length >= begin) { - controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead)); + return new ReadableStream({ + start() { + reader = input.getReader(); + }, + async pull(controller) { + try { + // Read repeatedly until we have a chunk to enqueue or until + // we can close the stream, as `pull` won't get called again + // until we call `enqueue` or `close`. + while (true) { + if (bytesRead < end) { + const { value, done } = await reader.read(); + if (done) { + controller.close(); + input.releaseLock(); + return; + } + let valueToEnqueue; + if (bytesRead + value.length >= begin) { + valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead); + } + bytesRead += value.length; + if (valueToEnqueue) { + controller.enqueue(valueToEnqueue); + return; // `pull` will get called again + } + } else { + controller.close(); + input.releaseLock(); + return; + } } - bytesRead += value.length; - } else { - controller.terminate(); + } catch (e) { + controller.error(e); } + }, + async cancel(reason) { + await reader.cancel(reason); } - }); + }, { highWaterMark: 0 }); } if (begin < 0 && (end < 0 || end === Infinity)) { let lastBytes = [];