Skip to content

Commit f9da1b5

Browse files
committed
Benchmark and analysis of new streams adapters
1 parent e5f54c2 commit f9da1b5

File tree

8 files changed

+8205
-7
lines changed

8 files changed

+8205
-7
lines changed

docs/development/streams/JSG_PROMISE_PERFORMANCE_ANALYSIS.md

Lines changed: 2039 additions & 0 deletions
Large diffs are not rendered by default.

docs/development/streams/PUMP_PERFORMANCE_ANALYSIS.md

Lines changed: 3842 additions & 0 deletions
Large diffs are not rendered by default.

docs/development/streams/QUEUE_PERFORMANCE_ANALYSIS.md

Lines changed: 1284 additions & 0 deletions
Large diffs are not rendered by default.

src/workerd/api/streams/readable-source-adapter.c++

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -755,8 +755,10 @@ jsg::Promise<kj::Own<ReadableSourceKjAdapter::ReadContext>> ReadableSourceKjAdap
755755
// At this point, we should have no left over data.
756756
KJ_DASSERT(context->maybeLeftOver == kj::none);
757757

758-
// We should also have some space left in our destination buffer.
759-
KJ_DASSERT(context->buffer.size() > 0);
758+
// If the buffer is exactly full (the chunk filled it perfectly), we're done.
759+
if (context->buffer.size() == 0) {
760+
return js.resolvedPromise(kj::mv(context));
761+
}
760762

761763
// We might continue reading only if the adapter is still alive and
762764
// in an active state...
@@ -1111,13 +1113,28 @@ kj::Promise<void> ReadableSourceKjAdapter::pumpToImpl(
11111113
.adapterRef = kj::none, // no need to track adapter liveness during pump
11121114
});
11131115

1114-
return active.ioContext.run([context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable {
1116+
// We need to pass a pointer to active into the promise continuation so we can
1117+
// save any leftover data. The caller (pumpToImpl) owns the Active and keeps it
1118+
// alive for the duration of the pump, so this is safe.
1119+
Active* activePtr = &active;
1120+
1121+
return active.ioContext.run(
1122+
[activePtr, context = kj::mv(context), minReadPolicy](jsg::Lock& js) mutable {
11151123
auto& ioContext = IoContext::current();
11161124
// The readInternal method (and the underlying read on the stream) should optimize
11171125
// itself based on the bytes available in the stream itself and the minBytes requested.
11181126
return ioContext
11191127
.awaitJs(js, ReadableSourceKjAdapter::readInternal(js, kj::mv(context), minReadPolicy))
1120-
.then([](kj::Own<ReadContext> context) mutable -> kj::Promise<size_t> {
1128+
.then([activePtr](kj::Own<ReadContext> context) mutable -> kj::Promise<size_t> {
1129+
// If there's leftover data from reading a chunk larger than the buffer,
1130+
// save it to active.state so it can be used on the next read iteration.
1131+
// Only do this if we're still in Idle state - if the state has transitioned
1132+
// to something else (e.g. Done, Canceling, Canceled), we discard the leftover.
1133+
KJ_IF_SOME(leftOver, context->maybeLeftOver) {
1134+
if (activePtr->state.is<Active::Idle>()) {
1135+
activePtr->state = kj::mv(leftOver);
1136+
}
1137+
}
11211138
return context->totalRead;
11221139
});
11231140
});
@@ -1238,12 +1255,29 @@ kj::Promise<void> ReadableSourceKjAdapter::pumpToImpl(
12381255
consecutiveFastReads = 0;
12391256
}
12401257

1241-
// Start working on the next read.
1258+
// If there's leftover data from the previous read (happens when a JS chunk
1259+
// is larger than the buffer), extract it before starting the next read.
1260+
// We must do this BEFORE starting the next read so that active->state is Idle
1261+
// when the next read's promise continuation tries to save its leftover.
1262+
kj::Maybe<Active::Readable> maybeLeftover;
1263+
KJ_IF_SOME(readable, active->state.tryGet<Active::Readable>()) {
1264+
maybeLeftover = kj::mv(readable);
1265+
}
1266+
1267+
// Start working on the next read. At this point, if there was leftover, we've
1268+
// moved it to maybeLeftover, so the next read can safely set its leftover
1269+
// to active->state when it completes.
1270+
active->state.init<Active::Idle>();
12421271
readPromise = pumpReadImpl(*active, buffers[currentReadBuf], minBytes, minReadPolicy);
12431272

12441273
{
12451274
KJ_ON_SCOPE_FAILURE(writeFailed = true);
12461275
co_await output.write(writeBuf);
1276+
1277+
// Write any leftover from the previous read.
1278+
KJ_IF_SOME(leftover, maybeLeftover) {
1279+
co_await output.write(leftover.view);
1280+
}
12471281
}
12481282
}
12491283
} catch (...) {

src/workerd/tests/BUILD.bazel

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,17 @@ wd_cc_benchmark(
111111
],
112112
)
113113

114+
# Benchmark for comparing stream piping implementations
115+
wd_cc_benchmark(
116+
name = "bench-stream-piping",
117+
srcs = ["bench-stream-piping.c++"],
118+
deps = [
119+
":test-fixture",
120+
"//src/workerd/io",
121+
"//src/workerd/jsg",
122+
],
123+
)
124+
114125
wd_test(
115126
src = "unknown-import-assertions-test.wd-test",
116127
args = ["--experimental"],

0 commit comments

Comments
 (0)