Skip to content

Commit e8c4a0a

Browse files
committed
Update to address merge conflicts and other changes
1 parent 9d7a83f commit e8c4a0a

File tree

1 file changed

+57
-53
lines changed

1 file changed

+57
-53
lines changed

src/workerd/api/streams/standard.c++

Lines changed: 57 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -710,14 +710,6 @@ class ReadableStreamJsController final: public ReadableStreamController {
710710

711711
void doError(jsg::Lock& js, v8::Local<v8::Value> reason);
712712

713-
// Increment the pending read count to prevent the controller from
714-
// being closed/errored while a synchronous callback is in progress.
715-
void incrementPendingReadCount();
716-
717-
// Decrement the pending read count and process any deferred close/error.
718-
// This may destroy the ByteReadable/ValueReadable if close was deferred.
719-
void decrementPendingReadCountAndProcess(jsg::Lock& js);
720-
721713
bool canCloseOrEnqueue();
722714
bool hasBackpressure();
723715

@@ -824,6 +816,8 @@ class ReadableStreamJsController final: public ReadableStreamController {
824816

825817
friend ReadableLockImpl;
826818
friend ReadableLockImpl::PipeLocked;
819+
friend struct ValueReadable;
820+
friend struct ByteReadable;
827821

828822
template <typename Controller>
829823
friend jsg::Promise<ReadResult> deferControllerStateChange(jsg::Lock& js,
@@ -1861,10 +1855,10 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
18611855
KJ_IF_SOME(s, state) {
18621856
// Save a reference to the owner before calling pull. The pull callback
18631857
// may trigger close/error which could destroy this ValueReadable. By
1864-
// incrementing pendingReadCount, we ensure doClose/doError defers the
1858+
// using beginOperation(), we ensure doClose/doError defers the
18651859
// actual destruction until after we return.
18661860
ReadableStreamJsController& owner = s.owner;
1867-
owner.incrementPendingReadCount();
1861+
owner.state.beginOperation();
18681862

18691863
// For draining reads, use forcePull to bypass backpressure checks.
18701864
// This ensures we pull all available data regardless of highWaterMark.
@@ -1874,13 +1868,22 @@ struct ValueReadable final: private api::ValueQueue::ConsumerImpl::StateListener
18741868
s.controller->pull(js);
18751869
}
18761870

1877-
// Check if state is still valid BEFORE calling decrementPendingReadCountAndProcess,
1871+
// Check if state is still valid BEFORE calling endOperation(),
18781872
// because that call may destroy this ValueReadable if close was deferred.
18791873
bool result =
18801874
state.map([](State& s2) { return !s2.controller->isPulling(); }).orDefault(false);
18811875

18821876
// Process any deferred close/error. This may destroy this ValueReadable.
1883-
owner.decrementPendingReadCountAndProcess(js);
1877+
if (owner.state.endOperation()) {
1878+
// A pending state was applied. Call the appropriate callback.
1879+
if (owner.state.template is<StreamStates::Closed>()) {
1880+
owner.lock.onClose(js);
1881+
} else if (owner.state.template is<StreamStates::Errored>()) {
1882+
KJ_IF_SOME(err, owner.state.template tryGetUnsafe<StreamStates::Errored>()) {
1883+
owner.lock.onError(js, err.getHandle(js));
1884+
}
1885+
}
1886+
}
18841887

18851888
return result;
18861889
}
@@ -2087,10 +2090,10 @@ struct ByteReadable final: private api::ByteQueue::ConsumerImpl::StateListener {
20872090
KJ_IF_SOME(s, state) {
20882091
// Save a reference to the owner before calling pull. The pull callback
20892092
// may trigger close/error which could destroy this ByteReadable. By
2090-
// incrementing pendingReadCount, we ensure doClose/doError defers the
2093+
// using beginOperation(), we ensure doClose/doError defers the
20912094
// actual destruction until after we return.
20922095
ReadableStreamJsController& owner = s.owner;
2093-
owner.incrementPendingReadCount();
2096+
owner.state.beginOperation();
20942097

20952098
// For draining reads, use forcePull to bypass backpressure checks.
20962099
// This ensures we pull all available data regardless of highWaterMark.
@@ -2100,13 +2103,22 @@ struct ByteReadable final: private api::ByteQueue::ConsumerImpl::StateListener {
21002103
s.controller->pull(js);
21012104
}
21022105

2103-
// Check if state is still valid BEFORE calling decrementPendingReadCountAndProcess,
2106+
// Check if state is still valid BEFORE calling endOperation(),
21042107
// because that call may destroy this ByteReadable if close was deferred.
21052108
bool result =
21062109
state.map([](State& s2) { return !s2.controller->isPulling(); }).orDefault(false);
21072110

21082111
// Process any deferred close/error. This may destroy this ByteReadable.
2109-
owner.decrementPendingReadCountAndProcess(js);
2112+
if (owner.state.endOperation()) {
2113+
// A pending state was applied. Call the appropriate callback.
2114+
if (owner.state.template is<StreamStates::Closed>()) {
2115+
owner.lock.onClose(js);
2116+
} else if (owner.state.template is<StreamStates::Errored>()) {
2117+
KJ_IF_SOME(err, owner.state.template tryGetUnsafe<StreamStates::Errored>()) {
2118+
owner.lock.onError(js, err.getHandle(js));
2119+
}
2120+
}
2121+
}
21102122

21112123
return result;
21122124
}
@@ -2567,28 +2579,6 @@ void ReadableStreamJsController::doError(jsg::Lock& js, v8::Local<v8::Value> rea
25672579
// via applyPendingState in deferControllerStateChange.
25682580
}
25692581

2570-
void ReadableStreamJsController::incrementPendingReadCount() {
2571-
pendingReadCount++;
2572-
}
2573-
2574-
void ReadableStreamJsController::decrementPendingReadCountAndProcess(jsg::Lock& js) {
2575-
KJ_ASSERT(pendingReadCount > 0);
2576-
pendingReadCount--;
2577-
if (!isReadPending()) {
2578-
KJ_IF_SOME(pending, maybePendingState) {
2579-
KJ_SWITCH_ONEOF(pending) {
2580-
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
2581-
doClose(js);
2582-
}
2583-
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
2584-
doError(js, errored.getHandle(js));
2585-
}
2586-
}
2587-
maybePendingState = kj::none;
2588-
}
2589-
}
2590-
}
2591-
25922582
bool ReadableStreamJsController::isByteOriented() const {
25932583
return state.is<kj::Own<ByteReadable>>();
25942584
}
@@ -2708,18 +2698,15 @@ kj::Maybe<jsg::Promise<DrainingReadResult>> ReadableStreamJsController::draining
27082698
jsg::Lock& js, size_t maxRead) {
27092699
disturbed = true;
27102700

2711-
KJ_IF_SOME(pendingState, maybePendingState) {
2712-
KJ_SWITCH_ONEOF(pendingState) {
2713-
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
2714-
return js.resolvedPromise(DrainingReadResult{
2715-
.chunks = kj::Array<kj::Array<kj::byte>>(),
2716-
.done = true,
2717-
});
2718-
}
2719-
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
2720-
return js.rejectedPromise<DrainingReadResult>(errored.addRef(js));
2721-
}
2722-
}
2701+
// Check for pending state first (deferred close/error during a prior read operation)
2702+
if (state.pendingStateIs<StreamStates::Closed>()) {
2703+
return js.resolvedPromise(DrainingReadResult{
2704+
.chunks = kj::Array<kj::Array<kj::byte>>(),
2705+
.done = true,
2706+
});
2707+
}
2708+
KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe<StreamStates::Errored>()) {
2709+
return js.rejectedPromise<DrainingReadResult>(pendingError.addRef(js));
27232710
}
27242711

