diff --git a/src/workerd/api/streams/identity-transform-stream.c++ b/src/workerd/api/streams/identity-transform-stream.c++ index 14c0ae9f88c..a6a5fd8ff89 100644 --- a/src/workerd/api/streams/identity-transform-stream.c++ +++ b/src/workerd/api/streams/identity-transform-stream.c++ @@ -2,6 +2,9 @@ #include "common.h" +#include +#include + namespace workerd::api { namespace { @@ -10,6 +13,7 @@ namespace { // // This class is also used as the implementation of FixedLengthStream, in which case `limit` is // non-nullptr. +// TODO(cleanup): Remove the old implementation once the autogate is fully rolled out. class IdentityTransformStreamImpl final: public kj::Refcounted, public ReadableStreamSource, public WritableStreamSink { @@ -84,8 +88,8 @@ class IdentityTransformStreamImpl final: public kj::Refcounted, // HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to // guarantee that we can't create cycles. - JSG_REQUIRE(kj::dynamicDowncastIfAvailable(output) == kj::none, - TypeError, "Inter-TransformStream ReadableStream.pipeTo() is not implemented."); + JSG_REQUIRE(!isIdentityTransformStream(output), TypeError, + "Inter-TransformStream ReadableStream.pipeTo() is not implemented."); return ReadableStreamSource::pumpTo(output, end); } @@ -304,6 +308,336 @@ class IdentityTransformStreamImpl final: public kj::Refcounted, kj::OneOf state = Idle(); }; + +// ======================================================================================= +// V2 implementation using a StateMachine for the internal state management. +struct Idle { + static constexpr kj::StringPtr NAME KJ_UNUSED = "idle"_kj; +}; + +struct ReadRequest { + static constexpr kj::StringPtr NAME KJ_UNUSED = "read-request"_kj; + kj::ArrayPtr bytes; + // WARNING: `bytes` may be invalid if fulfiller->isWaiting() returns false! (This indicates the + // read was canceled.) 
+ kj::Own> fulfiller; +}; + +struct WriteRequest { + static constexpr kj::StringPtr NAME KJ_UNUSED = "write-request"_kj; + kj::ArrayPtr bytes; + kj::Own> fulfiller; +}; + +struct Closed { + static constexpr kj::StringPtr NAME KJ_UNUSED = "closed"_kj; +}; + +// State machine for IdentityTransformStream: +// Idle -> ReadRequest (read arrives when no write pending) +// Idle -> WriteRequest (write arrives when no read pending) +// Idle -> Closed (empty write = close) +// ReadRequest -> Idle (write fulfills read completely) +// WriteRequest -> Idle (read fulfills write completely) +// ReadRequest -> Closed (empty write closes while read pending) +// Any -> kj::Exception (cancel/abort) +// Closed -> kj::Exception (abort can force-transition a closed stream to error) +// Closed is terminal, kj::Exception is implicitly terminal via ErrorState. +// abort() uses forceTransitionTo to allow the exceptional Closed -> Exception transition. +using IdentityTransformState = StateMachine, + ErrorState, + Idle, + ReadRequest, + WriteRequest, + Closed, + kj::Exception>; + +class IdentityTransformStreamImplV2 final: public kj::Refcounted, + public ReadableStreamSource, + public WritableStreamSink { + public: + // The limit is the maximum number of bytes that can be fed through the stream. + // If kj::none, there is no limit. + explicit IdentityTransformStreamImplV2(kj::Maybe limit = kj::none) + : limit(limit), + state(IdentityTransformState::create()) {} + + ~IdentityTransformStreamImplV2() noexcept(false) { + // Due to the different natures of JS and C++ disposal, there is no point in enforcing the limit + // for a FixedLengthStream here. + // + // 1. Creating but not using a `new FixedLengthStream(n)` should not be an error, and ought not + // to logspam us. + // 2. Chances are high that by the time this object gets destroyed, it's too late to tell the + // user about the failure. 
+ } + + // ReadableStreamSource implementation ------------------------------------------------- + + kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + size_t total = 0; + while (total < minBytes) { + // TODO(perf): tryReadInternal was written assuming minBytes would always be 1 but we've now + // introduced an API for users to specify a larger minBytes. For now, this is implemented as a + // naive loop dispatching to the 1 byte version but would be better to bake it deeper into + // the implementation where it can be more efficient. + auto amount = co_await tryReadInternal(buffer, maxBytes); + KJ_ASSERT(amount <= maxBytes); + if (amount == 0) { + // EOF. + break; + } + + total += amount; + buffer = reinterpret_cast(buffer) + amount; + maxBytes -= amount; + } + + co_return total; + } + + kj::Promise tryReadInternal(void* buffer, size_t maxBytes) { + auto promise = readHelper(kj::arrayPtr(static_cast(buffer), maxBytes)); + + KJ_IF_SOME(l, limit) { + promise = promise.then([this, &l = l](size_t amount) -> kj::Promise { + if (amount > l) { + auto exception = JSG_KJ_EXCEPTION( + FAILED, TypeError, "Attempt to write too many bytes through a FixedLengthStream."); + cancel(exception); + return kj::mv(exception); + } else if (amount == 0 && l != 0) { + auto exception = JSG_KJ_EXCEPTION(FAILED, TypeError, + "FixedLengthStream did not see all expected bytes before close()."); + cancel(exception); + return kj::mv(exception); + } + l -= amount; + return amount; + }); + } + + return promise; + } + + kj::Promise> pumpTo(WritableStreamSink& output, bool end) override { +#ifdef KJ_NO_RTTI + // Yes, I'm paranoid. + static_assert(!KJ_NO_RTTI, "Need RTTI for correctness"); +#endif + + // HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to + // guarantee that we can't create cycles.
+ JSG_REQUIRE(!isIdentityTransformStream(output), TypeError, + "Inter-TransformStream ReadableStream.pipeTo() is not implemented."); + + return ReadableStreamSource::pumpTo(output, end); + } + + kj::Maybe tryGetLength(StreamEncoding encoding) override { + if (encoding == StreamEncoding::IDENTITY) { + return limit; + } else { + return kj::none; + } + } + + void cancel(kj::Exception reason) override { + // Already errored - nothing to do. + if (state.isErrored()) return; + + // Already closed by writable side - nothing to do. + if (state.is()) return; + + KJ_IF_SOME(request, state.tryGetUnsafe()) { + request.fulfiller->fulfill(static_cast(0)); + } else KJ_IF_SOME(request, state.tryGetUnsafe()) { + request.fulfiller->reject(kj::cp(reason)); + } + // Idle state is fine, just transition to error. + + state.forceTransitionTo(kj::mv(reason)); + + // TODO(conform): Proactively put WritableStream into Errored state. + } + + // WritableStreamSink implementation --------------------------------------------------- + + kj::Promise write(kj::ArrayPtr buffer) override { + if (buffer == nullptr) { + return kj::READY_NOW; + } + return writeHelper(buffer); + } + + kj::Promise write(kj::ArrayPtr> pieces) override { + KJ_UNIMPLEMENTED("IdentityTransformStreamImpl piecewise write() not currently supported"); + // TODO(soon): This will be called by TeeBranch::pumpTo(). We disallow that anyway, since we + // disallow inter-TransformStream pumping. + } + + kj::Promise end() override { + // If we're already closed, there's nothing else we need to do here. + if (state.is()) return kj::READY_NOW; + + return writeHelper(kj::ArrayPtr()); + } + + void abort(kj::Exception reason) override { + // Already errored - nothing to do. 
+ if (state.isErrored()) return; + + KJ_IF_SOME(request, state.tryGetUnsafe()) { + request.fulfiller->reject(kj::cp(reason)); + } else KJ_IF_SOME(request, state.tryGetUnsafe()) { + // If the fulfiller is not waiting, the write promise was already + // canceled and no one is waiting on it. + KJ_ASSERT(!request.fulfiller->isWaiting(), + "abort() is supposed to wait for any pending write() to finish"); + } + // Idle and Closed states are fine, just transition to error. + // (Closed can transition to error via abort) + + state.forceTransitionTo(kj::mv(reason)); + + // TODO(conform): Proactively put ReadableStream into Errored state. + } + + private: + kj::Promise readHelper(kj::ArrayPtr bytes) { + // Handle error state first. + KJ_IF_SOME(exception, state.tryGetErrorUnsafe()) { + return kj::cp(exception); + } + + // Handle closed state. + if (state.is()) { + return static_cast(0); + } + + // Check for already in-flight read. + if (state.is()) { + KJ_FAIL_ASSERT("read operation already in flight"); + } + + // Check for pending write request. + KJ_IF_SOME(request, state.tryGetUnsafe()) { + if (bytes.size() >= request.bytes.size()) { + // The write buffer will entirely fit into our read buffer; fulfill both requests. + memcpy(bytes.begin(), request.bytes.begin(), request.bytes.size()); + auto result = request.bytes.size(); + request.fulfiller->fulfill(); + + // Switch to idle state. + state.transitionTo(); + + return result; + } + + // The write buffer won't quite fit into our read buffer; fulfill only the read request. + memcpy(bytes.begin(), request.bytes.begin(), bytes.size()); + request.bytes = request.bytes.slice(bytes.size(), request.bytes.size()); + return bytes.size(); + } + + // Must be idle - no outstanding write request, switch to ReadRequest state. 
+ KJ_ASSERT(state.is()); + auto paf = kj::newPromiseAndFulfiller(); + state.transitionTo(bytes, kj::mv(paf.fulfiller)); + return kj::mv(paf.promise); + } + + kj::Promise writeHelper(kj::ArrayPtr bytes) { + // Handle error state first. + KJ_IF_SOME(exception, state.tryGetErrorUnsafe()) { + return kj::cp(exception); + } + + // Handle closed state. + if (state.is()) { + KJ_FAIL_ASSERT("close operation already in flight"); + } + + // Check for already in-flight write. + if (state.is()) { + KJ_FAIL_ASSERT("write operation already in flight"); + } + + // Check for pending read request. + KJ_IF_SOME(request, state.tryGetUnsafe()) { + if (!request.fulfiller->isWaiting()) { + // Oops, the request was canceled. Currently, this happens in particular when pumping a + // response body to the client, and the client disconnects, cancelling the pump. In this + // specific case, we want to propagate the error back to the write end of the transform + // stream. In theory, though, there could be other cases where propagation is incorrect. + // + // TODO(cleanup): This cancellation should probably be handled at a higher level, e.g. + // in pumpTo(), but I need a quick fix. + state.forceTransitionTo(KJ_EXCEPTION(DISCONNECTED, "reader canceled")); + + // I was going to use a `goto` but Harris choked on his bagel. Recursion it is. + return writeHelper(bytes); + } + + if (bytes.size() == 0) { + // This is a close operation. + request.fulfiller->fulfill(static_cast(0)); + state.transitionTo(); + return kj::READY_NOW; + } + + KJ_ASSERT(request.bytes.size() > 0); + + if (request.bytes.size() >= bytes.size()) { + // Our write buffer will entirely fit into the read buffer; fulfill both requests. + memcpy(request.bytes.begin(), bytes.begin(), bytes.size()); + request.fulfiller->fulfill(bytes.size()); + state.transitionTo(); + return kj::READY_NOW; + } + + // Our write buffer won't quite fit into the read buffer; fulfill only the read request.
+ memcpy(request.bytes.begin(), bytes.begin(), request.bytes.size()); + bytes = bytes.slice(request.bytes.size(), bytes.size()); + request.fulfiller->fulfill(request.bytes.size()); + + auto paf = kj::newPromiseAndFulfiller(); + state.transitionTo(bytes, kj::mv(paf.fulfiller)); + return kj::mv(paf.promise); + } + + // Must be idle. + KJ_ASSERT(state.is()); + if (bytes.size() == 0) { + // This is a close operation. + state.transitionTo(); + return kj::READY_NOW; + } + + auto paf = kj::newPromiseAndFulfiller(); + state.transitionTo(bytes, kj::mv(paf.fulfiller)); + return kj::mv(paf.promise); + } + + kj::Maybe limit; + IdentityTransformState state; +}; + +struct Pair { + kj::Own readable; + kj::Own writable; +}; +Pair newIdentityPair(kj::Maybe expectedLength = kj::none) { + // TODO(cleanup): Remove the old implementation once the autogate is fully rolled out. + if (util::Autogate::isEnabled(util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) { + auto readableSide = kj::refcounted(kj::mv(expectedLength)); + auto writableSide = kj::addRef(*readableSide); + return Pair{.readable = kj::mv(readableSide), .writable = kj::mv(writableSide)}; + } + auto readableSide = kj::refcounted(kj::mv(expectedLength)); + auto writableSide = kj::addRef(*readableSide); + return Pair{.readable = kj::mv(readableSide), .writable = kj::mv(writableSide)}; +} } // namespace jsg::Ref IdentityTransformStream::constructor( @@ -345,12 +679,14 @@ jsg::Ref FixedLengthStream::constructor(jsg::Lock& js, } OneWayPipe newIdentityPipe(kj::Maybe expectedLength) { - auto readableSide = kj::refcounted(expectedLength); - auto writableSide = kj::addRef(*readableSide); - return OneWayPipe{.in = kj::mv(readableSide), .out = kj::mv(writableSide)}; + auto pair = newIdentityPair(kj::mv(expectedLength)); + return OneWayPipe{.in = kj::mv(pair.readable), .out = kj::mv(pair.writable)}; } bool isIdentityTransformStream(WritableStreamSink& sink) { + if 
(util::Autogate::isEnabled(util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) { + return kj::dynamicDowncastIfAvailable(sink) != kj::none; + } return kj::dynamicDowncastIfAvailable(sink) != kj::none; } diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index bd6b8c8a533..522e2203fe8 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -586,3 +586,9 @@ wd_test( args = ["--experimental"], data = ["compression-streams-test.js"], ) + +wd_test( + src = "identity-transform-stream-state-machine-test.wd-test", + args = ["--experimental"], + data = ["identity-transform-stream-state-machine-test.js"], +) diff --git a/src/workerd/api/tests/identity-transform-stream-state-machine-test.js b/src/workerd/api/tests/identity-transform-stream-state-machine-test.js new file mode 100644 index 00000000000..25f94b31190 --- /dev/null +++ b/src/workerd/api/tests/identity-transform-stream-state-machine-test.js @@ -0,0 +1,24 @@ +import { strictEqual } from 'node:assert'; + +// TODO(cleanup): This is a copy of an existing test in streams-test. Once the autogate is removed, +// this separate test can be deleted.
+export const test = { + async test() { + const its = new IdentityTransformStream(); + const writer = its.writable.getWriter(); + const input = new TextEncoder().encode('Hello, world!'); + const writePromise = writer.write(input); + const closePromise = writer.close(); + const reader = its.readable.getReader(); + const { value, done } = await reader.read(); + strictEqual(done, false); + strictEqual(value instanceof Uint8Array, true); + strictEqual(value.length, input.length); + for (let i = 0; i < input.length; i++) { + strictEqual(value[i], input[i]); + } + await Promise.all([writePromise, closePromise]); + const { done: doneAfterClose } = await reader.read(); + strictEqual(doneAfterClose, true); + }, +}; diff --git a/src/workerd/api/tests/identity-transform-stream-state-machine-test.wd-test b/src/workerd/api/tests/identity-transform-stream-state-machine-test.wd-test new file mode 100644 index 00000000000..4b2440fddf5 --- /dev/null +++ b/src/workerd/api/tests/identity-transform-stream-state-machine-test.wd-test @@ -0,0 +1,18 @@ +using Workerd = import "/workerd/workerd.capnp"; + +# TODO(cleanup): This is a copy of an existing test in streams-test. Once the autogate is removed, +# this separate test can be deleted.
+const unitTests :Workerd.Config = ( + autogates = ["workerd-autogate-identity-transform-stream-use-state-machine"], + services = [ + ( name = "identity-transform-stream-state-machine-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "identity-transform-stream-state-machine-test.js") + ], + compatibilityDate = "2025-12-15", + compatibilityFlags = ["nodejs_compat"], + ) + ), + ], +); diff --git a/src/workerd/util/autogate.c++ b/src/workerd/util/autogate.c++ index 61e99e636d9..22983adb6ef 100644 --- a/src/workerd/util/autogate.c++ +++ b/src/workerd/util/autogate.c++ @@ -33,6 +33,8 @@ kj::StringPtr KJ_STRINGIFY(AutogateKey key) { return "rust-backed-node-dns"_kj; case AutogateKey::COMPRESSION_STREAM_USE_STATE_MACHINE: return "compression-stream-use-state-machine"_kj; + case AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE: + return "identity-transform-stream-use-state-machine"_kj; case AutogateKey::NumOfKeys: KJ_FAIL_ASSERT("NumOfKeys should not be used in getName"); } diff --git a/src/workerd/util/autogate.h b/src/workerd/util/autogate.h index 23f5109e21e..cdf2778c4b3 100644 --- a/src/workerd/util/autogate.h +++ b/src/workerd/util/autogate.h @@ -28,6 +28,8 @@ enum class AutogateKey { RUST_BACKED_NODE_DNS, // Switch the CompressionStream to use the new state machine-based impl COMPRESSION_STREAM_USE_STATE_MACHINE, + // Switch the IdentityTransformStream to use the new state machine-based impl + IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE, NumOfKeys // Reserved for iteration. };