Skip to content

Commit 4182bb0

Browse files
committed
Optimize read loop in ReadableStreamSource pump impl
1 parent 2c7fa7b commit 4182bb0

File tree

1 file changed

+19
-5
lines changed

1 file changed

+19
-5
lines changed

src/workerd/api/streams/readable-source.c++

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ class ReadableStreamSourceImpl: public ReadableStreamSource {
454454

455455
// The default non-optimized pumpTo() implementation which initiates a loop
456456
// that reads a chunk from the input stream and writes it to the output
457-
// stream until EOF is reached. The maximum size of each read is 8192 bytes.
457+
// stream until EOF is reached. The maximum size of each read is 16384 bytes.
458458
// The pump is canceled by dropping the returned.
459459
virtual kj::Promise<void> pumpImpl(WritableStreamSink& output, bool end) {
460460
KJ_SWITCH_ONEOF(state) {
@@ -468,16 +468,30 @@ class ReadableStreamSourceImpl: public ReadableStreamSource {
468468
co_return;
469469
}
470470
KJ_CASE_ONEOF(_, kj::Own<kj::AsyncInputStream>) {
471-
kj::FixedArray<kj::byte, 8192> buffer;
471+
kj::FixedArray<kj::byte, 16384> buffer;
472+
static constexpr size_t N = buffer.size();
473+
static constexpr size_t kMinBytes = (N >> 2) + (N >> 1); // 3/4 of N
472474
while (true) {
473-
auto amount = co_await readImpl(buffer, 1);
474-
if (amount == 0) {
475+
// It's most likely that our write below is potentially a write into
476+
// a JS-backed stream which requires grabbing the isolate lock.
477+
// To minimize the number of times we need to grab the lock, we
478+
// want to read as much data as we can here before doing the write.
479+
// We obviously need to balance that with not waiting too long
480+
// between writes. We'll set our minBytes to 3/4 of the buffer size
481+
// to try to strike a balance.
482+
auto amount = co_await readImpl(buffer, kMinBytes);
483+
// If the amount is less than kMinBytes, we assume we've reached EOF.
484+
485+
if (amount > 0) {
486+
co_await output.write(buffer.asPtr().first(amount));
487+
}
488+
489+
if (amount < kMinBytes) {
475490
if (end) {
476491
co_await output.end();
477492
}
478493
co_return;
479494
}
480-
co_await output.write(buffer.asPtr().first(amount));
481495
}
482496
}
483497
}

0 commit comments

Comments
 (0)