Skip to content

Commit 8f84ad2

Browse files
committed
Add readable-source/writable-sink adapters
This is the first step towards a significant refactoring of the streams implementation with the goal of making it safer and more performance. This commits adds four new classes: * `ReadableStreamSourceKjAdapter`: Adapts a ReadableStream to the `ReadableStreamSource` API, fully encapsulating the JS interactions behind the KJ-friendly interface. * `ReadableStreamSourceJsAdapter`: Adapts a `ReadableStreamSource` to a more friendly JS interface. * `WritableStreamSinkKjAdapter`: Adapts a WritableStream to the `WritableStreamSink` API, fully encapsulating the JS interactions behind the KJ-friendly interface. * `WritableStreamSinkJsAdapter`: Adapts a `WritableStreamSink` to a more friendly JS interface. Also adds tests. This commit does not update any existing code to use these adapters yet, that will come in follow-up commits. There are several key goals for this refactoring: * Make the streams implementation safer by using the new checked Queue for buffering operations and removing direct reliance on raw references. * Reducing the overall complexity of the streams implementation through better encapsulation and separation of concerns. These require very careful review to ensure the behavior is correct before we start switching existing code over to use these adapters. In future commits I will be incrementally removing the old code and switching to use these adapters. Based on Harris' feedback, I've updated the implementation of the ReadableStreamSourceKjAdapter to include a minimum read policy. When the policy is IMMEDIATE, the adapter will always return immediately after minimally fulfilling the minimum read, even if more data is available and there is more room in buffer we're reading into. If the policy is OPPORTUNISTIC, then the adapter will attempt to fill the buffer as much as possible (up until there is <= 512 bytes remaining in the buffer or we hit maxBytes) while we are holding the isolate lock. The default policy is OPPORTUNISTIC, with the idea being that we should do as much work as possible while we have the isolate lock before returning to the kj event loop.
1 parent 910d7ec commit 8f84ad2

12 files changed

+5836
-15
lines changed

src/workerd/api/BUILD.bazel

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,24 @@ kj_test(
347347
deps = ["//src/workerd/tests:test-fixture"],
348348
)
349349

350+
kj_test(
351+
src = "streams/writable-sink-adapter-test.c++",
352+
deps = [
353+
"//src/workerd/io",
354+
"//src/workerd/jsg",
355+
"//src/workerd/tests:test-fixture",
356+
],
357+
)
358+
359+
kj_test(
360+
src = "streams/readable-source-adapter-test.c++",
361+
deps = [
362+
"//src/workerd/io",
363+
"//src/workerd/jsg",
364+
"//src/workerd/tests:test-fixture",
365+
],
366+
)
367+
350368
wd_test(
351369
src = "actor-alarms-delete-test.wd-test",
352370
args = ["--experimental"],

src/workerd/api/streams/common.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ class WritableStreamController {
593593
//
594594
// The controller is guaranteed to either outlive the Writer or will detach the Writer so the
595595
// WritableStreamController& reference should always remain valid.
596-
virtual void attach(WritableStreamController& controller,
596+
virtual void attach(jsg::Lock& js,
597+
WritableStreamController& controller,
597598
jsg::Promise<void> closedPromise,
598599
jsg::Promise<void> readyPromise) = 0;
599600

@@ -603,7 +604,7 @@ class WritableStreamController {
603604

604605
// The ready promise can be replaced whenever backpressure is signaled by the underlying
605606
// controller.
606-
virtual void replaceReadyPromise(jsg::Promise<void> readyPromise) = 0;
607+
virtual void replaceReadyPromise(jsg::Lock& js, jsg::Promise<void> readyPromise) = 0;
607608
};
608609

609610
struct PendingAbort {
@@ -821,10 +822,10 @@ class WriterLocked {
821822
return readyFulfiller;
822823
}
823824

824-
void setReadyFulfiller(jsg::PromiseResolverPair<void>& pair) {
825+
void setReadyFulfiller(jsg::Lock& js, jsg::PromiseResolverPair<void>& pair) {
825826
KJ_IF_SOME(w, writer) {
826827
readyFulfiller = kj::mv(pair.resolver);
827-
w.replaceReadyPromise(kj::mv(pair.promise));
828+
w.replaceReadyPromise(js, kj::mv(pair.promise));
828829
}
829830
}
830831

src/workerd/api/streams/internal.c++

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,7 @@ void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool ba
10231023
// the existing one is resolved or not.
10241024
auto prp = js.newPromiseAndResolver<void>();
10251025
prp.promise.markAsHandled(js);
1026-
writerLock.setReadyFulfiller(prp);
1026+
writerLock.setReadyFulfiller(js, prp);
10271027
return;
10281028
}
10291029

@@ -1406,7 +1406,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
14061406
}
14071407

14081408
writeState = kj::mv(lock);
1409-
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
1409+
writer.attach(js, *this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
14101410
return true;
14111411
}
14121412

0 commit comments

Comments
 (0)