22
33#include " common.h"
44
5+ #include < workerd/util/autogate.h>
6+ #include < workerd/util/state-machine.h>
7+
58namespace workerd ::api {
69
710namespace {
@@ -10,6 +13,7 @@ namespace {
1013//
1114// This class is also used as the implementation of FixedLengthStream, in which case `limit` is
1215// non-nullptr.
16+ // TODO(cleanup): Remove the old implementation once the autogate is fully rolled out.
1317class IdentityTransformStreamImpl final : public kj::Refcounted,
1418 public ReadableStreamSource,
1519 public WritableStreamSink {
@@ -84,8 +88,8 @@ class IdentityTransformStreamImpl final: public kj::Refcounted,
8488
8589 // HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to
8690 // guarantee that we can't create cycles.
87- JSG_REQUIRE (kj::dynamicDowncastIfAvailable<IdentityTransformStreamImpl> (output) == kj::none ,
88- TypeError, " Inter-TransformStream ReadableStream.pipeTo() is not implemented." );
91+ JSG_REQUIRE (! isIdentityTransformStream (output), TypeError ,
92+ " Inter-TransformStream ReadableStream.pipeTo() is not implemented." );
8993
9094 return ReadableStreamSource::pumpTo (output, end);
9195 }
@@ -304,6 +308,336 @@ class IdentityTransformStreamImpl final: public kj::Refcounted,
304308
305309 kj::OneOf<Idle, ReadRequest, WriteRequest, kj::Exception, StreamStates::Closed> state = Idle();
306310};
311+
312+ // =======================================================================================
313+ // V2 implementation using a StateMachine for the internal state management.
314+ struct Idle {
315+ static constexpr kj::StringPtr NAME KJ_UNUSED = " idle" _kj;
316+ };
317+
318+ struct ReadRequest {
319+ static constexpr kj::StringPtr NAME KJ_UNUSED = " read-request" _kj;
320+ kj::ArrayPtr<kj::byte> bytes;
321+ // WARNING: `bytes` may be invalid if fulfiller->isWaiting() returns false! (This indicates the
322+ // read was canceled.)
323+ kj::Own<kj::PromiseFulfiller<size_t >> fulfiller;
324+ };
325+
326+ struct WriteRequest {
327+ static constexpr kj::StringPtr NAME KJ_UNUSED = " write-request" _kj;
328+ kj::ArrayPtr<const kj::byte> bytes;
329+ kj::Own<kj::PromiseFulfiller<void >> fulfiller;
330+ };
331+
332+ struct Closed {
333+ static constexpr kj::StringPtr NAME KJ_UNUSED = " closed" _kj;
334+ };
335+
336+ // State machine for IdentityTransformStream:
337+ // Idle -> ReadRequest (read arrives when no write pending)
338+ // Idle -> WriteRequest (write arrives when no read pending)
339+ // Idle -> Closed (empty write = close)
340+ // ReadRequest -> Idle (write fulfills read completely)
341+ // WriteRequest -> Idle (read fulfills write completely)
342+ // ReadRequest -> Closed (empty write closes while read pending)
343+ // Any -> kj::Exception (cancel/abort)
344+ // Closed -> kj::Exception (abort can force-transition a closed stream to error)
345+ // Closed is terminal, kj::Exception is implicitly terminal via ErrorState.
346+ // abort() uses forceTransitionTo to allow the exceptional Closed -> Exception transition.
347+ using IdentityTransformState = StateMachine<TerminalStates<Closed>,
348+ ErrorState<kj::Exception>,
349+ Idle,
350+ ReadRequest,
351+ WriteRequest,
352+ Closed,
353+ kj::Exception>;
354+
355+ class IdentityTransformStreamImplV2 final : public kj::Refcounted,
356+ public ReadableStreamSource,
357+ public WritableStreamSink {
358+ public:
359+ // The limit is the maximum number of bytes that can be fed through the stream.
360+ // If kj::none, there is no limit.
361+ explicit IdentityTransformStreamImplV2 (kj::Maybe<uint64_t > limit = kj::none)
362+ : limit(limit),
363+ state(IdentityTransformState::create<Idle>()) {}
364+
365+ ~IdentityTransformStreamImplV2 () noexcept (false ) {
366+ // Due to the different natures of JS and C++ disposal, there is no point in enforcing the limit
367+ // for a FixedLengthStream here.
368+ //
369+ // 1. Creating but not using a `new FixedLengthStream(n)` should not be an error, and ought not
370+ // to logspam us.
371+ // 2. Chances are high that by the time this object gets destroyed, it's too late to tell the
372+ // user about the failure.
373+ }
374+
375+ // ReadableStreamSource implementation -------------------------------------------------
376+
377+ kj::Promise<size_t > tryRead (void * buffer, size_t minBytes, size_t maxBytes) override {
378+ size_t total = 0 ;
379+ while (total < minBytes) {
380+ // TODO(perf): tryReadInternal was written assuming minBytes would always be 1 but we've now
381+ // introduced an API for user to specify a larger minBytes. For now, this is implemented as a
382+ // naive loop dispatching to the 1 byte version but would be better to bake it deeper into
383+ // the implementation where it can be more efficient.
384+ auto amount = co_await tryReadInternal (buffer, maxBytes);
385+ KJ_ASSERT (amount <= maxBytes);
386+ if (amount == 0 ) {
387+ // EOF.
388+ break ;
389+ }
390+
391+ total += amount;
392+ buffer = reinterpret_cast <char *>(buffer) + amount;
393+ maxBytes -= amount;
394+ }
395+
396+ co_return total;
397+ }
398+
399+ kj::Promise<size_t > tryReadInternal (void * buffer, size_t maxBytes) {
400+ auto promise = readHelper (kj::arrayPtr (static_cast <kj::byte*>(buffer), maxBytes));
401+
402+ KJ_IF_SOME (l, limit) {
403+ promise = promise.then ([this , &l = l](size_t amount) -> kj::Promise<size_t > {
404+ if (amount > l) {
405+ auto exception = JSG_KJ_EXCEPTION (
406+ FAILED, TypeError, " Attempt to write too many bytes through a FixedLengthStream." );
407+ cancel (exception);
408+ return kj::mv (exception);
409+ } else if (amount == 0 && l != 0 ) {
410+ auto exception = JSG_KJ_EXCEPTION (FAILED, TypeError,
411+ " FixedLengthStream did not see all expected bytes before close()." );
412+ cancel (exception);
413+ return kj::mv (exception);
414+ }
415+ l -= amount;
416+ return amount;
417+ });
418+ }
419+
420+ return promise;
421+ }
422+
423+ kj::Promise<DeferredProxy<void >> pumpTo (WritableStreamSink& output, bool end) override {
424+ #ifdef KJ_NO_RTTI
425+ // Yes, I'm paranoid.
426+ static_assert (!KJ_NO_RTTI, " Need RTTI for correctness" );
427+ #endif
428+
429+ // HACK: If `output` is another TransformStream, we don't allow pumping to it, in order to
430+ // guarantee that we can't create cycles.
431+ JSG_REQUIRE (!isIdentityTransformStream (output), TypeError,
432+ " Inter-TransformStream ReadableStream.pipeTo() is not implemented." );
433+
434+ return ReadableStreamSource::pumpTo (output, end);
435+ }
436+
437+ kj::Maybe<uint64_t > tryGetLength (StreamEncoding encoding) override {
438+ if (encoding == StreamEncoding::IDENTITY) {
439+ return limit;
440+ } else {
441+ return kj::none;
442+ }
443+ }
444+
445+ void cancel (kj::Exception reason) override {
446+ // Already errored - nothing to do.
447+ if (state.isErrored ()) return ;
448+
449+ // Already closed by writable side - nothing to do.
450+ if (state.is <Closed>()) return ;
451+
452+ KJ_IF_SOME (request, state.tryGetUnsafe <ReadRequest>()) {
453+ request.fulfiller ->fulfill (static_cast <size_t >(0 ));
454+ } else KJ_IF_SOME (request, state.tryGetUnsafe <WriteRequest>()) {
455+ request.fulfiller ->reject (kj::cp (reason));
456+ }
457+ // Idle state is fine, just transition to error.
458+
459+ state.forceTransitionTo <kj::Exception>(kj::mv (reason));
460+
461+ // TODO(conform): Proactively put WritableStream into Errored state.
462+ }
463+
464+ // WritableStreamSink implementation ---------------------------------------------------
465+
466+ kj::Promise<void > write (kj::ArrayPtr<const byte> buffer) override {
467+ if (buffer == nullptr ) {
468+ return kj::READY_NOW;
469+ }
470+ return writeHelper (buffer);
471+ }
472+
473+ kj::Promise<void > write (kj::ArrayPtr<const kj::ArrayPtr<const kj::byte>> pieces) override {
474+ KJ_UNIMPLEMENTED (" IdentityTransformStreamImpl piecewise write() not currently supported" );
475+ // TODO(soon): This will be called by TeeBranch::pumpTo(). We disallow that anyway, since we
476+ // disallow inter-TransformStream pumping.
477+ }
478+
479+ kj::Promise<void > end () override {
480+ // If we're already closed, there's nothing else we need to do here.
481+ if (state.is <Closed>()) return kj::READY_NOW;
482+
483+ return writeHelper (kj::ArrayPtr<const kj::byte>());
484+ }
485+
486+ void abort (kj::Exception reason) override {
487+ // Already errored - nothing to do.
488+ if (state.isErrored ()) return ;
489+
490+ KJ_IF_SOME (request, state.tryGetUnsafe <ReadRequest>()) {
491+ request.fulfiller ->reject (kj::cp (reason));
492+ } else KJ_IF_SOME (request, state.tryGetUnsafe <WriteRequest>()) {
493+ // If the fulfiller is not waiting, the write promise was already
494+ // canceled and no one is waiting on it.
495+ KJ_ASSERT (!request.fulfiller ->isWaiting (),
496+ " abort() is supposed to wait for any pending write() to finish" );
497+ }
498+ // Idle and Closed states are fine, just transition to error.
499+ // (Closed can transition to error via abort)
500+
501+ state.forceTransitionTo <kj::Exception>(kj::mv (reason));
502+
503+ // TODO(conform): Proactively put ReadableStream into Errored state.
504+ }
505+
506+ private:
507+ kj::Promise<size_t > readHelper (kj::ArrayPtr<kj::byte> bytes) {
508+ // Handle error state first.
509+ KJ_IF_SOME (exception, state.tryGetErrorUnsafe ()) {
510+ return kj::cp (exception);
511+ }
512+
513+ // Handle closed state.
514+ if (state.is <Closed>()) {
515+ return static_cast <size_t >(0 );
516+ }
517+
518+ // Check for already in-flight read.
519+ if (state.is <ReadRequest>()) {
520+ KJ_FAIL_ASSERT (" read operation already in flight" );
521+ }
522+
523+ // Check for pending write request.
524+ KJ_IF_SOME (request, state.tryGetUnsafe <WriteRequest>()) {
525+ if (bytes.size () >= request.bytes .size ()) {
526+ // The write buffer will entirely fit into our read buffer; fulfill both requests.
527+ memcpy (bytes.begin (), request.bytes .begin (), request.bytes .size ());
528+ auto result = request.bytes .size ();
529+ request.fulfiller ->fulfill ();
530+
531+ // Switch to idle state.
532+ state.transitionTo <Idle>();
533+
534+ return result;
535+ }
536+
537+ // The write buffer won't quite fit into our read buffer; fulfill only the read request.
538+ memcpy (bytes.begin (), request.bytes .begin (), bytes.size ());
539+ request.bytes = request.bytes .slice (bytes.size (), request.bytes .size ());
540+ return bytes.size ();
541+ }
542+
543+ // Must be idle - no outstanding write request, switch to ReadRequest state.
544+ KJ_ASSERT (state.is <Idle>());
545+ auto paf = kj::newPromiseAndFulfiller<size_t >();
546+ state.transitionTo <ReadRequest>(bytes, kj::mv (paf.fulfiller ));
547+ return kj::mv (paf.promise );
548+ }
549+
550+ kj::Promise<void > writeHelper (kj::ArrayPtr<const kj::byte> bytes) {
551+ // Handle error state first.
552+ KJ_IF_SOME (exception, state.tryGetErrorUnsafe ()) {
553+ return kj::cp (exception);
554+ }
555+
556+ // Handle closed state.
557+ if (state.is <Closed>()) {
558+ KJ_FAIL_ASSERT (" close operation already in flight" );
559+ }
560+
561+ // Check for already in-flight write.
562+ if (state.is <WriteRequest>()) {
563+ KJ_FAIL_ASSERT (" write operation already in flight" );
564+ }
565+
566+ // Check for pending read request.
567+ KJ_IF_SOME (request, state.tryGetUnsafe <ReadRequest>()) {
568+ if (!request.fulfiller ->isWaiting ()) {
569+ // Oops, the request was canceled. Currently, this happen in particular when pumping a
570+ // response body to the client, and the client disconnects, cancelling the pump. In this
571+ // specific case, we want to propagate the error back to the write end of the transform
572+ // stream. In theory, though, there could be other cases where propagation is incorrect.
573+ //
574+ // TODO(cleanup): This cancellation should probably be handled at a higher level, e.g.
575+ // in pumpTo(), but I need a quick fix.
576+ state.forceTransitionTo <kj::Exception>(KJ_EXCEPTION (DISCONNECTED, " reader canceled" ));
577+
578+ // I was going to use a `goto` but Harris choked on his bagel. Recursion it is.
579+ return writeHelper (bytes);
580+ }
581+
582+ if (bytes.size () == 0 ) {
583+ // This is a close operation.
584+ request.fulfiller ->fulfill (static_cast <size_t >(0 ));
585+ state.transitionTo <Closed>();
586+ return kj::READY_NOW;
587+ }
588+
589+ KJ_ASSERT (request.bytes .size () > 0 );
590+
591+ if (request.bytes .size () >= bytes.size ()) {
592+ // Our write buffer will entirely fit into the read buffer; fulfill both requests.
593+ memcpy (request.bytes .begin (), bytes.begin (), bytes.size ());
594+ request.fulfiller ->fulfill (bytes.size ());
595+ state.transitionTo <Idle>();
596+ return kj::READY_NOW;
597+ }
598+
599+ // Our write buffer won't quite fit into the read buffer; fulfill only the read request.
600+ memcpy (request.bytes .begin (), bytes.begin (), request.bytes .size ());
601+ bytes = bytes.slice (request.bytes .size (), bytes.size ());
602+ request.fulfiller ->fulfill (request.bytes .size ());
603+
604+ auto paf = kj::newPromiseAndFulfiller<void >();
605+ state.transitionTo <WriteRequest>(bytes, kj::mv (paf.fulfiller ));
606+ return kj::mv (paf.promise );
607+ }
608+
609+ // Must be idle.
610+ KJ_ASSERT (state.is <Idle>());
611+ if (bytes.size () == 0 ) {
612+ // This is a close operation.
613+ state.transitionTo <Closed>();
614+ return kj::READY_NOW;
615+ }
616+
617+ auto paf = kj::newPromiseAndFulfiller<void >();
618+ state.transitionTo <WriteRequest>(bytes, kj::mv (paf.fulfiller ));
619+ return kj::mv (paf.promise );
620+ }
621+
622+ kj::Maybe<uint64_t > limit;
623+ IdentityTransformState state;
624+ };
625+
626+ struct Pair {
627+ kj::Own<ReadableStreamSource> readable;
628+ kj::Own<WritableStreamSink> writable;
629+ };
630+ Pair newIdentityPair (kj::Maybe<uint64_t > expectedLength = kj::none) {
631+ // TODO(cleanup): Remove the old implementation once the autogate is fully rolled out.
632+ if (util::Autogate::isEnabled (util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) {
633+ auto readableSide = kj::refcounted<IdentityTransformStreamImplV2>(kj::mv (expectedLength));
634+ auto writableSide = kj::addRef (*readableSide);
635+ return Pair{.readable = kj::mv (readableSide), .writable = kj::mv (writableSide)};
636+ }
637+ auto readableSide = kj::refcounted<IdentityTransformStreamImpl>(kj::mv (expectedLength));
638+ auto writableSide = kj::addRef (*readableSide);
639+ return Pair{.readable = kj::mv (readableSide), .writable = kj::mv (writableSide)};
640+ }
307641} // namespace
308642
309643jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor (
@@ -345,12 +679,14 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(jsg::Lock& js,
345679}
346680
347681OneWayPipe newIdentityPipe (kj::Maybe<uint64_t > expectedLength) {
348- auto readableSide = kj::refcounted<IdentityTransformStreamImpl>(expectedLength);
349- auto writableSide = kj::addRef (*readableSide);
350- return OneWayPipe{.in = kj::mv (readableSide), .out = kj::mv (writableSide)};
682+ auto pair = newIdentityPair (kj::mv (expectedLength));
683+ return OneWayPipe{.in = kj::mv (pair.readable ), .out = kj::mv (pair.writable )};
351684}
352685
353686bool isIdentityTransformStream (WritableStreamSink& sink) {
687+ if (util::Autogate::isEnabled (util::AutogateKey::IDENTITY_TRANSFORM_STREAM_USE_STATE_MACHINE)) {
688+ return kj::dynamicDowncastIfAvailable<IdentityTransformStreamImplV2>(sink) != kj::none;
689+ }
354690 return kj::dynamicDowncastIfAvailable<IdentityTransformStreamImpl>(sink) != kj::none;
355691}
356692
0 commit comments