Skip to content

Commit b83dc6c

Browse files
authored
Merge pull request #5014 from cloudflare/jasnell/streams-refactor-next-gen
2 parents 6fe71ed + 8f84ad2 commit b83dc6c

13 files changed

+5851
-15
lines changed

src/workerd/api/BUILD.bazel

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,24 @@ kj_test(
347347
deps = ["//src/workerd/tests:test-fixture"],
348348
)
349349

350+
kj_test(
351+
src = "streams/writable-sink-adapter-test.c++",
352+
deps = [
353+
"//src/workerd/io",
354+
"//src/workerd/jsg",
355+
"//src/workerd/tests:test-fixture",
356+
],
357+
)
358+
359+
kj_test(
360+
src = "streams/readable-source-adapter-test.c++",
361+
deps = [
362+
"//src/workerd/io",
363+
"//src/workerd/jsg",
364+
"//src/workerd/tests:test-fixture",
365+
],
366+
)
367+
350368
wd_test(
351369
src = "actor-alarms-delete-test.wd-test",
352370
args = ["--experimental"],

src/workerd/api/streams/common.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,8 @@ class WritableStreamController {
593593
//
594594
// The controller is guaranteed to either outlive the Writer or will detach the Writer so the
595595
// WritableStreamController& reference should always remain valid.
596-
virtual void attach(WritableStreamController& controller,
596+
virtual void attach(jsg::Lock& js,
597+
WritableStreamController& controller,
597598
jsg::Promise<void> closedPromise,
598599
jsg::Promise<void> readyPromise) = 0;
599600

@@ -603,7 +604,7 @@ class WritableStreamController {
603604

604605
// The ready promise can be replaced whenever backpressure is signaled by the underlying
605606
// controller.
606-
virtual void replaceReadyPromise(jsg::Promise<void> readyPromise) = 0;
607+
virtual void replaceReadyPromise(jsg::Lock& js, jsg::Promise<void> readyPromise) = 0;
607608
};
608609

609610
struct PendingAbort {
@@ -821,10 +822,10 @@ class WriterLocked {
821822
return readyFulfiller;
822823
}
823824

824-
void setReadyFulfiller(jsg::PromiseResolverPair<void>& pair) {
825+
void setReadyFulfiller(jsg::Lock& js, jsg::PromiseResolverPair<void>& pair) {
825826
KJ_IF_SOME(w, writer) {
826827
readyFulfiller = kj::mv(pair.resolver);
827-
w.replaceReadyPromise(kj::mv(pair.promise));
828+
w.replaceReadyPromise(js, kj::mv(pair.promise));
828829
}
829830
}
830831

src/workerd/api/streams/internal.c++

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,7 @@ void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool ba
10231023
// the existing one is resolved or not.
10241024
auto prp = js.newPromiseAndResolver<void>();
10251025
prp.promise.markAsHandled(js);
1026-
writerLock.setReadyFulfiller(prp);
1026+
writerLock.setReadyFulfiller(js, prp);
10271027
return;
10281028
}
10291029

@@ -1406,7 +1406,7 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
14061406
}
14071407

14081408
writeState = kj::mv(lock);
1409-
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
1409+
writer.attach(js, *this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
14101410
return true;
14111411
}
14121412

0 commit comments

Comments
 (0)