Skip to content

Commit 1788559

Browse files
committed
Eliminate buffering in slice
1 parent abb26d9 commit 1788559

File tree

1 file changed

+38
-9
lines changed

1 file changed

+38
-9
lines changed

lib/streams.js

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -468,19 +468,48 @@ function slice(input, begin=0, end=Infinity) {
468468
}
469469
if (isStream(input)) {
470470
if (begin >= 0 && end >= 0) {
471+
let reader;
471472
let bytesRead = 0;
472-
return transformRaw(input, {
473-
transform(value, controller) {
474-
if (bytesRead < end) {
475-
if (bytesRead + value.length >= begin) {
476-
controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
473+
return new ReadableStream({
474+
start() {
475+
reader = input.getReader();
476+
},
477+
async pull(controller) {
478+
try {
479+
// Read repeatedly until we have a chunk to enqueue or until
480+
// we can close the stream, as `pull` won't get called again
481+
// until we call `enqueue` or `close`.
482+
while (true) {
483+
if (bytesRead < end) {
484+
const { value, done } = await reader.read();
485+
if (done) {
486+
controller.close();
487+
input.releaseLock();
488+
return;
489+
}
490+
let valueToEnqueue;
491+
if (bytesRead + value.length >= begin) {
492+
valueToEnqueue = slice(value, Math.max(begin - bytesRead, 0), end - bytesRead);
493+
}
494+
bytesRead += value.length;
495+
if (valueToEnqueue) {
496+
controller.enqueue(valueToEnqueue);
497+
return; // `pull` will get called again
498+
}
499+
} else {
500+
controller.close();
501+
input.releaseLock();
502+
return;
503+
}
477504
}
478-
bytesRead += value.length;
479-
} else {
480-
controller.terminate();
505+
} catch (e) {
506+
controller.error(e);
481507
}
508+
},
509+
async cancel(reason) {
510+
await reader.cancel(reason);
482511
}
483-
});
512+
}, { highWaterMark: 0 });
484513
}
485514
if (begin < 0 && (end < 0 || end === Infinity)) {
486515
let lastBytes = [];

0 commit comments

Comments
 (0)