@@ -710,14 +710,6 @@ class ReadableStreamJsController final: public ReadableStreamController {
710710
711711 void doError (jsg::Lock& js, v8::Local<v8::Value> reason);
712712
713- // Increment the pending read count to prevent the controller from
714- // being closed/errored while a synchronous callback is in progress.
715- void incrementPendingReadCount ();
716-
717- // Decrement the pending read count and process any deferred close/error.
718- // This may destroy the ByteReadable/ValueReadable if close was deferred.
719- void decrementPendingReadCountAndProcess (jsg::Lock& js);
720-
721713 bool canCloseOrEnqueue ();
722714 bool hasBackpressure ();
723715
@@ -824,6 +816,8 @@ class ReadableStreamJsController final: public ReadableStreamController {
824816
825817 friend ReadableLockImpl;
826818 friend ReadableLockImpl::PipeLocked;
819+ friend struct ValueReadable ;
820+ friend struct ByteReadable ;
827821
828822 template <typename Controller>
829823 friend jsg::Promise<ReadResult> deferControllerStateChange (jsg::Lock& js,
@@ -1861,10 +1855,10 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
18611855 KJ_IF_SOME (s, state) {
18621856 // Save a reference to the owner before calling pull. The pull callback
18631857 // may trigger close/error which could destroy this ValueReadable. By
1864- // incrementing pendingReadCount , we ensure doClose/doError defers the
1858+ // using beginOperation() , we ensure doClose/doError defers the
18651859 // actual destruction until after we return.
18661860 ReadableStreamJsController& owner = s.owner ;
1867- owner.incrementPendingReadCount ();
1861+ owner.state . beginOperation ();
18681862
18691863 // For draining reads, use forcePull to bypass backpressure checks.
18701864 // This ensures we pull all available data regardless of highWaterMark.
@@ -1874,13 +1868,22 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
18741868 s.controller ->pull (js);
18751869 }
18761870
1877- // Check if state is still valid BEFORE calling decrementPendingReadCountAndProcess ,
1871+ // Check if state is still valid BEFORE calling endOperation() ,
18781872 // because that call may destroy this ValueReadable if close was deferred.
18791873 bool result =
18801874 state.map ([](State& s2) { return !s2.controller ->isPulling (); }).orDefault (false );
18811875
18821876 // Process any deferred close/error. This may destroy this ValueReadable.
1883- owner.decrementPendingReadCountAndProcess (js);
1877+ if (owner.state .endOperation ()) {
1878+ // A pending state was applied. Call the appropriate callback.
1879+ if (owner.state .template is <StreamStates::Closed>()) {
1880+ owner.lock .onClose (js);
1881+ } else if (owner.state .template is <StreamStates::Errored>()) {
1882+ KJ_IF_SOME (err, owner.state .template tryGetUnsafe <StreamStates::Errored>()) {
1883+ owner.lock .onError (js, err.getHandle (js));
1884+ }
1885+ }
1886+ }
18841887
18851888 return result;
18861889 }
@@ -2087,10 +2090,10 @@ struct ByteReadable final: private api::ByteQueue::ConsumerImpl::StateListener {
20872090 KJ_IF_SOME (s, state) {
20882091 // Save a reference to the owner before calling pull. The pull callback
20892092 // may trigger close/error which could destroy this ByteReadable. By
2090- // incrementing pendingReadCount , we ensure doClose/doError defers the
2093+ // using beginOperation() , we ensure doClose/doError defers the
20912094 // actual destruction until after we return.
20922095 ReadableStreamJsController& owner = s.owner ;
2093- owner.incrementPendingReadCount ();
2096+ owner.state . beginOperation ();
20942097
20952098 // For draining reads, use forcePull to bypass backpressure checks.
20962099 // This ensures we pull all available data regardless of highWaterMark.
@@ -2100,13 +2103,22 @@ struct ByteReadable final: private api::ByteQueue::ConsumerImpl::StateListener {
21002103 s.controller ->pull (js);
21012104 }
21022105
2103- // Check if state is still valid BEFORE calling decrementPendingReadCountAndProcess ,
2106+ // Check if state is still valid BEFORE calling endOperation() ,
21042107 // because that call may destroy this ByteReadable if close was deferred.
21052108 bool result =
21062109 state.map ([](State& s2) { return !s2.controller ->isPulling (); }).orDefault (false );
21072110
21082111 // Process any deferred close/error. This may destroy this ByteReadable.
2109- owner.decrementPendingReadCountAndProcess (js);
2112+ if (owner.state .endOperation ()) {
2113+ // A pending state was applied. Call the appropriate callback.
2114+ if (owner.state .template is <StreamStates::Closed>()) {
2115+ owner.lock .onClose (js);
2116+ } else if (owner.state .template is <StreamStates::Errored>()) {
2117+ KJ_IF_SOME (err, owner.state .template tryGetUnsafe <StreamStates::Errored>()) {
2118+ owner.lock .onError (js, err.getHandle (js));
2119+ }
2120+ }
2121+ }
21102122
21112123 return result;
21122124 }
@@ -2567,28 +2579,6 @@ void ReadableStreamJsController::doError(jsg::Lock& js, v8::Local<v8::Value> rea
25672579 // via applyPendingState in deferControllerStateChange.
25682580}
25692581
2570- void ReadableStreamJsController::incrementPendingReadCount () {
2571- pendingReadCount++;
2572- }
2573-
2574- void ReadableStreamJsController::decrementPendingReadCountAndProcess (jsg::Lock& js) {
2575- KJ_ASSERT (pendingReadCount > 0 );
2576- pendingReadCount--;
2577- if (!isReadPending ()) {
2578- KJ_IF_SOME (pending, maybePendingState) {
2579- KJ_SWITCH_ONEOF (pending) {
2580- KJ_CASE_ONEOF (closed, StreamStates::Closed) {
2581- doClose (js);
2582- }
2583- KJ_CASE_ONEOF (errored, StreamStates::Errored) {
2584- doError (js, errored.getHandle (js));
2585- }
2586- }
2587- maybePendingState = kj::none;
2588- }
2589- }
2590- }
2591-
25922582bool ReadableStreamJsController::isByteOriented () const {
25932583 return state.is <kj::Own<ByteReadable>>();
25942584}
@@ -2708,18 +2698,15 @@ kj::Maybe<jsg::Promise<DrainingReadResult>> ReadableStreamJsController::draining
27082698 jsg::Lock& js, size_t maxRead) {
27092699 disturbed = true ;
27102700
2711- KJ_IF_SOME (pendingState, maybePendingState) {
2712- KJ_SWITCH_ONEOF (pendingState) {
2713- KJ_CASE_ONEOF (closed, StreamStates::Closed) {
2714- return js.resolvedPromise (DrainingReadResult{
2715- .chunks = kj::Array<kj::Array<kj::byte>>(),
2716- .done = true ,
2717- });
2718- }
2719- KJ_CASE_ONEOF (errored, StreamStates::Errored) {
2720- return js.rejectedPromise <DrainingReadResult>(errored.addRef (js));
2721- }
2722- }
2701+ // Check for pending state first (deferred close/error during a prior read operation)
2702+ if (state.pendingStateIs <StreamStates::Closed>()) {
2703+ return js.resolvedPromise (DrainingReadResult{
2704+ .chunks = kj::Array<kj::Array<kj::byte>>(),
2705+ .done = true ,
2706+ });
2707+ }
2708+ KJ_IF_SOME (pendingError, state.tryGetPendingStateUnsafe <StreamStates::Errored>()) {
2709+ return js.rejectedPromise <DrainingReadResult>(pendingError.addRef (js));
27232710 }
27242711
27252712 // Like deferControllerStateChange for regular reads, we need to prevent the controller
@@ -2730,17 +2717,34 @@ kj::Maybe<jsg::Promise<DrainingReadResult>> ReadableStreamJsController::draining
27302717 auto wrapDrainingRead =
27312718 [this ](jsg::Lock& js,
27322719 jsg::Promise<DrainingReadResult> promise) -> jsg::Promise<DrainingReadResult> {
2733- pendingReadCount++ ;
2720+ state. beginOperation () ;
27342721 return promise.then (js, [this ](jsg::Lock& js, DrainingReadResult result) {
2735- decrementPendingReadCountAndProcess (js);
2722+ if (state.endOperation ()) {
2723+ // A pending state was applied. Call the appropriate callback.
2724+ if (state.template is <StreamStates::Closed>()) {
2725+ lock.onClose (js);
2726+ } else if (state.template is <StreamStates::Errored>()) {
2727+ KJ_IF_SOME (err, state.template tryGetUnsafe <StreamStates::Errored>()) {
2728+ lock.onError (js, err.getHandle (js));
2729+ }
2730+ }
2731+ }
27362732 return kj::mv (result);
27372733 }, [this ](jsg::Lock& js, jsg::Value exception) -> DrainingReadResult {
2738- decrementPendingReadCountAndProcess (js);
2734+ state.clearPendingState ();
2735+ (void )state.endOperation ();
27392736 js.throwException (kj::mv (exception));
27402737 });
27412738 };
27422739
27432740 KJ_SWITCH_ONEOF (state) {
2741+ KJ_CASE_ONEOF (initial, Initial) {
2742+ // Stream not yet set up, treat as closed.
2743+ return js.resolvedPromise (DrainingReadResult{
2744+ .chunks = kj::Array<kj::Array<kj::byte>>(),
2745+ .done = true ,
2746+ });
2747+ }
27442748 KJ_CASE_ONEOF (closed, StreamStates::Closed) {
27452749 return js.resolvedPromise (DrainingReadResult{
27462750 .chunks = kj::Array<kj::Array<kj::byte>>(),
0 commit comments