27252712
// Like deferControllerStateChange for regular reads, we need to prevent the controller
@@ -2730,17 +2717,34 @@ kj::Maybe<jsg::Promise<DrainingReadResult>> ReadableStreamJsController::draining
27302717
auto wrapDrainingRead =
27312718
[this](jsg::Lock& js,
27322719
jsg::Promise<DrainingReadResult> promise) -> jsg::Promise<DrainingReadResult> {
2733-
pendingReadCount++;
2720+
state.beginOperation();
27342721
return promise.then(js, [this](jsg::Lock& js, DrainingReadResult result) {
2735-
decrementPendingReadCountAndProcess(js);
2722+
if (state.endOperation()) {
2723+
// A pending state was applied. Call the appropriate callback.
2724+
if (state.template is<StreamStates::Closed>()) {
2725+
lock.onClose(js);
2726+
} else if (state.template is<StreamStates::Errored>()) {
2727+
KJ_IF_SOME(err, state.template tryGetUnsafe<StreamStates::Errored>()) {
2728+
lock.onError(js, err.getHandle(js));
2729+
}
2730+
}
2731+
}
27362732
return kj::mv(result);
27372733
}, [this](jsg::Lock& js, jsg::Value exception) -> DrainingReadResult {
2738-
decrementPendingReadCountAndProcess(js);
2734+
state.clearPendingState();
2735+
(void)state.endOperation();
27392736
js.throwException(kj::mv(exception));
27402737
});
27412738
};
27422739

27432740
KJ_SWITCH_ONEOF(state) {
2741+
KJ_CASE_ONEOF(initial, Initial) {
2742+
// Stream not yet set up, treat as closed.
2743+
return js.resolvedPromise(DrainingReadResult{
2744+
.chunks = kj::Array<kj::Array<kj::byte>>(),
2745+
.done = true,
2746+
});
2747+
}
27442748
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
27452749
return js.resolvedPromise(DrainingReadResult{
27462750
.chunks = kj::Array<kj::Array<kj::byte>>(),

0 commit comments

Comments
 (0)