Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
346 changes: 341 additions & 5 deletions src/workerd/api/streams/identity-transform-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

#include "common.h"

#include <workerd/util/autogate.h>
#include <workerd/util/state-machine.h>

namespace workerd::api {

namespace {
Expand All @@ -10,6 +13,7 @@ namespace {
//
// This class is also used as the implementation of FixedLengthStream, in which case `limit` is
// non-nullptr.
// TODO(cleanup): Remove the old implementation once the autogate is fully rolled out.
class IdentityTransformStreamImpl final: public kj::Refcounted,
public ReadableStreamSource,
public WritableStreamSink {
Expand Down Expand Up @@ -84,8 +88,8 @@ class IdentityTransformStreamImpl final: public kj::Refcounted,

// HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to
// guarantee that we can't create cycles.
JSG_REQUIRE(kj::dynamicDowncastIfAvailable<IdentityTransformStreamImpl>(output) == kj::none,
TypeError, "Inter-TransformStream ReadableStream.pipeTo() is not implemented.");
JSG_REQUIRE(!isIdentityTransformStream(output), TypeError,
"Inter-TransformStream ReadableStream.pipeTo() is not implemented.");

return ReadableStreamSource::pumpTo(output, end);
}
Expand Down Expand Up @@ -304,6 +308,336 @@ class IdentityTransformStreamImpl final: public kj::Refcounted,

kj::OneOf<Idle, ReadRequest, WriteRequest, kj::Exception, StreamStates::Closed> state = Idle();
};

// =======================================================================================
// V2 implementation using a StateMachine for the internal state management.
struct Idle {
static constexpr kj::StringPtr NAME KJ_UNUSED = "idle"_kj;
};

struct ReadRequest {
static constexpr kj::StringPtr NAME KJ_UNUSED = "read-request"_kj;
kj::ArrayPtr<kj::byte> bytes;
// WARNING: `bytes` may be invalid if fulfiller->isWaiting() returns false! (This indicates the
// read was canceled.)
kj::Own<kj::PromiseFulfiller<size_t>> fulfiller;
};

struct WriteRequest {
static constexpr kj::StringPtr NAME KJ_UNUSED = "write-request"_kj;
kj::ArrayPtr<const kj::byte> bytes;
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
};

struct Closed {
static constexpr kj::StringPtr NAME KJ_UNUSED = "closed"_kj;
};

// State machine for IdentityTransformStream:
// Idle -> ReadRequest (read arrives when no write pending)
// Idle -> WriteRequest (write arrives when no read pending)
// Idle -> Closed (empty write = close)
// ReadRequest -> Idle (write fulfills read completely)
// WriteRequest -> Idle (read fulfills write completely)
// ReadRequest -> Closed (empty write closes while read pending)
// Any -> kj::Exception (cancel/abort)
// Closed -> kj::Exception (abort can force-transition a closed stream to error)
// Closed is terminal, kj::Exception is implicitly terminal via ErrorState.
// abort() uses forceTransitionTo to allow the exceptional Closed -> Exception transition.
using IdentityTransformState = StateMachine<TerminalStates<Closed>,
ErrorState<kj::Exception>,
Idle,
ReadRequest,
WriteRequest,
Closed,
kj::Exception>;

class IdentityTransformStreamImplV2 final: public kj::Refcounted,
public ReadableStreamSource,
public WritableStreamSink {
public:
// The limit is the maximum number of bytes that can be fed through the stream.
// If kj::none, there is no limit.
explicit IdentityTransformStreamImplV2(kj::Maybe<uint64_t> limit = kj::none)
: limit(limit),
state(IdentityTransformState::create<Idle>()) {}

~IdentityTransformStreamImplV2() noexcept(false) {
// Due to the different natures of JS and C++ disposal, there is no point in enforcing the limit
// for a FixedLengthStream here.
//
// 1. Creating but not using a `new FixedLengthStream(n)` should not be an error, and ought not
// to logspam us.
// 2. Chances are high that by the time this object gets destroyed, it's too late to tell the
// user about the failure.
}

// ReadableStreamSource implementation -------------------------------------------------

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
size_t total = 0;
while (total < minBytes) {
// TODO(perf): tryReadInternal was written assuming minBytes would always be 1 but we've now
// introduced an API for user to specify a larger minBytes. For now, this is implemented as a
// naive loop dispatching to the 1 byte version but would be better to bake it deeper into
// the implementation where it can be more efficient.
auto amount = co_await tryReadInternal(buffer, maxBytes);
KJ_ASSERT(amount <= maxBytes);
if (amount == 0) {
// EOF.
break;
}

total += amount;
buffer = reinterpret_cast<char*>(buffer) + amount;
maxBytes -= amount;
}

co_return total;
}

kj::Promise<size_t> tryReadInternal(void* buffer, size_t maxBytes) {
auto promise = readHelper(kj::arrayPtr(static_cast<kj::byte*>(buffer), maxBytes));

KJ_IF_SOME(l, limit) {
promise = promise.then([this, &l = l](size_t amount) -> kj::Promise<size_t> {
if (amount > l) {
auto exception = JSG_KJ_EXCEPTION(
FAILED, TypeError, "Attempt to write too many bytes through a FixedLengthStream.");
cancel(exception);
return kj::mv(exception);
} else if (amount == 0 && l != 0) {
auto exception = JSG_KJ_EXCEPTION(FAILED, TypeError,
"FixedLengthStream did not see all expected bytes before close().");
cancel(exception);
return kj::mv(exception);
}
l -= amount;
return amount;
});
}

return promise;
}

kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
#ifdef KJ_NO_RTTI
// Yes, I'm paranoid.
static_assert(!KJ_NO_RTTI, "Need RTTI for correctness");
#endif

// HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to
// guarantee that we can't create cycles.
JSG_REQUIRE(!isIdentityTransformStream(output), TypeError,
"Inter-TransformStream ReadableStream.pipeTo() is not implemented.");

return ReadableStreamSource::pumpTo(output, end);
}

kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override {
if (encoding == StreamEncoding::IDENTITY) {
return limit;
} else {
return kj::none;
}
}

void cancel(kj::Exception reason) override {
// Already errored - nothing to do.
if (state.isErrored()) return;

// Already closed by writable side - nothing to do.
if (state.is<Closed>()) return;

KJ_IF_SOME(request, state.tryGetUnsafe<ReadRequest>()) {
request.fulfiller->fulfill(static_cast<size_t>(0));
} else KJ_IF_SOME(request, state.tryGetUnsafe<WriteRequest>()) {
request.fulfiller->reject(kj::cp(reason));
}
// Idle state is fine, just transition to error.

state.forceTransitionTo<kj::Exception>(kj::mv(reason));

// TODO(conform): Proactively put WritableStream into Errored state.
}

// WritableStreamSink implementation ---------------------------------------------------

kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
if (buffer == nullptr) {
return kj::READY_NOW;
}
return writeHelper(buffer);
}

kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) override {
KJ_UNIMPLEMENTED("IdentityTransformStreamImpl piecewise write() not currently supported");
// TODO(soon): This will be called by TeeBranch::pumpTo(). We disallow that anyway, since we
// disallow inter-TransformStream pumping.
}

kj::Promise<void> end() override {
// If we're already closed, there's nothing else we need to do here.
if (state.is<Closed>()) return kj::READY_NOW;

return writeHelper(kj::ArrayPtr<const kj::byte>());
}

void abort(kj::Exception reason) override {
// Already errored - nothing to do.
if (state.isErrored()) return;

KJ_IF_SOME(request, state.tryGetUnsafe<ReadRequest>()) {
request.fulfiller->reject(kj::cp(reason));
} else KJ_IF_SOME(request, state.tryGetUnsafe<WriteRequest>()) {
// If the fulfiller is not waiting, the write promise was already
// canceled and no one is waiting on it.
KJ_ASSERT(!request.fulfiller->isWaiting(),
"abort() is supposed to wait for any pending write() to finish");
}
// Idle and Closed states are fine, just transition to error.
// (Closed can transition to error via abort)

state.forceTransitionTo<kj::Exception>(kj::mv(reason));

// TODO(conform): Proactively put ReadableStream into Errored state.
}

private:
kj::Promise<size_t> readHelper(kj::ArrayPtr<kj::byte> bytes) {
// Handle error state first.
KJ_IF_SOME(exception, state.tryGetErrorUnsafe()) {
return kj::cp(exception);
}

// Handle closed state.
if (state.is<Closed>()) {
return static_cast<size_t>(0);
}

// Check for already in-flight read.
if (state.is<ReadRequest>()) {
KJ_FAIL_ASSERT("read operation already in flight");
}

// Check for pending write request.
KJ_IF_SOME(request, state.tryGetUnsafe<WriteRequest>()) {
if (bytes.size() >= request.bytes.size()) {
// The write buffer will entirely fit into our read buffer; fulfill both requests.
memcpy(bytes.begin(), request.bytes.begin(), request.bytes.size());
auto result = request.bytes.size();
request.fulfiller->fulfill();

// Switch to idle state.
state.transitionTo<Idle>();

return result;
}

// The write buffer won't quite fit into our read buffer; fulfill only the read request.
memcpy(bytes.begin(), request.bytes.begin(), bytes.size());
request.bytes = request.bytes.slice(bytes.size(), request.bytes.size());
return bytes.size();
}

// Must be idle - no outstanding write request, switch to ReadRequest state.
KJ_ASSERT(state.is<Idle>());
auto paf = kj::newPromiseAndFulfiller<size_t>();
state.transitionTo<ReadRequest>(bytes, kj::mv(paf.fulfiller));
return kj::mv(paf.promise);
}

