diff --git a/src/workerd/api/streams/README.md b/src/workerd/api/streams/README.md index 9938eeb64fa..a71160ee8d2 100644 --- a/src/workerd/api/streams/README.md +++ b/src/workerd/api/streams/README.md @@ -630,3 +630,213 @@ In each iteration of the loop we first perform a read followed by a write if the not end the stream. The promise loop ends either when all of the data has been consumed or the stream errors, whichever comes first. +# Memory Safety Patterns + +The streams implementation uses several sophisticated patterns to ensure memory safety, +particularly around async operations and re-entrant JavaScript callbacks. Understanding +these patterns is critical for anyone maintaining this code. + +## State Machine Architecture + +Both ***Internal*** and ***Standard*** stream controllers use the `StateMachine<>` template +from `util/state-machine.h` to manage stream states. Key properties: + +* **Terminal States:** `Closed` and `Errored` are terminal states - once entered, no + further transitions are allowed. This prevents double-close and double-error bugs. +* **Pending States (Standard only):** The `PendingStates<>` template parameter allows + deferring state transitions while an operation is in progress. +* **Active State:** The `ActiveState<>` marker identifies which state contains the + operational data (queues, consumers, etc.). + +## Safety Patterns for Standard Streams + +***Standard*** streams are more complex because user-provided JavaScript callbacks can +re-enter the stream implementation at arbitrary points. 
+ +### Deferred State Transitions + +When a JS callback (like a pull algorithm) triggers a close or error during a read +operation, the state transition must be deferred until the read completes: + +```cpp +// deferControllerStateChange() wraps read operations +controller.state.beginOperation(); // Increment counter +auto result = readCallback(); // May trigger JS that calls close() +controller.state.endOperation(); // Apply pending state if counter == 0 +``` + +If `controller.close()` is called while `beginOperation()` is active, the transition is +stored as pending and applied when `endOperation()` sees the counter reach zero. + +### Consumer Snapshot Pattern + +When iterating over consumers (e.g., during a push to a teed stream), the consumer set +must be copied first because JS callbacks during iteration could modify the set: + +```cpp +auto consumers = ready.consumers.snapshot(); // Copy the set +for (auto consumer: consumers) { + consumer->push(js, entry->clone(js)); // May trigger JS that modifies consumers +} +``` + +### WeakRef Pattern + +For handles that user code may hold longer than the underlying object (like `ByobRequest` +or `PumpToReader`), the code uses a `WeakRef` pattern: + +```cpp +KJ_IF_SOME(reader, pumpToReader->tryGet()) { + // Safe to use reader - it's still alive + reader.pumpLoop(js, ...); +} else { + // PumpToReader was destroyed, handle gracefully +} +``` + +### Reference Counting for Shared Entries + +Queue entries shared across multiple consumers (e.g., teed streams) use `kj::Rc` +reference counting to prevent use-after-free: + +```cpp +class Entry: public kj::Refcounted { + kj::Rc clone(jsg::Lock& js); +}; +``` + +### Lambda Capture Safety in Pipe Loops + +When capturing references in lambdas attached to promise continuations, the code must +re-check that the referenced object still exists: + +```cpp +auto onSuccess = JSG_VISITABLE_LAMBDA((this, ref = addRef(), ...), ..., (...) 
{ + auto maybePipeLock = lock.tryGetPipe(); + if (maybePipeLock == kj::none) return js.resolvedPromise(); // Lock was released + auto& pipeLock = KJ_REQUIRE_NONNULL(maybePipeLock); + // Now safe to use pipeLock +}); +``` + +**Important:** Never capture raw references (like `&source`) that may become invalid +before the lambda executes. Either re-acquire the reference inside the lambda or use +a refcounted/weak reference pattern. + +### StateListener Callbacks + +Consumer state listeners must not access `this` after calling methods that may destroy +the listener: + +```cpp +void onConsumerClose(jsg::Lock& js) override { + KJ_IF_SOME(s, state) { + s.owner.doClose(js); // May destroy *this! + } + // DO NOT ACCESS *this AFTER THIS POINT +} +``` + +## Safety Patterns for Internal Streams + +***Internal*** streams are simpler but still require careful handling across async +boundaries. + +### Refcounted Pipe State + +The `Pipe::State` structure uses `kj::Refcounted` to survive async operations: + +```cpp +struct Pipe { + struct State: public kj::Refcounted { + bool aborted = false; // Set when Pipe is destroyed + // ... + }; + kj::Own state; + + ~Pipe() noexcept(false) { + state->aborted = true; // Signal continuations to stop + } +}; +``` + +Lambda continuations capture `state = kj::addRef(*this)` and check `state->aborted` +before accessing any state that might have been destroyed. 
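The refcounted-state-plus-`aborted`-flag idea described above can be sketched in standard C++. This is only an illustrative sketch under stated assumptions, not the workerd implementation: `std::shared_ptr` stands in for `kj::Refcounted`/`kj::addRef()`, a plain vector of callbacks stands in for the event loop, and the names `PipeState`, `scheduleWrite`, `peekState`, and `runDemo` are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for Pipe::State: shared between the owner and any
// queued continuations, so it outlives the owner while a continuation is pending.
struct PipeState {
  bool aborted = false;  // set by the owner's destructor
  int writes = 0;        // work performed by continuations
};

// Hypothetical stand-in for Pipe. std::shared_ptr plays the role that
// kj::Refcounted and kj::addRef() play in the real code (an assumption
// made purely for illustration).
class Pipe {
public:
  Pipe() : state(std::make_shared<PipeState>()) {}
  ~Pipe() { state->aborted = true; }  // signal continuations to stop

  // Queue a continuation that holds its own reference to the shared state.
  void scheduleWrite(std::vector<std::function<void()>>& loop) {
    loop.push_back([s = state] {
      if (s->aborted) return;  // owner is gone; touch nothing else
      ++s->writes;
    });
  }

  std::shared_ptr<PipeState> peekState() const { return state; }

private:
  std::shared_ptr<PipeState> state;
};

// Runs one continuation while the pipe is alive and one after it has been
// destroyed, then returns how many writes actually happened.
int runDemo() {
  std::vector<std::function<void()>> loop;
  std::shared_ptr<PipeState> observed;
  {
    Pipe pipe;
    observed = pipe.peekState();
    pipe.scheduleWrite(loop);
    pipe.scheduleWrite(loop);
    loop[0]();  // runs while the pipe is alive
  }             // ~Pipe sets aborted = true
  loop[1]();    // runs after destruction; sees aborted and returns early
  return observed->writes;
}
```

In this sketch the second continuation still runs after the owner is gone, but it only touches the shared state object, which its own reference keeps alive, and it bails out as soon as it sees `aborted`.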
+ +### Generation Counter Pattern + +The `writeLoop()` uses a generation counter to detect if the queue was modified during +an async operation: + +```cpp +auto check = [expectedGeneration = queue.currentGeneration()]() { + KJ_ASSERT(queue.currentGeneration() == expectedGeneration); + return queue.front(); +}; +``` + +### Promise Resolution Ordering + +State transitions occur BEFORE promise resolution to ensure continuations see consistent +state: + +```cpp +void doClose(jsg::Lock& js) { + state.transitionTo(); // State changes NOW + maybeResolvePromise(js, locked.getClosedFulfiller()); // Schedules microtask +} +// Continuations will see state == Closed +``` + +### V8Ref for Buffer Ownership + +Write operations store a `jsg::V8Ref` to prevent GC of the data buffer +while the async write is in progress: + +```cpp +struct Write { + jsg::V8Ref ownBytes; // Prevents GC + kj::ArrayPtr bytes; // Raw pointer into ownBytes +}; +``` + +## Cross-Request Considerations + +In the Workers runtime, multiple requests can be processed concurrently by the same +isolate using green threads. When one request yields for I/O, another may run. The +`SetPromiseCrossContextResolveCallback` mechanism ensures promise reactions are deferred +to the correct request context. + +Stream implementations interact with this model through: + +* **`ioContext.addFunctor()`**: Binds continuations to the correct `IoContext` +* **`IoOwn<>`**: Ensures objects are accessed from the correct context +* **Promise context tagging**: All promises created during a request are tagged with + that request's `IoContext` + +When a promise is resolved but its tag doesn't match the current context, the reactions +are deferred until the correct context is active. + +## Guidelines for Maintenance + +1. **Always use `deferControllerStateChange()`** when calling code that may invoke JS + callbacks during a read operation. + +2. 
**Always use `snapshot()`** when iterating over consumers if the loop body may + trigger JavaScript. + +3. **Never access `this`** after calling a StateListener callback that may destroy + the object. + +4. **Always re-check lock state** in lambda continuations that might execute after + the lock is released. + +5. **Use WeakRef** for any handle that user code may hold longer than the underlying + object. + +6. **Order operations correctly**: Transition state before resolving promises, and + pop queue entries only after resolving/rejecting their associated promises. + +7. **Be careful with reference captures**: Prefer capturing `this` and re-acquiring + references inside lambdas over capturing raw references that may become dangling. + diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 02bba17cf25..16294febe77 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -525,7 +525,7 @@ kj::Maybe>> WritableStreamSink::tryPumpFrom( ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) { if (readState.is()) { - readState.init(); + readState.transitionTo(); } } @@ -620,7 +620,7 @@ kj::Maybe> ReadableStreamInternalController::read( auto promise = kj::evalNow([&] { return readable->tryRead(bytes.begin(), atLeast, bytes.size()); }); - KJ_IF_SOME(readerLock, readState.tryGet()) { + KJ_IF_SOME(readerLock, readState.tryGetUnsafe()) { promise = KJ_ASSERT_NONNULL(readerLock.getCanceler())->wrap(kj::mv(promise)); } @@ -702,7 +702,7 @@ jsg::Promise ReadableStreamInternalController::cancel( jsg::Lock& js, jsg::Optional> maybeReason) { disturbed = true; - KJ_IF_SOME(errored, state.tryGet()) { + KJ_IF_SOME(errored, state.tryGetUnsafe()) { return js.rejectedPromise(errored.getHandle(js)); } @@ -714,32 +714,38 @@ jsg::Promise ReadableStreamInternalController::cancel( void ReadableStreamInternalController::doCancel( jsg::Lock& js, jsg::Optional> maybeReason) { auto exception 
= reasonToException(js, maybeReason); - KJ_IF_SOME(locked, readState.tryGet()) { + KJ_IF_SOME(locked, readState.tryGetUnsafe()) { KJ_IF_SOME(canceler, locked.getCanceler()) { canceler->cancel(kj::cp(exception)); } } - KJ_IF_SOME(readable, state.tryGet()) { + KJ_IF_SOME(readable, state.tryGetUnsafe()) { readable->cancel(kj::mv(exception)); doClose(js); } } void ReadableStreamInternalController::doClose(jsg::Lock& js) { - state.init(); - KJ_IF_SOME(locked, readState.tryGet()) { + // If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + + state.transitionTo(); + KJ_IF_SOME(locked, readState.tryGetUnsafe()) { maybeResolvePromise(js, locked.getClosedFulfiller()); - } else if (readState.tryGet() != kj::none) { - readState.init(); + } else { + (void)readState.transitionFromTo(); } } void ReadableStreamInternalController::doError(jsg::Lock& js, v8::Local reason) { - state.init(js.v8Ref(reason)); - KJ_IF_SOME(locked, readState.tryGet()) { + // If already in a terminal state, nothing to do. 
+ if (state.isTerminal()) return; + + state.transitionTo(js.v8Ref(reason)); + KJ_IF_SOME(locked, readState.tryGetUnsafe()) { maybeRejectPromise(js, locked.getClosedFulfiller(), reason); - } else if (readState.tryGet() != kj::none) { - readState.init(); + } else { + (void)readState.transitionFromTo(); } } @@ -748,7 +754,7 @@ ReadableStreamController::Tee ReadableStreamInternalController::tee(jsg::Lock& j !isLockedToReader(), TypeError, "This ReadableStream is currently locked to a reader."); JSG_REQUIRE( !isPendingClosure, TypeError, "This ReadableStream belongs to an object that is closing."); - readState.init(); + readState.transitionTo(); disturbed = true; KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { @@ -805,7 +811,7 @@ kj::Maybe> ReadableStreamInternalController::remov !isLockedToReader(), TypeError, "This ReadableStream is currently locked to a reader."); JSG_REQUIRE(!disturbed || ignoreDisturbed, TypeError, "This ReadableStream is disturbed."); - readState.init(); + readState.transitionTo(); disturbed = true; KJ_SWITCH_ONEOF(state) { @@ -828,7 +834,7 @@ kj::Maybe> ReadableStreamInternalController::remov } KJ_CASE_ONEOF(readable, Readable) { auto result = kj::mv(readable); - state.init(); + state.transitionTo(); return kj::Maybe>(kj::mv(result)); } } @@ -859,14 +865,14 @@ bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) } } - readState = kj::mv(lock); + readState.transitionTo(kj::mv(lock)); reader.attach(*this, kj::mv(prp.promise)); return true; } void ReadableStreamInternalController::releaseReader( Reader& reader, kj::Maybe maybeJs) { - KJ_IF_SOME(locked, readState.tryGet()) { + KJ_IF_SOME(locked, readState.tryGetUnsafe()) { KJ_ASSERT(&locked.getReader() == &reader); KJ_IF_SOME(js, maybeJs) { KJ_IF_SOME(canceler, locked.getCanceler()) { @@ -884,7 +890,7 @@ void ReadableStreamInternalController::releaseReader( // an isolate lock. 
Clearing the lock above will free the lock state while keeping the // ReadableStream marked as locked. if (maybeJs != kj::none) { - readState.template init(); + readState.transitionTo(); } } } @@ -896,7 +902,7 @@ void WritableStreamInternalController::Writable::abort(kj::Exception&& ex) { WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) { if (writeState.is()) { - writeState.init(); + writeState.transitionTo(); } } @@ -1001,7 +1007,7 @@ void WritableStreamInternalController::adjustWriteBufferSize(jsg::Lock& js, int6 } void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool backpressure) { - KJ_IF_SOME(writerLock, writeState.tryGet()) { + KJ_IF_SOME(writerLock, writeState.tryGetUnsafe()) { if (backpressure) { // Per the spec, when backpressure is updated and is true, we replace the existing // ready promise on the writer with a new pending promise, regardless of whether @@ -1137,7 +1143,7 @@ jsg::Promise WritableStreamInternalController::doAbort( return kj::mv(promise); } - KJ_IF_SOME(writable, state.tryGet>()) { + KJ_IF_SOME(writable, state.tryGetUnsafe>()) { auto exception = js.exceptionToKj(js.v8Ref(reason)); if (FeatureFlags::get(js).getInternalWritableStreamAbortClearsQueue()) { @@ -1204,7 +1210,7 @@ kj::Maybe> WritableStreamInternalController::tryPipeFrom( auto& sourceLock = KJ_ASSERT_NONNULL(source->getController().tryPipeLock()); // Let's also acquire the destination pipe lock. - writeState = PipeLocked{*source}; + writeState.transitionTo(*source); // If the source has errored, the spec requires us to reject the pipe promise and, if preventAbort // is false, error the destination (Propagate error forward). 
The errored source will be unlocked @@ -1212,21 +1218,21 @@ kj::Maybe> WritableStreamInternalController::tryPipeFrom( KJ_IF_SOME(errored, sourceLock.tryGetErrored(js)) { sourceLock.release(js); if (!preventAbort) { - if (state.tryGet>() != kj::none) { + if (state.tryGetUnsafe>() != kj::none) { return doAbort(js, errored, {.reject = true, .handled = pipeThrough}); } } // If preventAbort was true, we're going to unlock the destination now. - writeState.init(); + writeState.transitionTo(); return rejectedMaybeHandledPromise(js, errored, pipeThrough); } // If the destination has errored, the spec requires us to reject the pipe promise and, if // preventCancel is false, error the source (Propagate error backward). The errored destination // will be unlocked immediately. - KJ_IF_SOME(errored, state.tryGet()) { - writeState.init(); + KJ_IF_SOME(errored, state.tryGetUnsafe()) { + writeState.transitionTo(); if (!preventCancel) { sourceLock.release(js, errored.getHandle(js)); } else { @@ -1251,7 +1257,7 @@ kj::Maybe> WritableStreamInternalController::tryPipeFrom( return close(js); } } - writeState.init(); + writeState.transitionTo(); return js.resolvedPromise(); } @@ -1259,7 +1265,7 @@ kj::Maybe> WritableStreamInternalController::tryPipeFrom( // preventCancel is false (Propagate closing backward). 
if (isClosedOrClosing()) { auto destClosed = js.v8TypeError("This destination writable stream is closed."_kj); - writeState.init(); + writeState.transitionTo(); if (!preventCancel) { sourceLock.release(js, destClosed); @@ -1294,7 +1300,7 @@ kj::Maybe> WritableStreamInternalController::removeS !isLockedToWriter(), TypeError, "This WritableStream is currently locked to a writer."); JSG_REQUIRE(!isClosedOrClosing(), TypeError, "This WritableStream is closed."); - writeState.init(); + writeState.transitionTo(); KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { @@ -1306,7 +1312,7 @@ kj::Maybe> WritableStreamInternalController::removeS } KJ_CASE_ONEOF(writable, IoOwn) { auto result = kj::mv(writable->sink); - state.init(); + state.transitionTo(); return kj::Maybe>(kj::mv(result)); } } @@ -1319,7 +1325,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) { !isLockedToWriter(), TypeError, "This WritableStream is currently locked to a writer."); JSG_REQUIRE(!isClosedOrClosing(), TypeError, "This WritableStream is closed."); - writeState.init(); + writeState.transitionTo(); KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { @@ -1330,7 +1336,7 @@ void WritableStreamInternalController::detach(jsg::Lock& js) { kj::throwFatalException(js.exceptionToKj(errored.addRef(js))); } KJ_CASE_ONEOF(writable, IoOwn) { - state.init(); + state.transitionTo(); return; } } @@ -1384,14 +1390,14 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) } } - writeState = kj::mv(lock); + writeState.transitionTo(kj::mv(lock)); writer.attach(js, *this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); return true; } void WritableStreamInternalController::releaseWriter( Writer& writer, kj::Maybe maybeJs) { - KJ_IF_SOME(locked, writeState.tryGet()) { + KJ_IF_SOME(locked, writeState.tryGetUnsafe()) { KJ_ASSERT(&locked.getWriter() == &writer); KJ_IF_SOME(js, maybeJs) { maybeRejectPromise(js, locked.getClosedFulfiller(), 
@@ -1405,7 +1411,7 @@ void WritableStreamInternalController::releaseWriter( // state itself. Clearing the lock above will free the lock state while keeping the // WritableStream marked as locked. if (maybeJs != kj::none) { - writeState.template init(); + writeState.transitionTo(); } } } @@ -1426,25 +1432,31 @@ bool WritableStreamInternalController::isErrored() { } void WritableStreamInternalController::doClose(jsg::Lock& js) { - state.init(); - KJ_IF_SOME(locked, writeState.tryGet()) { + // If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + + state.transitionTo(); + KJ_IF_SOME(locked, writeState.tryGetUnsafe()) { maybeResolvePromise(js, locked.getClosedFulfiller()); maybeResolvePromise(js, locked.getReadyFulfiller()); - writeState.init(); - } else if (writeState.tryGet() != kj::none) { - writeState.init(); + writeState.transitionTo(); + } else { + (void)writeState.transitionFromTo(); } PendingAbort::dequeue(maybePendingAbort); } void WritableStreamInternalController::doError(jsg::Lock& js, v8::Local reason) { - state.init(js.v8Ref(reason)); - KJ_IF_SOME(locked, writeState.tryGet()) { + // If already in a terminal state, nothing to do. 
+ if (state.isTerminal()) return; + + state.transitionTo(js.v8Ref(reason)); + KJ_IF_SOME(locked, writeState.tryGetUnsafe()) { maybeRejectPromise(js, locked.getClosedFulfiller(), reason); maybeResolvePromise(js, locked.getReadyFulfiller()); - writeState.init(); - } else if (writeState.tryGet() != kj::none) { - writeState.init(); + writeState.transitionTo(); + } else { + (void)writeState.transitionFromTo(); } PendingAbort::dequeue(maybePendingAbort); } @@ -1546,8 +1558,8 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo }; }; - const auto maybeAbort = [this](jsg::Lock& js, auto& request) -> bool { - auto& writable = KJ_ASSERT_NONNULL(state.tryGet>()); + const auto maybeAbort = [this](jsg::Lock& js) -> bool { + auto& writable = KJ_ASSERT_NONNULL(state.tryGetUnsafe>()); KJ_IF_SOME(pendingAbort, WritableStreamController::PendingAbort::dequeue(maybePendingAbort)) { auto ex = js.exceptionToKj(pendingAbort->reason.addRef(js)); writable->abort(kj::mv(ex)); @@ -1577,7 +1589,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo } // writeLoop() is only called with the sink in the Writable state. 
- auto& writable = state.get>(); + auto& writable = state.getUnsafe>(); auto check = makeChecker(); auto amountToWrite = request->bytes.size(); @@ -1605,7 +1617,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo o->onChunkDequeued(amountToWrite); } queue.pop_front(); - maybeAbort(js, request); + maybeAbort(js); return writeLoop(js, IoContext::current()); }), ioContext.addFunctor([this, check, maybeAbort, amountToWrite]( @@ -1614,14 +1626,14 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo if (queue.empty()) return js.resolvedPromise(); auto handle = reason.getHandle(js); auto& request = check.template operator()(); - auto& writable = state.get>(); + auto& writable = state.getUnsafe>(); adjustWriteBufferSize(js, -amountToWrite); KJ_IF_SOME(o, observer) { o->onChunkDequeued(amountToWrite); } maybeRejectPromise(js, request.promise, handle); queue.pop_front(); - if (!maybeAbort(js, request)) { + if (!maybeAbort(js)) { auto ex = js.exceptionToKj(reason.addRef(js)); writable->abort(kj::mv(ex)); drain(js, handle); @@ -1633,7 +1645,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // The destination should still be Writable, because the only way to transition to an // errored state would have been if a write request in the queue ahead of us encountered an // error. But in that case, the queue would already have been drained and we wouldn't be here. - auto& writable = state.get>(); + auto& writable = state.getUnsafe>(); if (request->checkSignal(js)) { // If the signal is triggered, checkSignal will handle erroring the source and destination. 
@@ -1649,7 +1661,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo if (!request->preventClose() && !isClosedOrClosing()) { doClose(js); } else { - writeState.init(); + writeState.transitionTo(); } return js.resolvedPromise(); } @@ -1663,7 +1675,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo writable->abort(kj::mv(ex)); drain(js, errored); } else { - writeState.init(); + writeState.transitionTo(); } return js.resolvedPromise(); } @@ -1696,7 +1708,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // Even through we're not going to close the destination, we still want the // pipe promise itself to be rejected in this case. maybeRejectPromise(js, request.promise(), errored); - } else KJ_IF_SOME(errored, state.tryGet()) { + } else KJ_IF_SOME(errored, state.tryGetUnsafe()) { maybeRejectPromise(js, request.promise(), errored.getHandle(js)); } else { maybeResolvePromise(js, request.promise()); @@ -1712,7 +1724,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo // Note: unlike a real Close request, it's not possible for us to have been aborted. return close(js, true); } else { - writeState.init(); + writeState.transitionTo(); } return js.resolvedPromise(); }), @@ -1754,7 +1766,7 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo } KJ_CASE_ONEOF(request, kj::Own) { // writeLoop() is only called with the sink in the Writable state. 
- auto& writable = state.get>(); + auto& writable = state.getUnsafe>(); auto check = makeChecker(); return ioContext.awaitIo(js, writable->canceler.wrap(writable->sink->end())) @@ -1812,15 +1824,15 @@ bool WritableStreamInternalController::Pipe::State::checkSignal(jsg::Lock& js) { auto promiseCopy = kj::mv(this->promise); if (!preventAbort) { - KJ_IF_SOME(writable, parentRef.state.tryGet>()) { + KJ_IF_SOME(writable, parent.state.tryGetUnsafe>()) { auto ex = js.exceptionToKj(reason); writable->abort(kj::mv(ex)); parentRef.drain(js, reason); } else { - parentRef.writeState.init(); + parent.writeState.transitionTo(); } } else { - parentRef.writeState.init(); + parent.writeState.transitionTo(); } if (!preventCancelCopy) { sourceRef.release(js, v8::Local(reason)); @@ -1836,7 +1848,7 @@ bool WritableStreamInternalController::Pipe::State::checkSignal(jsg::Lock& js) { jsg::Promise WritableStreamInternalController::Pipe::State::write( v8::Local handle) { - auto& writable = parent.state.get>(); + auto& writable = parent.state.getUnsafe>(); // TODO(soon): Once jsg::BufferSource lands and we're able to use it, this can be simplified. KJ_ASSERT(handle->IsArrayBuffer() || handle->IsArrayBufferView()); std::shared_ptr store; @@ -1892,7 +1904,7 @@ jsg::Promise WritableStreamInternalController::Pipe::State::pipeLoop(jsg:: KJ_IF_SOME(errored, source.tryGetErrored(js)) { source.release(js); if (!preventAbort) { - KJ_IF_SOME(writable, parent.state.tryGet>()) { + KJ_IF_SOME(writable, parent.state.tryGetUnsafe>()) { auto ex = js.exceptionToKj(js.v8Ref(errored)); writable->abort(kj::mv(ex)); return js.rejectedPromise(errored); @@ -1901,12 +1913,12 @@ jsg::Promise WritableStreamInternalController::Pipe::State::pipeLoop(jsg:: // If preventAbort was true, we're going to unlock the destination now. // We are not going to propagate the error here tho. 
- parent.writeState.init(); + parent.writeState.transitionTo(); return js.resolvedPromise(); } - KJ_IF_SOME(errored, parent.state.tryGet()) { - parent.writeState.init(); + KJ_IF_SOME(errored, parent.state.tryGetUnsafe()) { + parent.writeState.transitionTo(); if (!preventCancel) { auto reason = errored.getHandle(js); source.release(js, reason); @@ -1925,7 +1937,7 @@ jsg::Promise WritableStreamInternalController::Pipe::State::pipeLoop(jsg:: auto& ioContext = IoContext::current(); // Capture a ref to the state to keep it alive during async operations. return ioContext - .awaitIo(js, parent.state.get>()->sink->end(), [](jsg::Lock&) {}) + .awaitIo(js, parent.state.getUnsafe>()->sink->end(), [](jsg::Lock&) {}) .then(js, ioContext.addFunctor([state = kj::addRef(*this)](jsg::Lock& js) { if (state->aborted) return; state->parent.finishClose(js); @@ -1935,14 +1947,14 @@ jsg::Promise WritableStreamInternalController::Pipe::State::pipeLoop(jsg:: state->parent.finishError(js, reason.getHandle(js)); })); } - parent.writeState.init(); + parent.writeState.transitionTo(); } return js.resolvedPromise(); } if (parent.isClosedOrClosing()) { auto destClosed = js.v8TypeError("This destination writable stream is closed."_kj); - parent.writeState.init(); + parent.writeState.transitionTo(); if (!preventCancel) { source.release(js, destClosed); @@ -1987,7 +1999,7 @@ jsg::Promise WritableStreamInternalController::Pipe::State::pipeLoop(jsg:: // Undefined and null are perfectly valid values to pass through a ReadableStream, // but we can't interpret them as bytes so if we get them here, we error the pipe. auto error = js.v8TypeError("This WritableStream only supports writing byte types."_kj); - auto& writable = state->parent.state.get>(); + auto& writable = state->parent.state.getUnsafe>(); auto ex = js.exceptionToKj(js.v8Ref(error)); writable->abort(kj::mv(ex)); // The error condition will be handled at the start of the next iteration. 
@@ -2044,7 +2056,7 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) { } } } - KJ_IF_SOME(locked, writeState.tryGet()) { + KJ_IF_SOME(locked, writeState.tryGetUnsafe()) { visitor.visit(locked); } KJ_IF_SOME(pendingAbort, maybePendingAbort) { @@ -2053,7 +2065,7 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) { } void ReadableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) { - KJ_IF_SOME(locked, readState.tryGet()) { + KJ_IF_SOME(locked, readState.tryGetUnsafe()) { visitor.visit(locked); } } @@ -2063,8 +2075,7 @@ kj::Maybe ReadableStreamInternalContr if (isLockedToReader()) { return kj::none; } - readState.init(*this); - return readState.get(); + return readState.transitionTo(*this); } bool ReadableStreamInternalController::PipeLocked::isClosed() { @@ -2073,7 +2084,7 @@ bool ReadableStreamInternalController::PipeLocked::isClosed() { kj::Maybe> ReadableStreamInternalController::PipeLocked::tryGetErrored( jsg::Lock& js) { - KJ_IF_SOME(errored, inner.state.tryGet()) { + KJ_IF_SOME(errored, inner.state.tryGetUnsafe()) { return errored.getHandle(js); } return kj::none; @@ -2100,14 +2111,14 @@ void ReadableStreamInternalController::PipeLocked::release( KJ_IF_SOME(error, maybeError) { cancel(js, error); } - inner.readState.init(); + inner.readState.transitionTo(); } kj::Maybe> ReadableStreamInternalController::PipeLocked::tryPumpTo( WritableStreamSink& sink, bool end) { // This is safe because the caller should have already checked isClosed and tryGetErrored // and handled those before calling tryPumpTo. 
- auto& readable = KJ_ASSERT_NONNULL(inner.state.tryGet()); + auto& readable = KJ_ASSERT_NONNULL(inner.state.tryGetUnsafe()); return IoContext::current().waitForDeferredProxy(readable->pumpTo(sink, end)); } @@ -2245,7 +2256,7 @@ kj::Promise> ReadableStreamInternalController::pumpTo( } StreamEncoding ReadableStreamInternalController::getPreferredEncoding() { - return state.tryGet() + return state.tryGetUnsafe() .map([](Readable& readable) { return readable->getPreferredEncoding(); }).orDefault(StreamEncoding::IDENTITY); @@ -2285,7 +2296,7 @@ void WritableStreamInternalController::jsgGetMemoryInfo(jsg::MemoryTracker& trac tracker.trackFieldWithSize("IoOwn", sizeof(IoOwn)); } } - KJ_IF_SOME(writerLocked, writeState.tryGet()) { + KJ_IF_SOME(writerLocked, writeState.tryGetUnsafe()) { tracker.trackField("writerLocked", writerLocked); } tracker.trackField("pendingAbort", maybePendingAbort); diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 73f52ff227c..09d95ef8f06 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -40,10 +41,12 @@ class ReadableStreamInternalController: public ReadableStreamController { public: using Readable = IoOwn; - explicit ReadableStreamInternalController(StreamStates::Closed closed): state(closed) {} + explicit ReadableStreamInternalController(StreamStates::Closed closed) + : state(State::create()) {} explicit ReadableStreamInternalController(StreamStates::Errored errored) - : state(kj::mv(errored)) {} - explicit ReadableStreamInternalController(Readable readable): state(kj::mv(readable)) {} + : state(State::create(kj::mv(errored))) {} + explicit ReadableStreamInternalController(Readable readable) + : state(State::create(kj::mv(readable))) {} KJ_DISALLOW_COPY_AND_MOVE(ReadableStreamInternalController); @@ -124,6 +127,7 @@ class ReadableStreamInternalController: public ReadableStreamController { class 
PipeLocked: public PipeController { public: + static constexpr kj::StringPtr NAME KJ_UNUSED = "pipe-locked"_kj; PipeLocked(ReadableStreamInternalController& inner): inner(inner) {} bool isClosed() override; @@ -147,8 +151,28 @@ class ReadableStreamInternalController: public ReadableStreamController { }; kj::Maybe owner; - kj::OneOf state; - kj::OneOf readState = Unlocked(); + + // State machine for ReadableStreamInternalController: + // Closed is terminal, Errored is implicitly terminal via ErrorState. + // Readable is the active state (stream has data). + using State = StateMachine, + ErrorState, + ActiveState, + StreamStates::Closed, + StreamStates::Errored, + Readable>; + State state; + + // Lock state machine for ReadableStreamInternalController: + // All states can transition to any other state (no terminal states). + // Unlocked -> Locked (removeSink() or pumpTo() called) + // Unlocked -> ReaderLocked (lockReader() called) + // Unlocked -> PipeLocked (tryPipeLock() called) + // ReaderLocked -> Unlocked (releaseReader() called) + // PipeLocked -> Unlocked (release() or doClose/doError called) + // Locked -> (remains until stream is done) + using ReadLockState = StateMachine; + ReadLockState readState = ReadLockState::create(); bool disturbed = false; bool readPending = false; @@ -170,14 +194,16 @@ class WritableStreamInternalController: public WritableStreamController { void abort(kj::Exception&& ex); }; - explicit WritableStreamInternalController(StreamStates::Closed closed): state(closed) {} + explicit WritableStreamInternalController(StreamStates::Closed closed) + : state(State::create()) {} explicit WritableStreamInternalController(StreamStates::Errored errored) - : state(kj::mv(errored)) {} + : state(State::create(kj::mv(errored))) {} explicit WritableStreamInternalController(kj::Own writable, kj::Maybe> observer, kj::Maybe maybeHighWaterMark = kj::none, kj::Maybe> maybeClosureWaitable = kj::none) - : 
state(IoContext::current().addObject(kj::heap(kj::mv(writable)))), + : state(State::create>( + IoContext::current().addObject(kj::heap(kj::mv(writable))))), observer(kj::mv(observer)), maybeHighWaterMark(maybeHighWaterMark), maybeClosureWaitable(kj::mv(maybeClosureWaitable)) {} @@ -265,12 +291,33 @@ class WritableStreamInternalController: public WritableStreamController { jsg::Promise closeImpl(jsg::Lock& js, bool markAsHandled); struct PipeLocked { + static constexpr kj::StringPtr NAME KJ_UNUSED = "pipe-locked"_kj; ReadableStream& ref; }; kj::Maybe owner; - kj::OneOf> state; - kj::OneOf writeState = Unlocked(); + + // State machine for WritableStreamInternalController: + // Closed is terminal, Errored is implicitly terminal via ErrorState. + // IoOwn is the active state (stream is writable). + using State = StateMachine, + ErrorState, + ActiveState>, + StreamStates::Closed, + StreamStates::Errored, + IoOwn>; + State state; + + // Lock state machine for WritableStreamInternalController: + // All states can transition to any other state (no terminal states). + // Unlocked -> Locked (removeSink() or detach() called) + // Unlocked -> WriterLocked (lockWriter() called) + // Unlocked -> PipeLocked (tryPipeFrom() called) + // WriterLocked -> Unlocked (releaseWriter() called) + // WriterLocked -> Locked (doClose/doError called - stream closed but writer still attached) + // PipeLocked -> Unlocked (pipe completes) + using WriteLockState = StateMachine; + WriteLockState writeState = WriteLockState::create(); kj::Maybe> observer; diff --git a/src/workerd/api/streams/queue.h b/src/workerd/api/streams/queue.h index e7ab67e4d1c..d540303059c 100644 --- a/src/workerd/api/streams/queue.h +++ b/src/workerd/api/streams/queue.h @@ -202,12 +202,12 @@ class QueueImpl final { // If we are already closed or errored, set totalQueueSize to zero. 
   void maybeUpdateBackpressure() {
     totalQueueSize = 0;
-    KJ_IF_SOME(ready, state.tryGetActiveUnsafe()) {
+    state.whenActive([this](Ready& ready) {
       auto consumers = ready.consumers.snapshot();
       for (auto consumer: consumers) {
         totalQueueSize = kj::max(totalQueueSize, consumer->size());
       }
-    }
+    });
   }

   // Forwards the entry to all consumers (except skipConsumer if given).
@@ -483,12 +483,12 @@ class ConsumerImpl final {

   void cancelPendingReads(jsg::Lock& js, jsg::JsValue reason) {
     // Already closed or errored - nothing to do.
-    KJ_IF_SOME(ready, state.tryGetActiveUnsafe()) {
+    state.whenActive([&](Ready& ready) {
       for (auto& request: ready.readRequests) {
         request->resolver.reject(js, reason);
       }
       ready.readRequests.clear();
-    }
+    });
   }

   void visitForGc(jsg::GcVisitor& visitor) {
@@ -552,13 +552,9 @@ class ConsumerImpl final {
   bool isClosing() {
     // Closing state is determined by whether there is a Close sentinel that has been
     // pushed into the end of Ready state buffer.
-    KJ_IF_SOME(ready, state.tryGetActiveUnsafe()) {
-      if (ready.buffer.empty()) {
-        return false;
-      }
-      return ready.buffer.back().template is();
-    }
-    return false;
+    return state.whenActiveOr([](Ready& ready) {
+      return !ready.buffer.empty() && ready.buffer.back().template is();
+    }, false);
   }

   void maybeDrainAndSetState(jsg::Lock& js, kj::Maybe maybeReason = kj::none) {
diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++
index 71f068a862e..63ecd24e051 100644
--- a/src/workerd/api/streams/readable.c++
+++ b/src/workerd/api/streams/readable.c++
@@ -16,65 +16,46 @@ namespace workerd::api {

 ReaderImpl::ReaderImpl(ReadableStreamController::Reader& reader)
     : ioContext(tryGetIoContext()),
-      reader(reader) {}
+      reader(reader),
+      state(ReaderState::create()) {}

 ReaderImpl::~ReaderImpl() noexcept(false) {
-  KJ_IF_SOME(stream, state.tryGet()) {
-    stream->getController().releaseReader(reader, kj::none);
+  KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) {
+    attached.stream->getController().releaseReader(reader, kj::none);
   }
 }

 void ReaderImpl::attach(ReadableStreamController& controller, jsg::Promise closedPromise) {
   KJ_ASSERT(state.is());
-  state = controller.addRef();
+  state.transitionTo(controller.addRef());
   this->closedPromise = kj::mv(closedPromise);
 }

 void ReaderImpl::detach() {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(i, Initial) {
-      // Do nothing in this case.
-      return;
-    }
-    KJ_CASE_ONEOF(stream, Attached) {
-      state.init();
-      return;
-    }
-    KJ_CASE_ONEOF(c, StreamStates::Closed) {
-      // Do nothing in this case.
-      return;
-    }
-    KJ_CASE_ONEOF(r, Released) {
-      // Do nothing in this case.
-      return;
-    }
+  // Only transition from Attached to Closed.
+  // All other states (Initial, Closed, Released) are no-ops.
+  if (state.isActive()) {
+    state.transitionTo();
   }
-  KJ_UNREACHABLE;
 }

 jsg::Promise ReaderImpl::cancel(
     jsg::Lock& js, jsg::Optional> maybeReason) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(i, Initial) {
-      KJ_FAIL_ASSERT("this reader was never attached");
-    }
-    KJ_CASE_ONEOF(stream, Attached) {
-      // In some edge cases, this reader is the last thing holding a strong
-      // reference to the stream. Calling cancel might cause the readers strong
-      // reference to be cleared, so let's make sure we keep a reference to
-      // the stream at least until the call to cancel completes.
-      auto ref = stream.addRef();
-      return stream->getController().cancel(js, maybeReason);
-    }
-    KJ_CASE_ONEOF(r, Released) {
-      return js.rejectedPromise(
-          js.v8TypeError("This ReadableStream reader has been released."_kj));
-    }
-    KJ_CASE_ONEOF(c, StreamStates::Closed) {
-      return js.resolvedPromise();
-    }
+  assertAttachedOrTerminal();
+  if (state.is()) {
+    return js.rejectedPromise(
+        js.v8TypeError("This ReadableStream reader has been released."_kj));
   }
-  KJ_UNREACHABLE;
+  if (state.is()) {
+    return js.resolvedPromise();
+  }
+  auto& attached = state.requireActiveUnsafe();
+  // In some edge cases, this reader is the last thing holding a strong
+  // reference to the stream. Calling cancel might cause the readers strong
+  // reference to be cleared, so let's make sure we keep a reference to
+  // the stream at least until the call to cancel completes.
+  auto ref = attached.stream.addRef();
+  return attached.stream->getController().cancel(js, maybeReason);
 }

 jsg::MemoizedIdentity>& ReaderImpl::getClosed() {
@@ -90,79 +71,60 @@ void ReaderImpl::lockToStream(jsg::Lock& js, ReadableStream& stream) {

 jsg::Promise ReaderImpl::read(
     jsg::Lock& js, kj::Maybe byobOptions) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(i, Initial) {
-      KJ_FAIL_ASSERT("this reader was never attached");
-    }
-    KJ_CASE_ONEOF(stream, Attached) {
-      KJ_IF_SOME(options, byobOptions) {
-        // Per the spec, we must perform these checks before disturbing the stream.
-        size_t atLeast = options.atLeast.orDefault(1);
-
-        if (options.byteLength == 0) {
-          return js.rejectedPromise(
-              js.v8TypeError("You must call read() on a \"byob\" reader with a positive-sized "
-                             "TypedArray object."_kj));
-        }
-        if (atLeast == 0) {
-          return js.rejectedPromise(js.v8TypeError(
-              kj::str("Requested invalid minimum number of bytes to read (", atLeast, ").")));
-        }
-        if (atLeast > options.byteLength) {
-          return js.rejectedPromise(js.v8TypeError(kj::str("Minimum bytes to read (",
-              atLeast, ") exceeds size of buffer (", options.byteLength, ").")));
-        }
-
-        jsg::BufferSource source(js, options.bufferView.getHandle(js));
-        options.atLeast = atLeast * source.getElementSize();
-      }
+  assertAttachedOrTerminal();
+  if (state.is()) {
+    return js.rejectedPromise(
+        js.v8TypeError("This ReadableStream reader has been released."_kj));
+  }
+  if (state.is()) {
+    return js.rejectedPromise(
+        js.v8TypeError("This ReadableStream has been closed."_kj));
+  }
+  auto& attached = state.requireActiveUnsafe();
+  KJ_IF_SOME(options, byobOptions) {
+    // Per the spec, we must perform these checks before disturbing the stream.
+    size_t atLeast = options.atLeast.orDefault(1);

-      return KJ_ASSERT_NONNULL(stream->getController().read(js, kj::mv(byobOptions)));
-    }
-    KJ_CASE_ONEOF(r, Released) {
+    if (options.byteLength == 0) {
       return js.rejectedPromise(
-          js.v8TypeError("This ReadableStream reader has been released."_kj));
+          js.v8TypeError("You must call read() on a \"byob\" reader with a positive-sized "
+                         "TypedArray object."_kj));
     }
-    KJ_CASE_ONEOF(c, StreamStates::Closed) {
-      return js.rejectedPromise(
-          js.v8TypeError("This ReadableStream has been closed."_kj));
+    if (atLeast == 0) {
+      return js.rejectedPromise(js.v8TypeError(
+          kj::str("Requested invalid minimum number of bytes to read (", atLeast, ").")));
+    }
+    if (atLeast > options.byteLength) {
+      return js.rejectedPromise(js.v8TypeError(kj::str("Minimum bytes to read (",
+          atLeast, ") exceeds size of buffer (", options.byteLength, ").")));
     }
+
+    jsg::BufferSource source(js, options.bufferView.getHandle(js));
+    options.atLeast = atLeast * source.getElementSize();
   }
-  KJ_UNREACHABLE;
+
+  return KJ_ASSERT_NONNULL(attached.stream->getController().read(js, kj::mv(byobOptions)));
 }

 void ReaderImpl::releaseLock(jsg::Lock& js) {
   // TODO(soon): Releasing the lock should cancel any pending reads. This is a recent
   // modification to the spec that we have not yet implemented.
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(i, Initial) {
-      KJ_FAIL_ASSERT("this reader was never attached");
-    }
-    KJ_CASE_ONEOF(stream, Attached) {
-      // In some edge cases, this reader is the last thing holding a strong
-      // reference to the stream. Calling releaseLock might cause the readers strong
-      // reference to be cleared, so let's make sure we keep a reference to
-      // the stream at least until the call to releaseLock completes.
-      auto ref = stream.addRef();
-      stream->getController().releaseReader(reader, js);
-      state.init();
-      return;
-    }
-    KJ_CASE_ONEOF(c, StreamStates::Closed) {
-      // Do nothing in this case.
-      return;
-    }
-    KJ_CASE_ONEOF(r, Released) {
-      // Do nothing in this case.
-      return;
-    }
+  assertAttachedOrTerminal();
+  // Closed and Released states are no-ops.
+  KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) {
+    // In some edge cases, this reader is the last thing holding a strong
+    // reference to the stream. Calling releaseLock might cause the readers strong
+    // reference to be cleared, so let's make sure we keep a reference to
+    // the stream at least until the call to releaseLock completes.
+    auto ref = attached.stream.addRef();
+    attached.stream->getController().releaseReader(reader, js);
+    state.transitionTo();
   }
-  KJ_UNREACHABLE;
 }

 void ReaderImpl::visitForGc(jsg::GcVisitor& visitor) {
-  KJ_IF_SOME(readable, state.tryGet()) {
-    visitor.visit(readable);
+  KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) {
+    visitor.visit(attached.stream);
   }
   visitor.visit(closedPromise);
 }
@@ -759,8 +721,8 @@ size_t ReaderImpl::jsgGetMemorySelfSize() const {
 }

 void ReaderImpl::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const {
-  KJ_IF_SOME(stream, state.tryGet()) {
-    tracker.trackField("stream", stream);
+  KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) {
+    tracker.trackField("stream", attached.stream);
   }
   tracker.trackField("closedPromise", closedPromise);
 }
diff --git a/src/workerd/api/streams/readable.h b/src/workerd/api/streams/readable.h
index 769ae350873..dd38bc4ab43 100644
--- a/src/workerd/api/streams/readable.h
+++ b/src/workerd/api/streams/readable.h
@@ -5,7 +5,9 @@
 #pragma once

 #include "common.h"
+
 #include
+#include

 namespace workerd::api {

@@ -40,7 +42,9 @@ class ReaderImpl final {
   void jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const;

  private:
-  struct Initial {};
+  struct Initial {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "initial"_kj;
+  };
   // While a Reader is attached to a ReadableStream, it holds a strong reference to the
   // ReadableStream to prevent it from being GC'ed so long as the Reader is available.
   // Once the reader is closed, released, or GC'ed the reference to the ReadableStream
@@ -48,13 +52,42 @@ class ReaderImpl final {
   // it being held anywhere. If the reader is still attached to the ReadableStream when
   // it is destroyed, the ReadableStream's reference to the reader is cleared but the
   // ReadableStream remains in the "reader locked" state, per the spec.
-  using Attached = jsg::Ref;
-  struct Released {};
+  struct Attached {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "attached"_kj;
+    jsg::Ref stream;
+  };
+  // Released: The user explicitly called releaseLock() to detach the reader from the stream.
+  // The stream remains usable and can be locked by a new reader.
+  struct Released {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "released"_kj;
+  };
+  // Closed: The underlying stream ended (closed or errored) while the reader was attached.
+  // The stream is no longer usable.
+  struct Closed {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "closed"_kj;
+  };
+
+  // State machine for ReaderImpl:
+  // Initial -> Attached (attach() called)
+  // Attached -> Closed (detach() called when stream closes)
+  // Attached -> Released (releaseLock() called)
+  // Closed and Released are terminal states.
+  // Initial is not terminal but most methods assert if called in this state.
+  using ReaderState = StateMachine,
+      ActiveState,
+      Initial,
+      Attached,
+      Closed,
+      Released>;

   kj::Maybe ioContext;
   ReadableStreamController::Reader& reader;

-  kj::OneOf state = Initial();
+  ReaderState state;
+
+  inline void assertAttachedOrTerminal() const {
+    KJ_ASSERT(!state.is(), "this reader was never attached");
+  }
   kj::Maybe>> closedPromise;

   friend class ReadableStreamDefaultReader;
diff --git a/src/workerd/api/streams/standard-test.c++ b/src/workerd/api/streams/standard-test.c++
index bd55520b972..1359143dd14 100644
--- a/src/workerd/api/streams/standard-test.c++
+++ b/src/workerd/api/streams/standard-test.c++
@@ -854,5 +854,76 @@ KJ_TEST("ReadableStream read all bytes (byte readable, failed start 2)") {
   });
 }

+// ======================================================================================
+// Execution Termination Tests
+
+// Test that execution termination during a read doesn't cause an assertion failure.
+// This tests the fix for the production error:
+// "expected !js.v8Isolate->IsExecutionTerminating()"
+//
+// Note: This test verifies that if IsExecutionTerminating() is true after readCallback()
+// returns, we handle it gracefully instead of asserting. However, actually triggering
+// the termination check in the exact right spot in a unit test is difficult because
+// V8 only checks the termination flag during JS execution, not during C++ callbacks.
+// The production scenario occurs when the termination happens during actual JS execution
+// inside readCallback() (e.g., user's pull function runs too long).
+KJ_TEST("ReadableStream handles execution termination during read") {
+  RsIsolate isolate(v8System, kj::heap());
+  isolate.runInLockScope([&](RsIsolate::Lock& lock) {
+    // We need to handle termination at this level because the context scope
+    // will propagate the termination exception.
+    v8::TryCatch outerTryCatch(lock.v8Isolate);
+    bool testCompleted = false;
+
+    try {
+      JSG_WITHIN_CONTEXT_SCOPE(
+          lock, lock.newContext().getHandle(lock), [&](jsg::Lock& js) {
+        v8::TryCatch tryCatch(js.v8Isolate);
+
+        auto rs = js.alloc(newReadableStreamJsController());
+
+        // Set up a stream where the pull callback terminates execution
+        rs->getController().setup(js,
+            UnderlyingSource{
+              .pull =
+                  [&](jsg::Lock& js, UnderlyingSource::Controller controller) {
+          // Terminate execution - this simulates CPU time limit exceeded
+          js.terminateNextExecution();
+
+          // Force V8 to check the termination flag by doing JS work.
+          // This is similar to what terminateExecutionNow() does.
+          jsg::check(v8::JSON::Stringify(js.v8Context(), js.str("test"_kj)));
+
+          // We shouldn't get here - termination should have been triggered
+          return js.resolvedPromise();
+        },
+            },
+            StreamQueuingStrategy{.highWaterMark = 0});
+
+        // Start a read - this will call pull which terminates execution
+        auto promise = rs->getController().readAllText(js, 100);
+
+        // Run microtasks - this will propagate the termination
+        js.runMicrotasks();
+
+        // If we get here without the assertion failing, the fix works
+        testCompleted = true;
+      });
+    } catch (jsg::JsExceptionThrown&) {
+      // Expected - execution was terminated
+      // The important thing is we didn't hit the assertion failure
+      testCompleted = true;
+    }
+
+    // Cancel termination so cleanup can proceed
+    if (lock.v8Isolate->IsExecutionTerminating()) {
+      lock.v8Isolate->CancelTerminateExecution();
+    }
+
+    // The test passes if we got here without an assertion failure
+    KJ_ASSERT(testCompleted, "Test did not complete as expected");
+  });
+}
+
 }  // namespace
 }  // namespace workerd::api
diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++
index a666da51a57..81c13eb8897 100644
--- a/src/workerd/api/streams/standard.c++
+++ b/src/workerd/api/streams/standard.c++
@@ -9,6 +9,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -86,6 +87,7 @@ class ReadableLockImpl {
  private:
   class PipeLocked final: public PipeController {
    public:
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "pipe-locked"_kj;
     explicit PipeLocked(Controller& inner): inner(inner) {}

     bool isClosed() override {
@@ -93,7 +95,7 @@ class ReadableLockImpl {
     }

     kj::Maybe> tryGetErrored(jsg::Lock& js) override {
-      KJ_IF_SOME(errored, inner.state.template tryGet()) {
+      KJ_IF_SOME(errored, inner.state.template tryGetUnsafe()) {
         return errored.getHandle(js);
       }
       return kj::none;
@@ -117,7 +119,7 @@ class ReadableLockImpl {
       KJ_IF_SOME(error, maybeError) {
         cancel(js, error);
       }
-      inner.lock.state.template init();
+      inner.lock.state.template transitionTo();
     }

     kj::Maybe> tryPumpTo(WritableStreamSink& sink, bool end) override;
@@ -130,7 +132,16 @@ class ReadableLockImpl {
     friend Controller;
   };

-  kj::OneOf state = Unlocked();
+  // State machine for ReadableLockImpl:
+  // All states can transition to any other state (no terminal states).
+  // Unlocked -> Locked (lock() called for tee)
+  // Unlocked -> ReaderLocked (lockReader() called)
+  // Unlocked -> PipeLocked (tryPipeLock() called)
+  // ReaderLocked -> Unlocked (releaseReader() called)
+  // PipeLocked -> Unlocked (release() or onClose/onError called)
+  // Locked -> (remains until stream is done)
+  using LockState = StateMachine;
+  LockState state = LockState::template create();

   friend Controller;
 };
@@ -169,6 +180,7 @@ class WritableLockImpl {

  private:
   struct PipeLocked {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "pipe-locked"_kj;
     ReadableStreamController::PipeController& source;
     jsg::Ref readableStreamRef;

@@ -189,10 +201,19 @@ class WritableLockImpl {
       tracker.trackField("signal", maybeSignal);
     }
   };
-  kj::OneOf state = Unlocked();
+
+  // State machine for WritableLockImpl:
+  // All states can transition to any other state (no terminal states).
+  // Unlocked -> Locked (not currently used)
+  // Unlocked -> WriterLocked (lockWriter() called)
+  // Unlocked -> PipeLocked (pipeLock() called)
+  // WriterLocked -> Unlocked (releaseWriter() called)
+  // PipeLocked -> Unlocked (releasePipeLock() called)
+  using LockState = StateMachine;
+  LockState state = LockState::template create();

   inline kj::Maybe tryGetPipe() {
-    KJ_IF_SOME(locked, state.template tryGet()) {
+    KJ_IF_SOME(locked, state.template tryGetUnsafe()) {
       return locked;
     }
     return kj::none;
@@ -209,7 +230,7 @@ bool ReadableLockImpl::lock() {
     return false;
   }

-  state.template init();
+  state.template transitionTo();
   return true;
 }

@@ -226,11 +247,11 @@ bool ReadableLockImpl::lockReader(jsg::Lock& js, Controller& self, R

   if (self.state.template is()) {
     maybeResolvePromise(js, lock.getClosedFulfiller());
-  } else KJ_IF_SOME(errored, self.state.template tryGet()) {
+  } else KJ_IF_SOME(errored, self.state.template tryGetUnsafe()) {
     maybeRejectPromise(js, lock.getClosedFulfiller(), errored.getHandle(js));
   }

-  state = kj::mv(lock);
+  state.template transitionTo(kj::mv(lock));
   reader.attach(self, kj::mv(prp.promise));
   return true;
 }
@@ -238,12 +259,13 @@ bool ReadableLockImpl::lockReader(jsg::Lock& js, Controller& self, R
 template
 void ReadableLockImpl::releaseReader(
     Controller& self, Reader& reader, kj::Maybe maybeJs) {
-  KJ_IF_SOME(locked, state.template tryGet()) {
+  KJ_IF_SOME(locked, state.template tryGetUnsafe()) {
     KJ_ASSERT(&locked.getReader() == &reader);

     KJ_IF_SOME(js, maybeJs) {
       auto reason = js.typeError("This ReadableStream reader has been released."_kj);
       KJ_SWITCH_ONEOF(self.state) {
+        KJ_CASE_ONEOF(initial, typename Controller::Initial) {}
         KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
         KJ_CASE_ONEOF(errored, StreamStates::Errored) {}
         KJ_CASE_ONEOF(consumer, kj::Own) {
@@ -267,7 +289,7 @@ void ReadableLockImpl::releaseReader(
     // state itself. Moving the lock above will free the lock state while keeping the
     // ReadableStream marked as locked.
     if (maybeJs != kj::none) {
-      state.template init();
+      state.template transitionTo();
     }
   }
 }
@@ -278,8 +300,7 @@ kj::Maybe ReadableLockImpl(self);
-  return state.template get();
+  return state.template transitionTo(self);
 }

 template
@@ -296,45 +317,35 @@ void ReadableLockImpl::visitForGc(jsg::GcVisitor& visitor) {

 template
 void ReadableLockImpl::onClose(jsg::Lock& js) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(locked, ReaderLocked) {
-      try {
-        maybeResolvePromise(js, locked.getClosedFulfiller());
-      } catch (jsg::JsExceptionThrown&) {
-        // Resolving the promise could end up throwing an exception in some cases,
-        // causing a jsg::JsExceptionThrown to be thrown. At this point, however,
-        // we are already in the process of closing the stream and an error at this
-        // point is not recoverable. Log and move on.
-        LOG_NOSENTRY(ERROR, "Error resolving ReadableStream reader closed promise");
-      };
-    }
-    KJ_CASE_ONEOF(locked, ReadableLockImpl::PipeLocked) {
-      state.template init();
-    }
-    KJ_CASE_ONEOF(locked, Locked) {}
-    KJ_CASE_ONEOF(locked, Unlocked) {}
+  KJ_IF_SOME(locked, state.template tryGetUnsafe()) {
+    try {
+      maybeResolvePromise(js, locked.getClosedFulfiller());
+    } catch (jsg::JsExceptionThrown&) {
+      // Resolving the promise could end up throwing an exception in some cases,
+      // causing a jsg::JsExceptionThrown to be thrown. At this point, however,
+      // we are already in the process of closing the stream and an error at this
+      // point is not recoverable. Log and move on.
+      LOG_NOSENTRY(ERROR, "Error resolving ReadableStream reader closed promise");
+    };
+  } else {
+    (void)state.template transitionFromTo();
   }
 }

 template
 void ReadableLockImpl::onError(jsg::Lock& js, v8::Local reason) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(locked, ReaderLocked) {
-      try {
-        maybeRejectPromise(js, locked.getClosedFulfiller(), reason);
-      } catch (jsg::JsExceptionThrown&) {
-        // Rejecting the promise could end up throwing an exception in some cases,
-        // causing a jsg::JsExceptionThrown to be thrown. At this point, however,
-        // we are already in the process of closing the stream and an error at this
-        // point is not recoverable. Log and move on.
-        LOG_NOSENTRY(ERROR, "Error rejecting ReadableStream reader closed promise");
-      }
-    }
-    KJ_CASE_ONEOF(locked, ReadableLockImpl::PipeLocked) {
-      state.template init();
+  KJ_IF_SOME(locked, state.template tryGetUnsafe()) {
+    try {
+      maybeRejectPromise(js, locked.getClosedFulfiller(), reason);
+    } catch (jsg::JsExceptionThrown&) {
+      // Rejecting the promise could end up throwing an exception in some cases,
+      // causing a jsg::JsExceptionThrown to be thrown. At this point, however,
+      // we are already in the process of closing the stream and an error at this
+      // point is not recoverable. Log and move on.
+      LOG_NOSENTRY(ERROR, "Error rejecting ReadableStream reader closed promise");
     }
-    KJ_CASE_ONEOF(locked, Locked) {}
-    KJ_CASE_ONEOF(locked, Unlocked) {}
+  } else {
+    (void)state.template transitionFromTo();
   }
 }

@@ -373,7 +384,7 @@ bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, W
   if (self.state.template is()) {
     maybeResolvePromise(js, lock.getClosedFulfiller());
     maybeResolvePromise(js, lock.getReadyFulfiller());
-  } else KJ_IF_SOME(errored, self.state.template tryGet()) {
+  } else KJ_IF_SOME(errored, self.state.template tryGetUnsafe()) {
     maybeRejectPromise(js, lock.getClosedFulfiller(), errored.getHandle(js));
     maybeRejectPromise(js, lock.getReadyFulfiller(), errored.getHandle(js));
   } else {
@@ -382,7 +393,7 @@ bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, W
     }
   }

-  state = kj::mv(lock);
+  state.template transitionTo(kj::mv(lock));
   writer.attach(js, self, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
   return true;
 }
@@ -390,30 +401,32 @@ bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, W
 template
 void WritableLockImpl::releaseWriter(
     Controller& self, Writer& writer, kj::Maybe maybeJs) {
-  auto& locked = state.template get();
-  KJ_ASSERT(&locked.getWriter() == &writer);
-  KJ_IF_SOME(js, maybeJs) {
-    KJ_SWITCH_ONEOF(self.state) {
-      KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
-      KJ_CASE_ONEOF(errored, StreamStates::Errored) {}
-      KJ_CASE_ONEOF(controller, jsg::Ref) {
-        controller->cancelPendingWrites(
-            js, js.typeError("This WritableStream writer has been released."_kjc));
+  KJ_IF_SOME(locked, state.template tryGetUnsafe()) {
+    KJ_ASSERT(&locked.getWriter() == &writer);
+    KJ_IF_SOME(js, maybeJs) {
+      KJ_SWITCH_ONEOF(self.state) {
+        KJ_CASE_ONEOF(initial, typename Controller::Initial) {}
+        KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
+        KJ_CASE_ONEOF(errored, StreamStates::Errored) {}
+        KJ_CASE_ONEOF(controller, jsg::Ref) {
+          controller->cancelPendingWrites(
+              js, js.typeError("This WritableStream writer has been released."_kjc));
+        }
       }
-    }

-    maybeRejectPromise(js, locked.getClosedFulfiller(),
-        js.v8TypeError("This WritableStream writer has been released."_kjc));
-  }
-  locked.clear();
+      maybeRejectPromise(js, locked.getClosedFulfiller(),
+          js.v8TypeError("This WritableStream writer has been released."_kjc));
+    }
+    locked.clear();

-  // When maybeJs is nullptr, that means releaseWriter was called when the writer is
-  // being deconstructed and not as the result of explicitly calling releaseLock and
-  // we do not have an isolate lock. In that case, we don't want to change the lock
-  // state itself. Moving the lock above will free the lock state while keeping the
-  // WritableStream marked as locked.
-  if (maybeJs != kj::none) {
-    state.template init();
+    // When maybeJs is nullptr, that means releaseWriter was called when the writer is
+    // being deconstructed and not as the result of explicitly calling releaseLock and
+    // we do not have an isolate lock. In that case, we don't want to change the lock
+    // state itself. Moving the lock above will free the lock state while keeping the
+    // WritableStream marked as locked.
+    if (maybeJs != kj::none) {
+      state.template transitionTo();
+    }
   }
 }

@@ -426,7 +439,7 @@ bool WritableLockImpl::pipeLock(

   auto& sourceLock = KJ_ASSERT_NONNULL(source->getController().tryPipeLock());

-  state.template init(PipeLocked{
+  state.template transitionTo(PipeLocked{
     .source = sourceLock,
     .readableStreamRef = kj::mv(source),
     .maybeSignal = kj::mv(options.signal),
@@ -444,7 +457,7 @@ bool WritableLockImpl::pipeLock(
 template
 void WritableLockImpl::releasePipeLock() {
   if (state.template is()) {
-    state.template init();
+    state.template transitionTo();
   }
 }

@@ -593,45 +606,47 @@ int getHighWaterMark(
 // It is possible for the controller state to be released synchronously while
 // we are in the middle of a read. When that happens we need to defer the actual
 // close/error state change until the read call is complete. deferControllerStateChange
-// handles this for us by making sure that pendingReadCount is
-// incremented until the read operation completes and deferring
-// a state change until it is 0
+// handles this for us by using the state machine's operation tracking to defer
+// pending close/error transitions until the read is complete.
 template
 jsg::Promise deferControllerStateChange(jsg::Lock& js,
     Controller& controller,
     kj::FunctionParam()> readCallback) {
-  bool decrementCount = true;
+  bool endOperation = true;
   // The readCallback and the controller.doClose(..) and controller.doError(...)
   // methods, as well as the methods can trigger JavaScript errors to be thrown
   // synchronously in some cases. We want to make sure non-fatal errors cause the
   // stream to error and only fatal cases bubble up.
   return js.tryCatch([&] {
-    controller.pendingReadCount++;
+    controller.state.beginOperation();
     auto result = readCallback();
-    decrementCount = false;
-    --controller.pendingReadCount;
-
-    KJ_ASSERT(!js.v8Isolate->IsExecutionTerminating());
-
-    if (!controller.isReadPending()) {
-      KJ_IF_SOME(state, controller.maybePendingState) {
-        KJ_SWITCH_ONEOF(state) {
-          KJ_CASE_ONEOF(closed, StreamStates::Closed) {
-            controller.doClose(js);
-          }
-          KJ_CASE_ONEOF(errored, StreamStates::Errored) {
-            controller.doError(js, errored.getHandle(js));
+    endOperation = false;
+
+    // endOperation() will automatically apply any pending state if this was the last operation.
+    // Returns true if a pending state was applied.
+    if (controller.state.endOperation()) {
+      // A pending state was applied. Call the appropriate callback.
+      // Skip callbacks if execution is being terminated (e.g., CPU time limit) since we can't
+      // safely execute JavaScript in that state.
+      if (!js.v8Isolate->IsExecutionTerminating()) {
+        if (controller.state.template is()) {
+          controller.lock.onClose(js);
+        } else if (controller.state.template is()) {
+          KJ_IF_SOME(err, controller.state.template tryGetUnsafe()) {
+            controller.lock.onError(js, err.getHandle(js));
           }
         }
-        controller.maybePendingState = kj::none;
       }
     }

     return kj::mv(result);
   }, [&](jsg::Value exception) -> jsg::Promise {
-    if (decrementCount) --controller.pendingReadCount;
+    if (endOperation) {
+      // Clear any pending state since we're erroring
+      controller.state.clearPendingState();
+      (void)controller.state.endOperation();
+    }
     controller.doError(js, exception.getHandle(js));
-    controller.maybePendingState = kj::none;
     return js.rejectedPromise(kj::mv(exception));
   });
 }
@@ -757,11 +772,33 @@ class ReadableStreamJsController final: public ReadableStreamController {

   kj::Maybe ioContext;
   kj::Maybe owner;
-  kj::OneOf ValueReadable or ByteReadable (setup() called)
+  // Initial -> Closed (constructed with Closed)
+  // Initial -> Errored (constructed with Errored)
+  // ValueReadable -> Closed (doClose() or cancel() called)
+  // ValueReadable -> Errored (doError() called)
+  // ByteReadable -> Closed (doClose() or cancel() called)
+  // ByteReadable -> Errored (doError() called)
+  // Note: No single ActiveState since there are two active variants.
+  // PendingStates allows Closed/Errored transitions to be deferred during reads.
+  using State = StateMachine,
+      ErrorState,
+      PendingStates,
+      Initial,
+      StreamStates::Closed,
       StreamStates::Errored,
       kj::Own,
-      kj::Own>
-      state = StreamStates::Closed();
+      kj::Own>;
+  State state = State::create();

   kj::Maybe expectedLength = kj::none;
   bool canceling = false;
@@ -769,23 +806,11 @@ class ReadableStreamJsController final: public ReadableStreamController {
   // The lock state is separate because a closed or errored stream can still be locked.
   ReadableLockImpl lock;

-  size_t pendingReadCount = 0;
-  kj::Maybe> maybePendingState = kj::none;
   bool disturbed = false;

   template
   jsg::Promise readAll(jsg::Lock& js, uint64_t limit);

-  void setPendingState(kj::OneOf pending) {
-    if (maybePendingState == kj::none) {
-      maybePendingState = kj::mv(pending);
-    }
-  }
-
-  bool isReadPending() const {
-    return pendingReadCount > 0;
-  }
-
   friend ReadableLockImpl;
   friend ReadableLockImpl::PipeLocked;

@@ -840,7 +865,7 @@ class WritableStreamJsController final: public WritableStreamController {
   bool isStarted();

   inline bool isWritable() const {
-    return state.is();
+    return state.isActive();
   }

   bool lockWriter(jsg::Lock& js, Writer& writer) override;
@@ -890,7 +915,25 @@ class WritableStreamJsController final: public WritableStreamController {

   kj::Maybe ioContext;
   kj::Maybe owner;
-  kj::OneOf state = StreamStates::Closed();
+
+  // Initial state before setup() is called.
+  struct Initial {
+    static constexpr kj::StringPtr NAME KJ_UNUSED = "initial"_kj;
+  };
+
+  // State machine for WritableStreamJsController:
+  // Initial is the default state before setup() is called
+  // Controller is the active state (stream is writable)
+  // Closed is terminal, Errored is implicitly terminal via ErrorState
+  using State = StateMachine,
+      ErrorState,
+      ActiveState,
+      Initial,
+      StreamStates::Closed,
+      StreamStates::Errored,
+      Controller>;
+  State state = State::create();
+
   WritableLockImpl lock;
   kj::Maybe> maybeAbortPromise;

@@ -908,7 +951,7 @@ kj::Own newWritableStreamJsController() {
 template
 ReadableImpl::ReadableImpl(
     UnderlyingSource underlyingSource, StreamQueuingStrategy queuingStrategy)
-    : state(Queue(getHighWaterMark(underlyingSource, queuingStrategy))),
+    : state(State::template create(getHighWaterMark(underlyingSource, queuingStrategy))),
       algorithms(kj::mv(underlyingSource), kj::mv(queuingStrategy)) {}

 template
@@ -941,70 +984,56 @@ void ReadableImpl::start(jsg::Lock& js, jsg::Ref self) {

 template
 size_t ReadableImpl::consumerCount() {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(closed, StreamStates::Closed) {
-      return 0;
-    }
-    KJ_CASE_ONEOF(errored, StreamStates::Errored) {
-      return 0;
-    }
-    KJ_CASE_ONEOF(queue, Queue) {
-      return queue.getConsumerCount();
-    }
-  }
-  KJ_UNREACHABLE;
+  return state.whenActiveOr([](Queue& q) { return q.getConsumerCount(); }, size_t{0});
 }

 template
 jsg::Promise ReadableImpl::cancel(
     jsg::Lock& js, jsg::Ref self, v8::Local reason) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(closed, StreamStates::Closed) {
-      // We are already closed. There's nothing to cancel.
-      // This shouldn't happen but we handle the case anyway, just to be safe.
-      return js.resolvedPromise();
-    }
-    KJ_CASE_ONEOF(errored, StreamStates::Errored) {
-      // We are already errored. There's nothing to cancel.
-      // This shouldn't happen but we handle the case anyway, just to be safe.
-      return js.rejectedPromise(errored.getHandle(js));
-    }
-    KJ_CASE_ONEOF(queue, Queue) {
-      size_t consumerCount = queue.getConsumerCount();
-      if (consumerCount > 1) {
-        // If there is more than 1 consumer, then we just return here with an
-        // immediately resolved promise. The consumer will remove itself,
-        // canceling its interest in the underlying source but we do not yet
-        // want to cancel the underlying source since there are still other
-        // consumers that want data.
-        return js.resolvedPromise();
-      }
+  if (state.template is()) {
+    // We are already closed. There's nothing to cancel.
+    // This shouldn't happen but we handle the case anyway, just to be safe.
+    return js.resolvedPromise();
+  }
+  KJ_IF_SOME(errored, state.template tryGetUnsafe()) {
+    // We are already errored. There's nothing to cancel.
+    // This shouldn't happen but we handle the case anyway, just to be safe.
+    return js.rejectedPromise(errored.getHandle(js));
+  }

-      // Otherwise, there should be exactly one consumer at this point.
- KJ_ASSERT(consumerCount == 1); - KJ_IF_SOME(pendingCancel, maybePendingCancel) { - // If we're already waiting for cancel to complete, just return the - // already existing pending promise. - // This shouldn't happen but we handle the case anyway, just to be safe. - return pendingCancel.promise.whenResolved(js); - } + auto& queue = state.template getUnsafe(); + size_t consumerCount = queue.getConsumerCount(); + if (consumerCount > 1) { + // If there is more than 1 consumer, then we just return here with an + // immediately resolved promise. The consumer will remove itself, + // canceling its interest in the underlying source but we do not yet + // want to cancel the underlying source since there are still other + // consumers that want data. + return js.resolvedPromise(); + } - auto prp = js.newPromiseAndResolver(); - maybePendingCancel = PendingCancel{ - .fulfiller = kj::mv(prp.resolver), - .promise = kj::mv(prp.promise), - }; - auto promise = KJ_ASSERT_NONNULL(maybePendingCancel).promise.whenResolved(js); - doCancel(js, kj::mv(self), reason); - return kj::mv(promise); - } + // Otherwise, there should be exactly one consumer at this point. + KJ_ASSERT(consumerCount == 1); + KJ_IF_SOME(pendingCancel, maybePendingCancel) { + // If we're already waiting for cancel to complete, just return the + // already existing pending promise. + // This shouldn't happen but we handle the case anyway, just to be safe. 
+ return pendingCancel.promise.whenResolved(js); } - KJ_UNREACHABLE; + + auto prp = js.newPromiseAndResolver(); + maybePendingCancel = PendingCancel{ + .fulfiller = kj::mv(prp.resolver), + .promise = kj::mv(prp.promise), + }; + auto promise = KJ_ASSERT_NONNULL(maybePendingCancel).promise.whenResolved(js); + doCancel(js, kj::mv(self), reason); + return kj::mv(promise); } template bool ReadableImpl::canCloseOrEnqueue() { - return state.template is(); + return state.isActive(); } // doCancel() is triggered by cancel() being called, which is an explicit signal from @@ -1014,7 +1043,7 @@ bool ReadableImpl::canCloseOrEnqueue() { // and trigger the cancel algorithm. template void ReadableImpl::doCancel(jsg::Lock& js, jsg::Ref self, v8::Local reason) { - state.template init(); + state.template transitionTo(); auto onSuccess = JSG_VISITABLE_LAMBDA((this, self = self.addRef()), (self), (jsg::Lock& js) { doClose(js); @@ -1044,14 +1073,14 @@ template void ReadableImpl::enqueue(jsg::Lock& js, kj::Rc entry, jsg::Ref self) { JSG_REQUIRE(canCloseOrEnqueue(), TypeError, "This ReadableStream is closed."); KJ_DEFER(pullIfNeeded(js, kj::mv(self))); - auto& queue = state.template get(); + auto& queue = state.template getUnsafe(); queue.push(js, kj::mv(entry)); } template void ReadableImpl::close(jsg::Lock& js) { JSG_REQUIRE(canCloseOrEnqueue(), TypeError, "This ReadableStream is closed."); - auto& queue = state.template get(); + auto& queue = state.template getUnsafe(); if (queue.hasPartiallyFulfilledRead()) { auto error = @@ -1063,7 +1092,7 @@ void ReadableImpl::close(jsg::Lock& js) { queue.close(js); - state.template init(); + state.template transitionTo(); doClose(js); } @@ -1076,47 +1105,34 @@ void ReadableImpl::doClose(jsg::Lock& js) { template void ReadableImpl::doError(jsg::Lock& js, jsg::Value reason) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - // We're already closed, so we really don't care if there was an error. Do nothing. 
- return; - } - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - // We're already errored, so we really don't care if there was an error. Do nothing. - return; - } - KJ_CASE_ONEOF(queue, Queue) { - queue.error(js, reason.addRef(js)); - state = kj::mv(reason); - algorithms.clear(); - return; - } + // If already closed or errored, do nothing + if (state.isInactive()) { + return; } - KJ_UNREACHABLE; + + auto& queue = state.template getUnsafe(); + queue.error(js, reason.addRef(js)); + state.template transitionTo(kj::mv(reason)); + algorithms.clear(); } template kj::Maybe ReadableImpl::getDesiredSize() { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - return 0; - } - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - return kj::none; - } - KJ_CASE_ONEOF(queue, Queue) { - return queue.desiredSize(); - } + if (state.template is()) { + return 0; } - KJ_UNREACHABLE; + if (state.template is()) { + return kj::none; + } + return state.template getUnsafe().desiredSize(); } // We should call pull if any of the consumers known to the queue have read requests or // we haven't yet signalled backpressure. 
template bool ReadableImpl::shouldCallPull() { - return canCloseOrEnqueue() && - (state.template get().wantsRead() || getDesiredSize().orDefault(0) > 0); + return state.whenActiveOr( + [this](Queue& q) { return q.wantsRead() || getDesiredSize().orDefault(0) > 0; }, false); } template @@ -1153,15 +1169,7 @@ void ReadableImpl::pullIfNeeded(jsg::Lock& js, jsg::Ref self) { template void ReadableImpl::visitForGc(jsg::GcVisitor& visitor) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) {} - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - visitor.visit(errored); - } - KJ_CASE_ONEOF(queue, Queue) { - visitor.visit(queue); - } - } + state.visitForGc(visitor); KJ_IF_SOME(pendingCancel, maybePendingCancel) { visitor.visit(pendingCancel.fulfiller, pendingCancel.promise); } @@ -1171,7 +1179,7 @@ void ReadableImpl::visitForGc(jsg::GcVisitor& visitor) { template kj::Own::Consumer> ReadableImpl::getConsumer( kj::Maybe::StateListener&> listener) { - auto& queue = state.template get(); + auto& queue = state.template getUnsafe(); return kj::heap::Consumer>(queue, listener); } @@ -1201,7 +1209,7 @@ jsg::Promise WritableImpl::abort( signal->triggerAbort(js, signalReason); // We have to check this again after the AbortSignal is triggered. 
-  if (state.template is() || state.template is()) {
+  if (state.isTerminal()) {
     return js.resolvedPromise();
   }
 
@@ -1362,7 +1370,8 @@ void WritableImpl::doClose(jsg::Lock& js) {
   KJ_ASSERT(inFlightWrite == kj::none);
   KJ_ASSERT(maybePendingAbort == kj::none);
   KJ_ASSERT(writeRequests.empty());
-  state.template init();
+  // State should have already been transitioned to Closed
+  KJ_ASSERT(state.template is());
   algorithms.clear();
 
   KJ_IF_SOME(owner, tryGetOwner()) {
@@ -1377,7 +1386,8 @@ void WritableImpl::doError(jsg::Lock& js, v8::Local reason) {
   KJ_ASSERT(inFlightWrite == kj::none);
   KJ_ASSERT(maybePendingAbort == kj::none);
   KJ_ASSERT(writeRequests.empty());
-  state = js.v8Ref(reason);
+  // State should have already been transitioned to Errored
+  KJ_ASSERT(state.template is());
   algorithms.clear();
 
   KJ_IF_SOME(owner, tryGetOwner()) {
@@ -1395,11 +1405,11 @@ void WritableImpl::error(jsg::Lock& js, jsg::Ref self, v8::Local
 void WritableImpl::finishErroring(jsg::Lock& js, jsg::Ref self) {
-  auto erroring = kj::mv(KJ_ASSERT_NONNULL(state.template tryGet()));
+  auto erroring = kj::mv(KJ_ASSERT_NONNULL(state.template tryGetUnsafe()));
   auto reason = erroring.reason.getHandle(js);
   KJ_ASSERT(inFlightWrite == kj::none);
   KJ_ASSERT(inFlightClose == kj::none);
-  state.template init(kj::mv(erroring.reason));
+  state.template transitionTo(kj::mv(erroring.reason));
 
   while (!writeRequests.empty()) {
     dequeueWriteRequest().resolver.reject(js, reason);
@@ -1459,7 +1469,7 @@ void WritableImpl::finishInFlightClose(
   }
   KJ_ASSERT(maybePendingAbort == kj::none);
 
-  state.template init();
+  state.template transitionTo();
   doClose(js);
 }
@@ -1487,7 +1497,8 @@ bool WritableImpl::isCloseQueuedOrInFlight() {
 template
 void WritableImpl::rejectCloseAndClosedPromiseIfNeeded(jsg::Lock& js) {
   algorithms.clear();
-  auto reason = KJ_ASSERT_NONNULL(state.template tryGet()).getHandle(js);
+  auto reason =
+      KJ_ASSERT_NONNULL(state.template tryGetUnsafe()).getHandle(js);
   maybeRejectPromise(js, closeRequest, reason);
   PendingAbort::dequeue(maybePendingAbort);
   doError(js, reason);
@@ -1557,7 +1568,7 @@ void WritableImpl::startErroring(
   KJ_IF_SOME(owner, tryGetOwner()) {
     owner.maybeRejectReadyPromise(js, reason);
   }
-  state.template init(js.v8Ref(reason));
+  state.template transitionTo(js.v8Ref(reason));
   if (inFlightWrite == kj::none && inFlightClose == kj::none && flags.started) {
     finishErroring(js, kj::mv(self));
   }
@@ -1593,7 +1604,7 @@ jsg::Promise WritableImpl::write(
     }
   }
 
-  KJ_IF_SOME(error, state.template tryGet()) {
+  KJ_IF_SOME(error, state.template tryGetUnsafe()) {
     return js.rejectedPromise(error.addRef(js));
   }
 
@@ -1601,7 +1612,7 @@ jsg::Promise WritableImpl::write(
     return js.rejectedPromise(js.v8TypeError("This ReadableStream is closed."_kj));
   }
 
-  KJ_IF_SOME(erroring, state.template tryGet()) {
+  KJ_IF_SOME(erroring, state.template tryGetUnsafe()) {
     return js.rejectedPromise(erroring.reason.addRef(js));
   }
 
@@ -1622,16 +1633,7 @@ jsg::Promise WritableImpl::write(
 
 template
 void WritableImpl::visitForGc(jsg::GcVisitor& visitor) {
-  KJ_SWITCH_ONEOF(state) {
-    KJ_CASE_ONEOF(closed, StreamStates::Closed) {}
-    KJ_CASE_ONEOF(writable, Writable) {}
-    KJ_CASE_ONEOF(error, StreamStates::Errored) {
-      visitor.visit(error);
-    }
-    KJ_CASE_ONEOF(erroring, StreamStates::Erroring) {
-      visitor.visit(erroring.reason);
-    }
-  }
+  state.visitForGc(visitor);
   visitor.visit(inFlightWrite, inFlightClose, closeRequest, algorithms, signal);
   KJ_IF_SOME(pendingAbort, maybePendingAbort) {
     visitor.visit(*pendingAbort);
@@ -1641,7 +1643,7 @@ void WritableImpl::visitForGc(jsg::GcVisitor& visitor) {
 
 template
 bool WritableImpl::isWritable() const {
-  return state.template is();
+  return state.isActive();
 }
 
 template
@@ -1981,7 +1983,7 @@ ReadableStreamDefaultController::ReadableStreamDefaultController(
 
 kj::Maybe ReadableStreamDefaultController::getMaybeErrorState(
     jsg::Lock& js) {
-  KJ_IF_SOME(errored, impl.state.tryGet()) {
+  KJ_IF_SOME(errored, impl.state.tryGetUnsafe()) {
     return errored.addRef(js);
   }
   return
kj::none;
@@ -2265,7 +2267,7 @@ void ReadableByteStreamController::error(jsg::Lock& js, v8::Local rea
 kj::Maybe> ReadableByteStreamController::getByobRequest(
     jsg::Lock& js) {
   if (maybeByobRequest == kj::none) {
-    KJ_IF_SOME(queue, impl.state.tryGet()) {
+    KJ_IF_SOME(queue, impl.state.tryGetUnsafe()) {
       KJ_IF_SOME(pendingByob, queue.nextPendingByobReadRequest()) {
         maybeByobRequest =
             js.alloc(js, kj::mv(pendingByob), weakSelf.addRef());
@@ -2296,20 +2298,24 @@ kj::Own ReadableByteStreamController::getConsumer(
 ReadableStreamJsController::ReadableStreamJsController(): ioContext(tryGetIoContext()) {}
 
 ReadableStreamJsController::ReadableStreamJsController(StreamStates::Closed closed)
-    : ioContext(tryGetIoContext()),
-      state(closed) {}
+    : ioContext(tryGetIoContext()) {
+  state.transitionTo();
+}
 
 ReadableStreamJsController::ReadableStreamJsController(StreamStates::Errored errored)
-    : ioContext(tryGetIoContext()),
-      state(kj::mv(errored)) {}
+    : ioContext(tryGetIoContext()) {
+  state.transitionTo(kj::mv(errored));
+}
 
 ReadableStreamJsController::ReadableStreamJsController(jsg::Lock& js, ValueReadable& consumer)
-    : ioContext(tryGetIoContext()),
-      state(consumer.clone(js, *this)) {}
+    : ioContext(tryGetIoContext()) {
+  state.transitionTo>(consumer.clone(js, *this));
+}
 
 ReadableStreamJsController::ReadableStreamJsController(jsg::Lock& js, ByteReadable& consumer)
-    : ioContext(tryGetIoContext()),
-      state(consumer.clone(js, *this)) {}
+    : ioContext(tryGetIoContext()) {
+  state.transitionTo>(consumer.clone(js, *this));
+}
 
 jsg::Ref ReadableStreamJsController::addRef() {
   return KJ_REQUIRE_NONNULL(owner).addRef();
@@ -2325,18 +2331,19 @@ jsg::Promise ReadableStreamJsController::cancel(
     return consumer->cancel(js, reason.getHandle(js));
   };
 
-  KJ_IF_SOME(pendingState, maybePendingState) {
-    KJ_SWITCH_ONEOF(pendingState) {
-      KJ_CASE_ONEOF(closed, StreamStates::Closed) {
-        return js.resolvedPromise();
-      }
-      KJ_CASE_ONEOF(errored, StreamStates::Errored) {
-        return
js.rejectedPromise(errored.addRef(js)); - } - } + // Check for pending state first (deferred close/error during a read operation) + if (state.pendingStateIs()) { + return js.resolvedPromise(); + } + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + return js.rejectedPromise(pendingError.addRef(js)); } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream not yet set up, treat as closed. + return js.resolvedPromise(); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return js.resolvedPromise(); } @@ -2365,12 +2372,16 @@ jsg::Promise ReadableStreamJsController::cancel( // ByteReadable in the state and changing that to closed. // We also clean up other state here. void ReadableStreamJsController::doClose(jsg::Lock& js) { - if (isReadPending()) { - setPendingState(StreamStates::Closed()); - } else { - state.init(); + // If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + + // deferTransitionTo will defer if an operation is in progress, otherwise transition immediately. + // Returns true if transition happened immediately. + if (state.deferTransitionTo()) { lock.onClose(js); } + // If deferred, lock.onClose will be called when the pending state is applied + // via applyPendingState in deferControllerStateChange. } // As with doClose(), doError() finalizes the error state of this ReadableStream. @@ -2380,12 +2391,16 @@ void ReadableStreamJsController::doClose(jsg::Lock& js) { // or ByteReadable in the state and changing that to errored. // We also clean up other state here. void ReadableStreamJsController::doError(jsg::Lock& js, v8::Local reason) { - if (isReadPending()) { - setPendingState(js.v8Ref(reason)); - } else { - state.init(js.v8Ref(reason)); + // If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + + // deferTransitionTo will defer if an operation is in progress, otherwise transition immediately. + // Returns true if transition happened immediately. 
+ if (state.deferTransitionTo(js.v8Ref(reason))) { lock.onError(js, reason); } + // If deferred, lock.onError will be called when the pending state is applied + // via applyPendingState in deferControllerStateChange. } bool ReadableStreamJsController::isByteOriented() const { @@ -2393,17 +2408,14 @@ bool ReadableStreamJsController::isByteOriented() const { } bool ReadableStreamJsController::isClosedOrErrored() const { - if (maybePendingState != kj::none) { - return true; - } - return state.is() || state.is(); + // Check if we're in a terminal state or have one pending + return state.isTerminal() || state.hasPendingState(); } bool ReadableStreamJsController::isClosed() const { - KJ_IF_SOME(s, maybePendingState) { - return s.is(); - } - return state.is(); + // Check current state first, then pending state + if (state.is()) return true; + return state.pendingStateIs(); } bool ReadableStreamJsController::isDisturbed() { @@ -2449,17 +2461,12 @@ kj::Maybe> ReadableStreamJsController::read( js.v8TypeError("Unable to use a zero-length ArrayBuffer."_kj)); } - if (state.is() || maybePendingState != kj::none) { - KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - // Fall through to the BYOB read case below. - } - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - return js.rejectedPromise(errored.addRef(js)); - } - } - } + // Check for pending error first (deferred error during a prior read operation) + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + return js.rejectedPromise(pendingError.addRef(js)); + } + + if (state.is() || state.pendingStateIs()) { // If it is a BYOB read, then the spec requires that we return an empty // view of the same type provided, that uses the same backing memory // as that provided, but with zero-length. 
@@ -2473,20 +2480,22 @@ kj::Maybe> ReadableStreamJsController::read( } } - KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - // The closed state for BYOB reads is handled in the maybeByobOptions check above. - KJ_ASSERT(maybeByobOptions == kj::none); - return js.resolvedPromise(ReadResult{.done = true}); - } - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - return js.rejectedPromise(errored.addRef(js)); - } - } + // Check for pending state (deferred close/error during a prior read operation) + if (state.pendingStateIs()) { + // The closed state for BYOB reads is handled in the maybeByobOptions check above. + KJ_ASSERT(maybeByobOptions == kj::none); + return js.resolvedPromise(ReadResult{.done = true}); + } + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + return js.rejectedPromise(pendingError.addRef(js)); } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream not yet set up, treat as closed. + KJ_ASSERT(maybeByobOptions == kj::none); + return js.resolvedPromise(ReadResult{.done = true}); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { // The closed state for BYOB reads is handled in the maybeByobOptions check above. KJ_ASSERT(maybeByobOptions == kj::none); @@ -2515,33 +2524,39 @@ void ReadableStreamJsController::releaseReader(Reader& reader, kj::Maybe(); + lock.state.transitionTo(); disturbed = true; // This will leave this stream locked, disturbed, and closed. 
- KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - return Tee{ - .branch1 = js.alloc( - kj::heap(StreamStates::Closed())), - .branch2 = js.alloc( - kj::heap(StreamStates::Closed())), - }; - } - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - return Tee{ - .branch1 = - js.alloc(kj::heap(errored.addRef(js))), - .branch2 = - js.alloc(kj::heap(errored.addRef(js))), - }; - } - } + // Check for pending state first (deferred close/error during a prior read operation) + if (state.pendingStateIs()) { + return Tee{ + .branch1 = + js.alloc(kj::heap(StreamStates::Closed())), + .branch2 = + js.alloc(kj::heap(StreamStates::Closed())), + }; + } + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + return Tee{ + .branch1 = + js.alloc(kj::heap(pendingError.addRef(js))), + .branch2 = + js.alloc(kj::heap(pendingError.addRef(js))), + }; } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream not yet set up, treat as closed. + return Tee{ + .branch1 = + js.alloc(kj::heap(StreamStates::Closed())), + .branch2 = + js.alloc(kj::heap(StreamStates::Closed())), + }; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return Tee{ .branch1 = @@ -2559,7 +2574,7 @@ ReadableStreamController::Tee ReadableStreamJsController::tee(jsg::Lock& js) { }; } KJ_CASE_ONEOF(consumer, kj::Own) { - KJ_DEFER(state.init()); + KJ_DEFER(state.transitionTo()); // We create two additional streams that clone this stream's consumer state, // then close this stream's consumer. return Tee{ @@ -2568,7 +2583,7 @@ ReadableStreamController::Tee ReadableStreamJsController::tee(jsg::Lock& js) { }; } KJ_CASE_ONEOF(consumer, kj::Own) { - KJ_DEFER(state.init()); + KJ_DEFER(state.transitionTo()); // We create two additional streams that clone this stream's consumer state, // then close this stream's consumer. 
return Tee{ @@ -2607,18 +2622,20 @@ void ReadableStreamJsController::setup(jsg::Lock& js, // We account for the memory usage of the ByteReadable and its controller together because // their lifetimes are identical (in practice) and memory accounting itself has a memory // overhead. The same applies to ValueReadable below. - state = kj::heap(controller.addRef(), *this, autoAllocateChunkSize) - .attach(js.getExternalMemoryAdjustment( - sizeof(ByteReadable) + sizeof(ReadableByteStreamController))); + state.transitionTo>( + kj::heap(controller.addRef(), *this, autoAllocateChunkSize) + .attach(js.getExternalMemoryAdjustment( + sizeof(ByteReadable) + sizeof(ReadableByteStreamController)))); controller->start(js); } else { JSG_REQUIRE( type == "", TypeError, kj::str("\"", type, "\" is not a valid type of ReadableStream.")); auto controller = js.alloc( kj::mv(underlyingSource), kj::mv(queuingStrategy)); - state = kj::heap(controller.addRef(), *this) - .attach(js.getExternalMemoryAdjustment( - sizeof(ValueReadable) + sizeof(ReadableStreamDefaultController))); + state.transitionTo>( + kj::heap(controller.addRef(), *this) + .attach(js.getExternalMemoryAdjustment( + sizeof(ValueReadable) + sizeof(ReadableStreamDefaultController)))); controller->start(js); } } @@ -2628,16 +2645,16 @@ kj::Maybe ReadableStreamJsController: } void ReadableStreamJsController::visitForGc(jsg::GcVisitor& visitor) { - KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) {} - KJ_CASE_ONEOF(error, StreamStates::Errored) { - visitor.visit(error); - } - } + // Visit pending state if it's an error (Closed has no GC-traceable content) + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + visitor.visit(pendingError); } + // Note: We cannot use state.visitForGc(visitor) here because the state machine's + // visitForGc passes kj::Own& to visitor.visit(), but GcVisitor expects T& for + // types with visitForGc methods. 
We must dereference kj::Own manually. KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) {} KJ_CASE_ONEOF(closed, StreamStates::Closed) {} KJ_CASE_ONEOF(error, StreamStates::Errored) { visitor.visit(error); @@ -2650,14 +2667,18 @@ void ReadableStreamJsController::visitForGc(jsg::GcVisitor& visitor) { } } visitor.visit(lock); -}; +} kj::Maybe ReadableStreamJsController::getDesiredSize() { - if (maybePendingState != kj::none) { + // If there's a pending state transition, return none + if (state.hasPendingState()) { return kj::none; } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return kj::none; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return kj::none; } @@ -2675,26 +2696,25 @@ kj::Maybe ReadableStreamJsController::getDesiredSize() { } kj::Maybe> ReadableStreamJsController::isErrored(jsg::Lock& js) { - KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) { - return kj::none; - } - KJ_CASE_ONEOF(error, StreamStates::Errored) { - return error.getHandle(js); - } - } + // Check for pending error first + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + return pendingError.getHandle(js); } - return state.tryGet().map( + // Pending Closed means not errored, so we can just check current state + return state.tryGetUnsafe().map( [&](jsg::Value& reason) { return reason.getHandle(js); }); } bool ReadableStreamJsController::canCloseOrEnqueue() { - if (maybePendingState != kj::none) { + // If there's a pending state transition, can't close or enqueue + if (state.hasPendingState()) { return false; } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return false; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return false; } @@ -2720,10 +2740,14 @@ bool ReadableStreamJsController::hasBackpressure() { kj::Maybe> ReadableStreamJsController:: getController() { - if (maybePendingState != kj::none) { + // If there's a pending state transition, return 
none + if (state.hasPendingState()) { return kj::none; } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return kj::none; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return kj::none; } @@ -2747,7 +2771,9 @@ class AllReader { public: using PartList = kj::Array>; - AllReader(jsg::Ref stream, uint64_t limit): state(kj::mv(stream)), limit(limit) {} + AllReader(jsg::Ref stream, uint64_t limit) + : state(State::create>(kj::mv(stream))), + limit(limit) {} KJ_DISALLOW_COPY_AND_MOVE(AllReader); jsg::Promise allBytes(jsg::Lock& js) { @@ -2776,19 +2802,20 @@ class AllReader { } void visitForGc(jsg::GcVisitor& visitor) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) {} - KJ_CASE_ONEOF(errored, StreamStates::Errored) { - visitor.visit(errored); - } - KJ_CASE_ONEOF(readable, jsg::Ref) { - visitor.visit(readable); - } - } + state.visitForGc(visitor); } private: - kj::OneOf> state; + // State machine for AllReader: + // Closed is terminal, Errored is implicitly terminal via ErrorState. + // jsg::Ref is the active state (still reading). 
+ using State = StateMachine, + ErrorState, + ActiveState>, + StreamStates::Closed, + StreamStates::Errored, + jsg::Ref>; + State state; uint64_t limit; kj::Vector parts; uint64_t runningTotal = 0; @@ -2809,7 +2836,7 @@ class AllReader { auto onSuccess = [this, &readable]( jsg::Lock& js, ReadResult result) -> jsg::Promise { if (result.done) { - state.template init(); + state.template transitionTo(); return loop(js); } @@ -2819,7 +2846,7 @@ class AllReader { if (!handle->IsArrayBufferView() && !handle->IsArrayBuffer()) { auto error = js.v8TypeError("This ReadableStream did not return bytes."); auto rs = kj::mv(readable); - state.template init(js.v8Ref(error)); + state.template transitionTo(js.v8Ref(error)); return rs->getController().cancel(js, error).then( js, [&](jsg::Lock& js) { return loop(js); }); } @@ -2834,7 +2861,7 @@ class AllReader { if ((runningTotal + bufferSource.size()) > limit) { auto error = js.v8TypeError("Memory limit exceeded before EOF."); auto rs = kj::mv(readable); - state.template init(js.v8Ref(error)); + state.template transitionTo(js.v8Ref(error)); return rs->getController().cancel(js, error).then( js, [&](jsg::Lock& js) { return loop(js); }); } @@ -2846,7 +2873,7 @@ class AllReader { auto onFailure = [this](auto& js, jsg::Value exception) -> jsg::Promise { // In this case the stream should already be errored. 
- state.template init(js.v8Ref(exception.getHandle(js))); + state.template transitionTo(js.v8Ref(exception.getHandle(js))); return loop(js); }; @@ -2870,7 +2897,7 @@ class PumpToReader { public: PumpToReader(jsg::Ref stream, kj::Own sink, bool end) : ioContext(IoContext::current()), - state(kj::mv(stream)), + state(State::create>(kj::mv(stream))), sink(kj::mv(sink)), self(kj::refcounted>(kj::Badge{}, *this)), end(end) {} @@ -2887,7 +2914,7 @@ class PumpToReader { KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(stream, jsg::Ref) { auto readable = stream.addRef(); - state.template init(); + state.template transitionTo(); // Ownership of readable passes into the pump loop... // Ownership of the sink remains with the PumpToReader... // The JS Promise loop uses an IoOwn wrapping a weak ref to the PumpToReader... @@ -2916,16 +2943,29 @@ class PumpToReader { } private: - struct Pumping {}; + struct Pumping { + static constexpr kj::StringPtr NAME KJ_UNUSED = "pumping"_kj; + }; IoContext& ioContext; - kj::OneOf> state; + + // State machine for PumpToReader: + // Closed and kj::Exception are terminal states (pump is done). + // jsg::Ref is the initial state (has stream to pump). + // Pumping is the active state (pump is in progress). + using State = StateMachine, + ErrorState, + Pumping, + StreamStates::Closed, + kj::Exception, + jsg::Ref>; + State state; kj::Own sink; kj::Own> self; kj::Canceler canceler; bool end; bool isErroredOrClosed() { - return state.template is() || state.template is(); + return state.isTerminal(); } jsg::Promise pumpLoop(jsg::Lock& js, @@ -3018,14 +3058,11 @@ class PumpToReader { if (byteStream) { jsg::BackingStore backing = bufferSource.detach(js); return backing.asArrayPtr().attach(kj::mv(backing)); - } else { - // We do not detach in this case because, as bad as an idea as it is, - // the stream spec does allow a single typedarray/arraybuffer instance - // to be queued multiple times when using value-oriented streams. 
- return bufferSource.asArrayPtr().attach(kj::mv(bufferSource)); } - - KJ_UNREACHABLE; + // We do not detach in this case because, as bad as an idea as it is, + // the stream spec does allow a single typedarray/arraybuffer instance + // to be queued multiple times when using value-oriented streams. + return bufferSource.asArrayPtr().attach(kj::mv(bufferSource)); }), [](auto& js, jsg::Value exception) mutable -> Result { return kj::mv(exception); }) .then(js, ioContext.addFunctor( JSG_VISITABLE_LAMBDA((readable = kj::mv(readable), pumpToReader = kj::mv(pumpToReader)), (readable), (jsg::Lock & js, Result result) mutable { @@ -3066,7 +3103,8 @@ class PumpToReader { // the PumpToReader is still alive. KJ_IF_SOME(exception, maybeException) { if (!reader.isErroredOrClosed()) { - reader.state.init(js.exceptionToKj(kj::mv(exception))); + reader.state.transitionTo( + js.exceptionToKj(kj::mv(exception))); } } else { // Else block to avert dangling else compiler warning. @@ -3090,7 +3128,7 @@ class PumpToReader { // If we got here, the read signaled that we're done. Close the reader and // pump one more time to shut things down. if (!reader.isErroredOrClosed()) { - reader.state.init(); + reader.state.transitionTo(); } } KJ_CASE_ONEOF(exception, jsg::Value) { @@ -3098,7 +3136,7 @@ class PumpToReader { // provided something other than bytes. Error the reader and pump one more // time to shut things down. if (!reader.isErroredOrClosed()) { - reader.state.init(js.exceptionToKj(kj::mv(exception))); + reader.state.transitionTo(js.exceptionToKj(kj::mv(exception))); } } } @@ -3181,6 +3219,15 @@ jsg::Promise ReadableStreamJsController::readAll(jsg::Lock& js, uint64_t limi }; KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream not yet set up, treat as closed. 
+ if constexpr (kj::isSameType()) { + auto backing = jsg::BackingStore::alloc(js, 0); + return js.resolvedPromise(jsg::BufferSource(js, kj::mv(backing))); + } else { + return js.resolvedPromise(T()); + } + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { if constexpr (kj::isSameType()) { auto backing = jsg::BackingStore::alloc(js, 0); @@ -3215,7 +3262,7 @@ kj::Own ReadableStreamJsController::detach( jsg::Lock& js, bool ignored /* unused */) { KJ_ASSERT(!isLockedToReader()); KJ_ASSERT(!isDisturbed()); - KJ_ASSERT(!isReadPending(), "Unable to detach with read pending"); + KJ_ASSERT(!state.hasOperationInProgress(), "Unable to detach with read pending"); auto controller = kj::heap(); controller->expectedLength = expectedLength; disturbed = true; @@ -3223,23 +3270,28 @@ kj::Own ReadableStreamJsController::detach( // Clones this streams state into a new ReadableStreamController, leaving this stream // locked, disturbed, and closed. + // The controller starts in Initial state by default, so we can use regular transitionTo. 
KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Still in initial state, transition to closed + controller->state.transitionTo(); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { - controller->state.init(); + controller->state.transitionTo(); } KJ_CASE_ONEOF(errored, StreamStates::Errored) { - controller->state.init(errored.addRef(js)); + controller->state.transitionTo(errored.addRef(js)); } KJ_CASE_ONEOF(readable, kj::Own) { KJ_ASSERT(lock.lock()); - controller->state = readable->clone(js, *controller); - state.init(); + controller->state.transitionTo>(readable->clone(js, *controller)); + state.transitionTo(); lock.onClose(js); } KJ_CASE_ONEOF(readable, kj::Own) { KJ_ASSERT(lock.lock()); - controller->state = readable->clone(js, *controller); - state.init(); + controller->state.transitionTo>(readable->clone(js, *controller)); + state.transitionTo(); lock.onClose(js); } } @@ -3270,6 +3322,10 @@ kj::Promise> ReadableStreamJsController::pumpTo( }; KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream not yet set up, treat as closed. 
+ return addNoopDeferredProxy(sink->end().attach(kj::mv(sink))); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return addNoopDeferredProxy(sink->end().attach(kj::mv(sink))); } @@ -3325,7 +3381,7 @@ jsg::Ref WritableStreamDefaultController::getSignal() { } kj::Maybe> WritableStreamDefaultController::isErroring(jsg::Lock& js) { - KJ_IF_SOME(erroring, impl.state.tryGet()) { + KJ_IF_SOME(erroring, impl.state.tryGetUnsafe()) { return erroring.reason.getHandle(js); } return kj::none; @@ -3359,11 +3415,12 @@ WritableStreamJsController::WritableStreamJsController(): ioContext(tryGetIoCont WritableStreamJsController::~WritableStreamJsController() noexcept(false) { // Clear algorithms to break circular references during destruction - KJ_IF_SOME(controller, state.tryGet()) { + KJ_IF_SOME(controller, state.tryGetUnsafe()) { controller->clearAlgorithms(); } - // Clear the state to break the circular reference to the controller - state = StreamStates::Closed(); + // Clear the state to break the circular reference to the controller. + // During destruction, we force the transition since the current state doesn't matter. 
+ state.forceTransitionTo(); // Clear owner reference owner = kj::none; // Clear any pending abort promise @@ -3371,12 +3428,14 @@ WritableStreamJsController::~WritableStreamJsController() noexcept(false) { } WritableStreamJsController::WritableStreamJsController(StreamStates::Closed closed) - : ioContext(tryGetIoContext()), - state(closed) {} + : ioContext(tryGetIoContext()) { + state.transitionTo(); +} WritableStreamJsController::WritableStreamJsController(StreamStates::Errored errored) - : ioContext(tryGetIoContext()), - state(kj::mv(errored)) {} + : ioContext(tryGetIoContext()) { + state.transitionTo(kj::mv(errored)); +} jsg::Promise WritableStreamJsController::abort( jsg::Lock& js, jsg::Optional> reason) { @@ -3387,6 +3446,11 @@ jsg::Promise WritableStreamJsController::abort( return abortPromise.whenResolved(js); } KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + // Stream hasn't been set up yet - treat like closed for abort purposes + maybeAbortPromise = js.resolvedPromise(); + return KJ_ASSERT_NONNULL(maybeAbortPromise).whenResolved(js); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { maybeAbortPromise = js.resolvedPromise(); return KJ_ASSERT_NONNULL(maybeAbortPromise).whenResolved(js); @@ -3413,11 +3477,15 @@ bool WritableStreamJsController::isClosedOrClosing() { } bool WritableStreamJsController::isErrored() { - return state.is(); + return state.isErrored(); } jsg::Promise WritableStreamJsController::close(jsg::Lock& js, bool markAsHandled) { KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return rejectedMaybeHandledPromise( + js, js.v8TypeError("This WritableStream has been closed."_kj), markAsHandled); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return rejectedMaybeHandledPromise( js, js.v8TypeError("This WritableStream has been closed."_kj), markAsHandled); @@ -3437,37 +3505,54 @@ jsg::Promise WritableStreamJsController::close(jsg::Lock& js, bool markAsH } void WritableStreamJsController::doClose(jsg::Lock& js) { + 
// If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + // Clear algorithms to break circular references before changing state - KJ_IF_SOME(controller, state.tryGet()) { + KJ_IF_SOME(controller, state.tryGetUnsafe()) { controller->clearAlgorithms(); } - state.init(); - KJ_IF_SOME(locked, lock.state.tryGet()) { + state.transitionTo(); + KJ_IF_SOME(locked, lock.state.tryGetUnsafe()) { maybeResolvePromise(js, locked.getClosedFulfiller()); maybeResolvePromise(js, locked.getReadyFulfiller()); - } else if (lock.state.tryGet() != kj::none) { - lock.state.init(); + } else { + (void)lock.state.transitionFromTo(); } } void WritableStreamJsController::doError(jsg::Lock& js, v8::Local reason) { + // If already in a terminal state, nothing to do. + if (state.isTerminal()) return; + // Clear algorithms to break circular references before changing state - KJ_IF_SOME(controller, state.tryGet()) { + KJ_IF_SOME(controller, state.tryGetUnsafe()) { controller->clearAlgorithms(); } - state.init(js.v8Ref(reason)); - KJ_IF_SOME(locked, lock.state.tryGet()) { + state.transitionTo(js.v8Ref(reason)); + KJ_IF_SOME(locked, lock.state.tryGetUnsafe()) { maybeRejectPromise(js, locked.getClosedFulfiller(), reason); maybeResolvePromise(js, locked.getReadyFulfiller()); - } else if (lock.state.tryGet() != kj::none) { - lock.state.init(); + } else KJ_IF_SOME(pipeLocked, lock.state.tryGetUnsafe()) { + // When the writable side of a pipe errors, we need to release the source stream. + // The pipeLoop may be waiting on a read from the source that will never complete, + // so we need to proactively release the source here. 
+ if (!pipeLocked.flags.preventCancel) { + pipeLocked.source.release(js, reason); + } else { + pipeLocked.source.release(js); + } + lock.state.transitionTo(); } } kj::Maybe WritableStreamJsController::getDesiredSize() { KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return 0; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return 0; } @@ -3482,7 +3567,7 @@ kj::Maybe WritableStreamJsController::getDesiredSize() { } kj::Maybe> WritableStreamJsController::isErroring(jsg::Lock& js) { - KJ_IF_SOME(controller, state.tryGet()) { + KJ_IF_SOME(controller, state.tryGetUnsafe()) { return controller->isErroring(js); } return kj::none; @@ -3493,7 +3578,7 @@ bool WritableStreamDefaultController::isErroring() const { } kj::Maybe> WritableStreamJsController::isErroredOrErroring(jsg::Lock& js) { - KJ_IF_SOME(err, state.tryGet()) { + KJ_IF_SOME(err, state.tryGetErrorUnsafe()) { return err.getHandle(js); } return isErroring(js); @@ -3501,6 +3586,9 @@ kj::Maybe> WritableStreamJsController::isErroredOrErroring( bool WritableStreamJsController::isStarted() { KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return false; + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return true; } @@ -3528,7 +3616,7 @@ bool WritableStreamJsController::lockWriter(jsg::Lock& js, Writer& writer) { void WritableStreamJsController::maybeRejectReadyPromise( jsg::Lock& js, v8::Local reason) { - KJ_IF_SOME(writerLock, lock.state.tryGet()) { + KJ_IF_SOME(writerLock, lock.state.tryGetUnsafe()) { if (writerLock.getReadyFulfiller() != kj::none) { maybeRejectPromise(js, writerLock.getReadyFulfiller(), reason); } else { @@ -3541,7 +3629,7 @@ void WritableStreamJsController::maybeRejectReadyPromise( } void WritableStreamJsController::maybeResolveReadyPromise(jsg::Lock& js) { - KJ_IF_SOME(writerLock, lock.state.tryGet()) { + KJ_IF_SOME(writerLock, lock.state.tryGetUnsafe()) { maybeResolvePromise(js, writerLock.getReadyFulfiller()); } } @@ -3576,10 +3664,12 @@ void 
WritableStreamJsController::setup(jsg::Lock& js, // We account for the memory usage of the WritableStreamDefaultController and AbortSignal together // because their lifetimes are identical and memory accounting itself has a memory overhead. - state = js.allocAccounted( + auto controller = js.allocAccounted( sizeof(WritableStreamDefaultController) + sizeof(AbortSignal), js, KJ_ASSERT_NONNULL(owner), js.alloc()); - state.get()->setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy)); + auto& controllerRef = *controller; + state.transitionTo(kj::mv(controller)); + controllerRef.setup(js, kj::mv(underlyingSink), kj::mv(queuingStrategy)); } kj::Maybe> WritableStreamJsController::tryPipeFrom( @@ -3636,7 +3726,7 @@ jsg::Promise WritableStreamJsController::pipeLoop(jsg::Lock& js) { return rejectedMaybeHandledPromise(js, errored, pipeThrough); } - KJ_IF_SOME(errored, state.tryGet()) { + KJ_IF_SOME(errored, state.tryGetUnsafe()) { lock.releasePipeLock(); auto reason = errored.getHandle(js); if (!preventCancel) { @@ -3692,8 +3782,8 @@ jsg::Promise WritableStreamJsController::pipeLoop(jsg::Lock& js) { // source (again, depending on options). If the write operation is successful, // we call pipeLoop again to move on to the next iteration. 
- auto onSuccess = JSG_VISITABLE_LAMBDA((this, ref = addRef(), preventCancel, pipeThrough, &source), - (ref), (jsg::Lock & js, ReadResult result)->jsg::Promise { + auto onSuccess = JSG_VISITABLE_LAMBDA((this, ref = addRef(), preventCancel, pipeThrough), (ref), + (jsg::Lock & js, ReadResult result)->jsg::Promise { auto maybePipeLock = lock.tryGetPipe(); if (maybePipeLock == kj::none) return js.resolvedPromise(); auto& pipeLock = KJ_REQUIRE_NONNULL(maybePipeLock); @@ -3715,15 +3805,17 @@ jsg::Promise WritableStreamJsController::pipeLoop(jsg::Lock& js) { } ); auto onFailure = JSG_VISITABLE_LAMBDA( - (ref=addRef(),&source, preventCancel, pipeThrough), + (this, ref=addRef(), preventCancel, pipeThrough), (ref) , (jsg::Lock& js, jsg::Value value) { - // The write failed. We handle it here because the pipe lock will have been released. + // The write failed. We need to release the source if the pipe lock still exists. auto reason = value.getHandle(js); - if (!preventCancel) { - source.release(js, reason); - } else { - source.release(js); - } + KJ_IF_SOME(pipeLock, lock.tryGetPipe()) { + if (!preventCancel) { + pipeLock.source.release(js, reason); + } else { + pipeLock.source.release(js); + } + } else {} // Trailing else() to squash compiler warning return rejectedMaybeHandledPromise(js, reason, pipeThrough); } ); @@ -3743,7 +3835,7 @@ jsg::Promise WritableStreamJsController::pipeLoop(jsg::Lock& js) { } void WritableStreamJsController::updateBackpressure(jsg::Lock& js, bool backpressure) { - KJ_IF_SOME(writerLock, lock.state.tryGet()) { + KJ_IF_SOME(writerLock, lock.state.tryGetUnsafe()) { if (backpressure) { // Per the spec, when backpressure is updated and is true, we replace the existing // ready promise on the writer with a new pending promise, regardless of whether @@ -3761,6 +3853,9 @@ void WritableStreamJsController::updateBackpressure(jsg::Lock& js, bool backpres jsg::Promise WritableStreamJsController::write( jsg::Lock& js, jsg::Optional> value) { 
KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) { + return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); + } KJ_CASE_ONEOF(closed, StreamStates::Closed) { return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); } @@ -3775,15 +3870,7 @@ jsg::Promise WritableStreamJsController::write( } void WritableStreamJsController::visitForGc(jsg::GcVisitor& visitor) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) {} - KJ_CASE_ONEOF(error, StreamStates::Errored) { - visitor.visit(error); - } - KJ_CASE_ONEOF(controller, Controller) { - visitor.visit(controller); - } - } + state.visitForGc(visitor); visitor.visit(maybeAbortPromise, lock); } @@ -4125,6 +4212,7 @@ size_t WritableStreamJsController::jsgGetMemorySelfSize() const { void WritableStreamJsController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) {} KJ_CASE_ONEOF(closed, StreamStates::Closed) {} KJ_CASE_ONEOF(error, StreamStates::Errored) { tracker.trackField("error", error); @@ -4151,6 +4239,7 @@ size_t ReadableStreamJsController::jsgGetMemorySelfSize() const { void ReadableStreamJsController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const { KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(initial, Initial) {} KJ_CASE_ONEOF(closed, StreamStates::Closed) {} KJ_CASE_ONEOF(error, StreamStates::Errored) { tracker.trackField("error", error); @@ -4165,13 +4254,9 @@ void ReadableStreamJsController::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) c tracker.trackField("lock", lock); - KJ_IF_SOME(pendingState, maybePendingState) { - KJ_SWITCH_ONEOF(pendingState) { - KJ_CASE_ONEOF(closed, StreamStates::Closed) {} - KJ_CASE_ONEOF(error, StreamStates::Errored) { - tracker.trackField("pendingError", error); - } - } + // Track pending error state if present (Closed has no trackable content) + KJ_IF_SOME(pendingError, state.tryGetPendingStateUnsafe()) { + 
tracker.trackField("pendingError", pendingError); } } diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index 8f99803ab7b..d010b412211 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -9,6 +9,7 @@ #include #include +#include #include namespace workerd::api { @@ -218,7 +219,18 @@ class ReadableImpl { using Queue = typename Self::QueueType; - kj::OneOf state; + // State machine for ReadableImpl: + // Queue is the active state where the stream can accept data + // Closed and Errored are terminal states (cannot transition back to Queue) + // Queue -> Closed (close() or doCancel() called) + // Queue -> Errored (doError() called) + using State = StateMachine, + ErrorState, + ActiveState, + StreamStates::Closed, + StreamStates::Errored, + Queue>; + State state; Algorithms algorithms; size_t highWaterMark = 1; @@ -357,7 +369,25 @@ class WritableImpl { } }; - struct Writable {}; + struct Writable { + static constexpr kj::StringPtr NAME KJ_UNUSED = "writable"_kj; + }; + + // State machine for WritableImpl: + // Writable is the active state where the stream can accept writes + // Erroring is a transitional state - waiting for in-flight ops before erroring + // Closed and Errored are terminal states + // Writable -> Erroring (startErroring() called) + // Writable -> Closed (finishInFlightClose() succeeds) + // Erroring -> Errored (finishErroring() called) + // Erroring -> Closed (finishInFlightClose() succeeds - close wins) + using State = StateMachine, + ErrorState, + ActiveState, + StreamStates::Closed, + StreamStates::Errored, + StreamStates::Erroring, + Writable>; // Sadly, we have to use a weak ref here rather than jsg::Ref. This is because // the jsg::Ref (via its internal WritableStreamJsController) @@ -367,8 +397,7 @@ class WritableImpl { // try tracing each other. 
kj::Maybe>> owner; jsg::Ref signal; - kj::OneOf state = - Writable(); + State state = State::template create(); Algorithms algorithms; size_t highWaterMark = 1; diff --git a/src/workerd/api/streams/streams-test.js b/src/workerd/api/streams/streams-test.js index 39385078218..af52994d3a1 100644 --- a/src/workerd/api/streams/streams-test.js +++ b/src/workerd/api/streams/streams-test.js @@ -285,3 +285,227 @@ export const inspect = { } }, }; + +// Test for re-entrancy bug: when pushing to multiple consumers (via tee), +// the transform function can directly cancel another consumer synchronously. +// This should not crash - the cancelled consumer should gracefully ignore the push. +// Before the fix, this would crash with: +// "expected state.template tryGet() != nullptr; The consumer is either closed or errored." +// +// This test simulates the production scenario where: +// 1. A TransformStream's readable is tee'd +// 2. The transform function synchronously cancels one of the tee branches +// 3. 
When enqueue is called, the push loop tries to push to the cancelled consumer +export const transformTeeReentrancySynchronousCancel = { + async test() { + let reader2; + let cancelledBranch2 = false; + + // Create a TransformStream whose transform function cancels branch2 + const ts = new TransformStream({ + transform(chunk, controller) { + // First time through, cancel branch2 BEFORE enqueueing + // This simulates the production scenario where user code in the + // transform function affects another consumer + if (!cancelledBranch2 && reader2) { + reader2.cancel('cancelled synchronously in transform'); + cancelledBranch2 = true; + } + controller.enqueue(chunk); + }, + }); + + const writer = ts.writable.getWriter(); + const [branch1, branch2] = ts.readable.tee(); + const reader1 = branch1.getReader(); + reader2 = branch2.getReader(); + + // Start pending reads on both branches + const read1Promise = reader1.read(); + const read2Promise = reader2.read(); + + // Write to the transform - this triggers the transform function which: + // 1. Cancels branch2 (closing/erroring its consumer) + // 2. Calls controller.enqueue() which pushes to all consumers + // Before the fix, step 2 would crash when trying to push to cancelled branch2 + await writer.write('test data'); + + // Verify branch1 got the data + const result1 = await read1Promise; + assert.strictEqual(result1.value, 'test data'); + + // branch2 was cancelled, so its read should complete (done or with data before cancel) + const result2 = await read2Promise; + assert.ok(result2 !== undefined); + + await writer.close(); + await reader1.cancel(); + }, +}; + +// Test with TransformStream to match the production stack trace more closely. 
+// The production bug occurred during: TransformStream → enqueue → QueueImpl::push → consumer iteration +export const transformStreamTeeReentrancy = { + async test() { + const { readable, writable } = new TransformStream(); + const writer = writable.getWriter(); + + const [branch1, branch2] = readable.tee(); + const reader1 = branch1.getReader(); + const reader2 = branch2.getReader(); + + // Start pending reads on both branches + const read1Promise = reader1.read(); + const read2Promise = reader2.read(); + + // When read1 resolves, cancel branch2 + // This simulates the production scenario where a .then() handler + // attached to the read promise cancels another branch + read1Promise.then(() => { + reader2.cancel('cancelled during transform'); + }); + + // Write through the transform - this triggers enqueue on the readable side. + // Before the fix, this would crash when the push loop tried to push to + // the now-cancelled branch2 consumer. + await writer.write('transform data'); + + // Verify read1 succeeded + const result1 = await read1Promise; + assert.strictEqual(result1.value, 'transform data'); + + // read2 may have received data or be done - either is fine + // The important thing is no crash occurred + const result2 = await read2Promise; + assert.ok(result2 !== undefined); + + await writer.close(); + await reader1.cancel(); + }, +}; + +// Test that multiple writes through a tee'd ReadableStream work correctly +// even when one branch is cancelled mid-stream +export const teeWithCancelMidStream = { + async test() { + let controller; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + const [branch1, branch2] = stream.tee(); + const reader1 = branch1.getReader(); + const reader2 = branch2.getReader(); + + // Start reads on both branches + let read1Promise = reader1.read(); + let read2Promise = reader2.read(); + + // Enqueue first chunk - both branches should get it + controller.enqueue('chunk1'); + const r1a = await 
read1Promise; + const r2a = await read2Promise; + assert.strictEqual(r1a.value, 'chunk1'); + assert.strictEqual(r2a.value, 'chunk1'); + + // Now cancel branch2 + await reader2.cancel('done with branch2'); + + // Start another read on branch1 + read1Promise = reader1.read(); + + // Enqueue second chunk - only branch1 should get it + // This should not crash even though branch2's consumer is now closed + controller.enqueue('chunk2'); + const r1b = await read1Promise; + assert.strictEqual(r1b.value, 'chunk2'); + + // Start another read and enqueue third chunk to confirm continued operation + read1Promise = reader1.read(); + controller.enqueue('chunk3'); + const r1c = await read1Promise; + assert.strictEqual(r1c.value, 'chunk3'); + + // Close and verify + read1Promise = reader1.read(); + controller.close(); + const r1d = await read1Promise; + assert.strictEqual(r1d.done, true); + }, +}; + +// ============================================================================ + +export const testCancelPipethrough = { + async test() { + const enc = new TextEncoder(); + const transform = new IdentityTransformStream(); + const rs = new ReadableStream({ + start(c) { + c.enqueue(enc.encode('hello')); + }, + }); + const readable = rs.pipeThrough(transform); + + const reader = readable.getReader(); + + assert.ok(rs.locked); + assert.ok(transform.writable.locked); + + reader.cancel(new Error('boom')); + reader.releaseLock(); + + // We've got to wait a tick to allow the cancel to propagate + await scheduler.wait(1); + + assert.ok(!rs.locked); + assert.ok(!transform.readable.locked); + assert.ok(!transform.writable.locked); + + // Our JavaScript ReadableStream should be closed (not errored). + // Cancel propagates back and closes the source stream. 
+ const reader2 = rs.getReader(); + const result = await reader2.read(); + assert.ok(result.done); + assert.strictEqual(result.value, undefined); + }, +}; + +// Same as testCancelPipethrough but uses a JavaScript-backed TransformStream +// instead of IdentityTransformStream. The behavior should be the same. +export const testCancelPipethrough2 = { + async test() { + const enc = new TextEncoder(); + const transform = new TransformStream(); + const rs = new ReadableStream({ + start(c) { + c.enqueue(enc.encode('hello')); + }, + }); + const readable = rs.pipeThrough(transform); + + const reader = readable.getReader(); + + assert.ok(rs.locked); + assert.ok(transform.writable.locked); + + reader.cancel(new Error('boom')); + reader.releaseLock(); + + // We've got to wait a tick to allow the cancel to propagate + await scheduler.wait(1); + + assert.ok(!rs.locked); + assert.ok(!transform.readable.locked); + assert.ok(!transform.writable.locked); + + // Our JavaScript ReadableStream should be closed (not errored). + // Cancel propagates back and closes the source stream. 
+ const reader2 = rs.getReader(); + const result = await reader2.read(); + assert.ok(result.done); + assert.strictEqual(result.value, undefined); + }, +}; diff --git a/src/workerd/api/streams/streams-test.wd-test b/src/workerd/api/streams/streams-test.wd-test index c16f026bef7..4fe8fd21b48 100644 --- a/src/workerd/api/streams/streams-test.wd-test +++ b/src/workerd/api/streams/streams-test.wd-test @@ -7,7 +7,7 @@ const unitTests :Workerd.Config = ( modules = [ (name = "worker", esModule = embed "streams-test.js") ], - compatibilityFlags = ["nodejs_compat", "streams_enable_constructors", "workers_api_getters_setters_on_prototype"], + compatibilityFlags = ["nodejs_compat", "streams_enable_constructors", "transformstream_enable_standard_constructor", "workers_api_getters_setters_on_prototype"], bindings = [ ( name = "KV", kvNamespace = "kv" ) ], ) ), diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 524389df06c..d0d8eaa4d7b 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -10,11 +10,13 @@ namespace workerd::api { -WritableStreamDefaultWriter::WritableStreamDefaultWriter(): ioContext(tryGetIoContext()) {} +WritableStreamDefaultWriter::WritableStreamDefaultWriter() + : ioContext(tryGetIoContext()), + state(WriterState::create()) {} WritableStreamDefaultWriter::~WritableStreamDefaultWriter() noexcept(false) { - KJ_IF_SOME(stream, state.tryGet()) { - stream->getController().releaseWriter(*this, kj::none); + KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) { + attached.stream->getController().releaseWriter(*this, kj::none); } } @@ -29,27 +31,21 @@ jsg::Ref WritableStreamDefaultWriter::constructor( jsg::Promise WritableStreamDefaultWriter::abort( jsg::Lock& js, jsg::Optional> reason) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - KJ_FAIL_ASSERT("this writer was never attached"); - } - KJ_CASE_ONEOF(stream, Attached) { - // In some edge cases, this writer is the last 
thing holding a strong - // reference to the stream. Calling abort can cause the writers strong - // reference to be cleared, so let's make sure we keep a reference to - // the stream at least until the call to abort completes. - auto ref = stream.addRef(); - return stream->getController().abort(js, reason); - } - KJ_CASE_ONEOF(r, Released) { - return js.rejectedPromise( - js.v8TypeError("This WritableStream writer has been released."_kj)); - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - return js.resolvedPromise(); - } + assertAttachedOrTerminal(); + if (state.is()) { + return js.rejectedPromise( + js.v8TypeError("This WritableStream writer has been released."_kj)); } - KJ_UNREACHABLE; + if (state.is()) { + return js.resolvedPromise(); + } + auto& attached = state.requireActiveUnsafe(); + // In some edge cases, this writer is the last thing holding a strong + // reference to the stream. Calling abort can cause the writers strong + // reference to be cleared, so let's make sure we keep a reference to + // the stream at least until the call to abort completes. + auto ref = attached.stream.addRef(); + return attached.stream->getController().abort(js, reason); } void WritableStreamDefaultWriter::attach(jsg::Lock& js, @@ -57,55 +53,35 @@ void WritableStreamDefaultWriter::attach(jsg::Lock& js, jsg::Promise closedPromise, jsg::Promise readyPromise) { KJ_ASSERT(state.is()); - state = controller.addRef(); + state.transitionTo(controller.addRef()); this->closedPromise = kj::mv(closedPromise); replaceReadyPromise(js, kj::mv(readyPromise)); } jsg::Promise WritableStreamDefaultWriter::close(jsg::Lock& js) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - KJ_FAIL_ASSERT("this writer was never attached"); - } - KJ_CASE_ONEOF(stream, Attached) { - // In some edge cases, this writer is the last thing holding a strong - // reference to the stream. 
Calling close can cause the writers strong - // reference to be cleared, so let's make sure we keep a reference to - // the stream at least until the call to close completes. - auto ref = stream.addRef(); - return stream->getController().close(js); - } - KJ_CASE_ONEOF(r, Released) { - return js.rejectedPromise( - js.v8TypeError("This WritableStream writer has been released."_kj)); - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); - } + assertAttachedOrTerminal(); + if (state.is()) { + return js.rejectedPromise( + js.v8TypeError("This WritableStream writer has been released."_kj)); } - KJ_UNREACHABLE; + if (state.is()) { + return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); + } + auto& attached = state.requireActiveUnsafe(); + // In some edge cases, this writer is the last thing holding a strong + // reference to the stream. Calling close can cause the writers strong + // reference to be cleared, so let's make sure we keep a reference to + // the stream at least until the call to close completes. + auto ref = attached.stream.addRef(); + return attached.stream->getController().close(js); } void WritableStreamDefaultWriter::detach() { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - // Do nothing in this case. - return; - } - KJ_CASE_ONEOF(stream, Attached) { - state.init(); - return; - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - // Do nothing in this case. - return; - } - KJ_CASE_ONEOF(r, Released) { - // Do nothing in this case. - return; - } + // Only transition from Attached to Closed. + // All other states (Initial, Closed, Released) are no-ops. 
+ if (state.isActive()) { + state.transitionTo(); } - KJ_UNREACHABLE; } jsg::MemoizedIdentity>& WritableStreamDefaultWriter::getClosed() { @@ -113,21 +89,15 @@ jsg::MemoizedIdentity>& WritableStreamDefaultWriter::getClose } kj::Maybe WritableStreamDefaultWriter::getDesiredSize() { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - KJ_FAIL_ASSERT("this writer was never attached"); - } - KJ_CASE_ONEOF(stream, Attached) { - return stream->getController().getDesiredSize(); - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - return 0; - } - KJ_CASE_ONEOF(r, Released) { - JSG_FAIL_REQUIRE(TypeError, "This WritableStream writer has been released."); - } + assertAttachedOrTerminal(); + if (state.is()) { + JSG_FAIL_REQUIRE(TypeError, "This WritableStream writer has been released."); + } + if (state.is()) { + return 0; } - KJ_UNREACHABLE; + auto& attached = state.requireActiveUnsafe(); + return attached.stream->getController().getDesiredSize(); } jsg::MemoizedIdentity>& WritableStreamDefaultWriter::getReady() { @@ -145,30 +115,17 @@ void WritableStreamDefaultWriter::lockToStream(jsg::Lock& js, WritableStream& st void WritableStreamDefaultWriter::releaseLock(jsg::Lock& js) { // TODO(soon): Releasing the lock should cancel any pending writes. - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - KJ_FAIL_ASSERT("this writer was never attached"); - } - KJ_CASE_ONEOF(stream, Attached) { - // In some edge cases, this writer is the last thing holding a strong - // reference to the stream. Calling releaseWriter can cause the writers - // strong reference to be cleared, so let's make sure we keep a reference - // to the stream at least until the call to releaseLock completes. 
- auto ref = stream.addRef(); - stream->getController().releaseWriter(*this, js); - state.init(); - return; - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - // Do nothing in this case - return; - } - KJ_CASE_ONEOF(r, Released) { - // Do nothing in this case - return; - } + assertAttachedOrTerminal(); + // Closed and Released states are no-ops. + KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) { + // In some edge cases, this writer is the last thing holding a strong + // reference to the stream. Calling releaseWriter can cause the writers + // strong reference to be cleared, so let's make sure we keep a reference + // to the stream at least until the call to releaseLock completes. + auto ref = attached.stream.addRef(); + attached.stream->getController().releaseWriter(*this, js); + state.transitionTo(); } - KJ_UNREACHABLE; } void WritableStreamDefaultWriter::replaceReadyPromise( @@ -179,22 +136,16 @@ void WritableStreamDefaultWriter::replaceReadyPromise( jsg::Promise WritableStreamDefaultWriter::write( jsg::Lock& js, jsg::Optional> chunk) { - KJ_SWITCH_ONEOF(state) { - KJ_CASE_ONEOF(i, Initial) { - KJ_FAIL_ASSERT("this writer was never attached"); - } - KJ_CASE_ONEOF(stream, Attached) { - return stream->getController().write(js, kj::mv(chunk)); - } - KJ_CASE_ONEOF(r, Released) { - return js.rejectedPromise( - js.v8TypeError("This WritableStream writer has been released."_kj)); - } - KJ_CASE_ONEOF(c, StreamStates::Closed) { - return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); - } + assertAttachedOrTerminal(); + if (state.is()) { + return js.rejectedPromise( + js.v8TypeError("This WritableStream writer has been released."_kj)); + } + if (state.is()) { + return js.rejectedPromise(js.v8TypeError("This WritableStream has been closed."_kj)); } - KJ_UNREACHABLE; + auto& attached = state.requireActiveUnsafe(); + return attached.stream->getController().write(js, chunk); } jsg::JsString WritableStream::inspectState(jsg::Lock& js) { @@ -214,8 
+165,8 @@ bool WritableStream::inspectExpectsBytes() { } void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) { - KJ_IF_SOME(writable, state.tryGet()) { - visitor.visit(writable); + KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) { + visitor.visit(attached.stream); } visitor.visit(closedPromise, readyPromise); } @@ -632,8 +583,8 @@ jsg::Ref WritableStream::deserialize( } void WritableStreamDefaultWriter::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { - KJ_IF_SOME(ref, state.tryGet()) { - tracker.trackField("attached", ref); + KJ_IF_SOME(attached, state.tryGetActiveUnsafe()) { + tracker.trackField("attached", attached.stream); } tracker.trackField("closedPromise", closedPromise); tracker.trackField("readyPromise", readyPromise); diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index bdf763ba7ae..6db12ef0257 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -6,6 +6,7 @@ #include "common.h" +#include #include namespace workerd::api { @@ -80,7 +81,9 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamCont kj::Maybe> isReady(jsg::Lock& js); private: - struct Initial {}; + struct Initial { + static constexpr kj::StringPtr NAME KJ_UNUSED = "initial"_kj; + }; // While a Writer is attached to a WritableStream, it holds a strong reference to the // WritableStream to prevent it from being GC'ed so long as the Writer is available. // Once the writer is closed, released, or GC'ed the reference to the WritableStream @@ -88,11 +91,40 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamCont // it being held anywhere. If the writer is still attached to the WritableStream when // it is destroyed, the WritableStream's reference to the writer is cleared but the // WritableStream remains in the "writer locked" state, per the spec. 
- using Attached = jsg::Ref; - struct Released {}; + struct Attached { + static constexpr kj::StringPtr NAME KJ_UNUSED = "attached"_kj; + jsg::Ref stream; + }; + // Released: The user explicitly called releaseLock() to detach the writer from the stream. + // The stream remains usable and can be locked by a new writer. + struct Released { + static constexpr kj::StringPtr NAME KJ_UNUSED = "released"_kj; + }; + // Closed: The underlying stream ended (closed or errored) while the writer was attached. + // The stream is no longer usable. + struct Closed { + static constexpr kj::StringPtr NAME KJ_UNUSED = "closed"_kj; + }; + + // State machine for WritableStreamDefaultWriter: + // Initial -> Attached (attach() called) + // Attached -> Closed (detach() called when stream closes) + // Attached -> Released (releaseLock() called) + // Closed and Released are terminal states. + // Initial is not terminal but most methods assert if called in this state. + using WriterState = StateMachine, + ActiveState, + Initial, + Attached, + Closed, + Released>; kj::Maybe ioContext; - kj::OneOf state = Initial(); + WriterState state; + + inline void assertAttachedOrTerminal() const { + KJ_ASSERT(!state.is(), "this writer was never attached"); + } kj::Maybe>> closedPromise; kj::Maybe>> readyPromise; diff --git a/src/workerd/api/tests/pipe-streams-test.js b/src/workerd/api/tests/pipe-streams-test.js index af103b7ca3e..393517c9e45 100644 --- a/src/workerd/api/tests/pipe-streams-test.js +++ b/src/workerd/api/tests/pipe-streams-test.js @@ -189,9 +189,12 @@ export const pipeThroughJsToInternalErroredDest = { ok(!transform.readable.locked); ok(!transform.writable.locked); - // Our JavaScript ReadableStream should no longer be usable. + // Our JavaScript ReadableStream should be closed (not errored). + // Cancel propagates back and closes the source stream. 
     const reader2 = rs.getReader();
-    await rejects(reader2.read(), { message: 'boom' });
+    const result = await reader2.read();
+    ok(result.done);
+    strictEqual(result.value, undefined);
   },
 };
 
@@ -221,9 +224,12 @@ export const pipeToJsToInternalErroredDest = {
     ok(!readable.locked);
     ok(!writable.locked);
 
-    // Our JavaScript ReadableStream should no longer be usable.
+    // Our JavaScript ReadableStream should be closed (not errored).
+    // Cancel propagates back and closes the source stream.
     const reader2 = rs.getReader();
-    await rejects(reader2.read(), { message: 'boom' });
+    const result = await reader2.read();
+    ok(result.done);
+    strictEqual(result.value, undefined);
   },
 };
 
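
Reviewer note: the writer transitions documented in the `writable.h` hunk (Initial -> Attached, Attached -> Closed, Attached -> Released, with Closed and Released terminal) can be sketched in isolation as below. This is a minimal illustration built on `std::variant`, not the `StateMachine<>` template from `util/state-machine.h`; `MiniWriter` and all of its members are hypothetical names. It also assumes softer failure behavior than the patch: where the real code uses `KJ_ASSERT` for a writer that was never attached, the sketch returns `false` or does nothing.

```cpp
#include <string>
#include <utility>
#include <variant>

// Hypothetical stand-ins for the writer states declared in writable.h.
struct Initial {};
struct Attached {
  std::string streamName;  // Stands in for the strong reference to the stream.
};
struct Closed {};
struct Released {};

// Sketch only: a std::variant-based approximation of the writer state machine.
class MiniWriter {
 public:
  // Initial -> Attached. Returns false if the writer is in any other state
  // (the real attach() asserts instead).
  bool attach(std::string streamName) {
    if (!std::holds_alternative<Initial>(state_)) return false;
    state_ = Attached{std::move(streamName)};
    return true;
  }

  // Attached -> Closed. Every other state is a no-op, as in detach().
  void detach() {
    if (isAttached()) state_ = Closed{};
  }

  // Attached -> Released. Closed and Released are no-ops; unlike the patch,
  // Initial is also a no-op here rather than an assertion failure.
  void releaseLock() {
    if (isAttached()) state_ = Released{};
  }

  bool isAttached() const { return std::holds_alternative<Attached>(state_); }
  bool isClosed() const { return std::holds_alternative<Closed>(state_); }
  bool isReleased() const { return std::holds_alternative<Released>(state_); }

  // Terminal states never transition again.
  bool isTerminal() const { return isClosed() || isReleased(); }

 private:
  std::variant<Initial, Attached, Closed, Released> state_;  // Starts as Initial.
};
```

Because every mutating method checks `isAttached()` first, a writer that has reached Closed cannot later be moved to Released (or the reverse), which is the property the terminal-state markers in the patch are meant to enforce.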