@@ -721,6 +721,7 @@ class ReadableStreamJsController final: public ReadableStreamController {
721721 state = StreamStates::Closed();
722722
723723 kj::Maybe<uint64_t > expectedLength = kj::none;
724+ bool canceling = false ;
724725
725726 // The lock state is separate because a closed or errored stream can still be locked.
726727 ReadableLockImpl lock;
@@ -1607,6 +1608,8 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16071608
16081609 using State = ReadableState<DefaultController, ValueQueue>;
16091610 kj::Maybe<State> state;
1611+ bool reading = false ;
1612+ bool pendingCancel = false ;
16101613
16111614 JSG_MEMORY_INFO (ValueReadable) {
16121615 KJ_IF_SOME (s, state) {
@@ -1647,10 +1650,17 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16471650 jsg::Promise<ReadResult> read (jsg::Lock& js) {
16481651 KJ_IF_SOME (s, state) {
16491652 auto prp = js.newPromiseAndResolver <ReadResult>();
1653+ reading = true ;
16501654 s.consumer ->read (js,
16511655 ValueQueue::ReadRequest{
16521656 .resolver = kj::mv (prp.resolver ),
16531657 });
1658+ reading = false ;
1659+ if (pendingCancel) {
1660+ // If we were canceled while reading, we need to drop our state now.
1661+ state = kj::none;
1662+ pendingCancel = false ;
1663+ }
16541664 return kj::mv (prp.promise );
16551665 }
16561666
@@ -1666,10 +1676,17 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
16661676 // the underlying controller only when the last reader is canceled.
16671677 // Here, we rely on the controller implementing the correct behavior since it owns
16681678 // the queue that knows about all of the attached consumers.
1679+ if (pendingCancel) return js.resolvedPromise ();
16691680 KJ_IF_SOME (s, state) {
16701681 s.consumer ->cancel (js, maybeReason);
16711682 auto promise = s.controller ->cancel (js, kj::mv (maybeReason));
1672- state = kj::none;
1683+ // If we're currently in a read, we need to wait for that to finish
1684+ // before dropping our state.
1685+ if (reading) {
1686+ pendingCancel = true ;
1687+ } else {
1688+ state = kj::none;
1689+ }
16731690 return kj::mv (promise);
16741691 }
16751692
@@ -2251,9 +2268,13 @@ jsg::Promise<void> ReadableStreamJsController::cancel(
22512268 return js.rejectedPromise <void >(errored.addRef (js));
22522269 }
22532270 KJ_CASE_ONEOF (consumer, kj::Own<ValueReadable>) {
2271+ if (canceling) return js.resolvedPromise ();
2272+ canceling = true ;
22542273 return doCancel (consumer);
22552274 }
22562275 KJ_CASE_ONEOF (consumer, kj::Own<ByteReadable>) {
2276+ if (canceling) return js.resolvedPromise ();
2277+ canceling = true ;
22572278 return doCancel (consumer);
22582279 }
22592280 }
@@ -4106,6 +4127,22 @@ jsg::Ref<ReadableStream> ReadableStream::from(
41064127 (controller),
41074128 (jsg::Lock& js, kj::Maybe<jsg::Value> value) {
41084129 KJ_IF_SOME (v, value) {
4130+ auto handle = v.getHandle (js);
4131+ // Per the ReadableStream.from spec, if the value is a promise,
4132+ // the stream should wait for it to resolve and enqueue the
4133+ // resolved value...
4134+ // ... yes, this means that ReadableStream.from where the inputs
4135+ // are promises will be slow, but that's the spec.
4136+ if (handle->IsPromise ()) {
4137+ return js.toPromise (handle.As <v8::Promise>()).then (js,
4138+ JSG_VISITABLE_LAMBDA (
4139+ (controller=controller.addRef ()),
4140+ (controller),
4141+ (jsg::Lock& js, jsg::Value val) mutable {
4142+ controller->enqueue (js, val.getHandle (js));
4143+ return js.resolvedPromise ();
4144+ }));
4145+ }
41094146 controller->enqueue (js, v.getHandle (js));
41104147 } else {
41114148 controller->close (js);
@@ -4119,7 +4156,7 @@ jsg::Ref<ReadableStream> ReadableStream::from(
41194156 }));
41204157 },
41214158 .cancel = [generator = rcGenerator.addRef ()](jsg::Lock& js, auto reason) mutable {
4122- return generator->generator .return_ (js, kj::none )
4159+ return generator->generator .return_ (js, js. v8Ref (reason) )
41234160 .then (js, [generator = kj::mv (generator)](auto & lock, auto ) {
41244161 // The generator might produce a value on return and might even want to continue,
41254162 // but the stream has been canceled at this point, so we stop here.
0 commit comments