kj::Promise<void> writeHelper(kj::ArrayPtr<const kj::byte> bytes) {
// Handle error state first.
KJ_IF_SOME(exception, state.tryGetErrorUnsafe()) {
return kj::cp(exception);
}

// Handle closed state.
if (state.is<Closed>()) {
KJ_FAIL_ASSERT("close operation already in flight");
}

// Check for already in-flight write.
if (state.is<WriteRequest>()) {
KJ_FAIL_ASSERT("write operation already in flight");
}

// Check for pending read request.
KJ_IF_SOME(request, state.tryGetUnsafe<ReadRequest>()) {
if (!request.fulfiller->isWaiting()) {
// Oops, the request was canceled. Currently, this happen in particular when pumping a
// response body to the client, and the client disconnects, cancelling the pump. In this
// specific case, we want to propagate the error back to the write end of the transform
// stream. In theory, though, there could be other cases where propagation is incorrect.
//
// TODO(cleanup): This cancellation should probably be handled at a higher level, e.g.
// in pumpTo(), but I need a quick fix.
state.forceTransitionTo<kj::Exception>(KJ_EXCEPTION(DISCONNECTED, "reader canceled"));

// I was going to use a `goto` but Harris choked on his bagel. Recursion it is.
return writeHelper(bytes);
}

if (bytes.size() == 0) {
// This is a close operation.
request.fulfiller->fulfill(static_cast<size_t>(0));
state.transitionTo<Closed>();
return kj::READY_NOW;
}

KJ_ASSERT(request.bytes.size() > 0);

if (request.bytes.size() >= bytes.size()) {
// Our write buffer will entirely fit into the read buffer; fulfill both requests.
memcpy(request.bytes.begin(), bytes.begin(), bytes.size());
request.fulfiller->fulfill(bytes.size());
state.transitionTo<Idle>();
return kj::READY_NOW;
}

// Our write buffer won't quite fit into the read buffer; fulfill only the read request.
memcpy(request.bytes.begin(), bytes.begin(), request.bytes.size());
bytes = bytes.slice(request.bytes.size(), bytes.size());
request.fulfiller->fulfill(request.bytes.size());

auto paf = kj::newPromiseAndFulfiller<void>();
state.transitionTo<WriteRequest>(bytes, kj::mv(paf.fulfiller));
return kj::mv(paf.promise);
}

// Must be idle.
KJ_ASSERT(state.is<Idle>());
if (bytes.size() == 0) {
// This is a close operation.
state.transitionTo<Closed>();
return kj::READY_NOW;
}

auto paf = kj::newPromiseAndFulfiller<void>();
state.transitionTo<WriteRequest>(bytes, kj::mv(paf.fulfiller));
return kj::mv(paf.promise);
}

kj::Maybe<uint64_t> limit;
IdentityTransformState state;
};

struct Pair {
kj::Own<ReadableStreamSource> readable;
kj::Own<WritableStreamSink> writable;
};
Pair newIdentityPair(kj::Maybe<uint64_t> expectedLength = kj::none) {
// TODO(cleanup): Remove the old implementation once the autogate is fully rolled out.
if (util::Autogate::isEnabled(util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) {
auto readableSide = kj::refcounted<IdentityTransformStreamImplV2>(kj::mv(expectedLength));
auto writableSide = kj::addRef(*readableSide);
return Pair{.readable = kj::mv(readableSide), .writable = kj::mv(writableSide)};
}
auto readableSide = kj::refcounted<IdentityTransformStreamImpl>(kj::mv(expectedLength));
auto writableSide = kj::addRef(*readableSide);
return Pair{.readable = kj::mv(readableSide), .writable = kj::mv(writableSide)};
}
} // namespace

jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(
Expand Down Expand Up @@ -345,12 +679,14 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
}

OneWayPipe newIdentityPipe(kj::Maybe<uint64_t> expectedLength) {
auto readableSide = kj::refcounted<IdentityTransformStreamImpl>(expectedLength);
auto writableSide = kj::addRef(*readableSide);
return OneWayPipe{.in = kj::mv(readableSide), .out = kj::mv(writableSide)};
auto pair = newIdentityPair(kj::mv(expectedLength));
return OneWayPipe{.in = kj::mv(pair.readable), .out = kj::mv(pair.writable)};
}

bool isIdentityTransformStream(WritableStreamSink& sink) {
if (util::Autogate::isEnabled(util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) {
return kj::dynamicDowncastIfAvailable<IdentityTransformStreamImplV2>(sink) != kj::none;
}
return kj::dynamicDowncastIfAvailable<IdentityTransformStreamImpl>(sink) != kj::none;
}

Expand Down
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -586,3 +586,9 @@ wd_test(
args = ["--experimental"],
data = ["compression-streams-test.js"],
)

wd_test(
src = "identity-transform-stream-state-machine-test.wd-test",
args = ["--experimental"],
data = ["identity-transform-stream-state-machine-test.js"],
)
Loading