@@ -519,93 +519,6 @@ jsg::Optional<uint32_t> ByteLengthQueuingStrategy::size(
519519
520520namespace {
521521
522- // TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be
523- // deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
524- // StreamSink-related code. For now I'm not trying to avoid duplication.
525-
526- // HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
527- // wrap the two ends of the pipe in special adapters that track whether end() was called.
528- class ExplicitEndOutputPipeAdapter final : public capnp::ExplicitEndOutputStream {
529- public:
530- ExplicitEndOutputPipeAdapter (
531- kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::RefcountedWrapper<bool >> ended)
532- : inner(kj::mv(inner)),
533- ended (kj::mv(ended)) {}
534-
535- kj::Promise<void > write (kj::ArrayPtr<const byte> buffer) override {
536- return KJ_REQUIRE_NONNULL (inner)->write (buffer);
537- }
538- kj::Promise<void > write (kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
539- return KJ_REQUIRE_NONNULL (inner)->write (pieces);
540- }
541-
542- kj::Maybe<kj::Promise<uint64_t >> tryPumpFrom (
543- kj::AsyncInputStream& input, uint64_t amount) override {
544- return KJ_REQUIRE_NONNULL (inner)->tryPumpFrom (input, amount);
545- }
546-
547- kj::Promise<void > whenWriteDisconnected () override {
548- return KJ_REQUIRE_NONNULL (inner)->whenWriteDisconnected ();
549- }
550-
551- kj::Promise<void > end () override {
552- // Signal to the other side that end() was actually called.
553- ended->getWrapped () = true ;
554- inner = kj::none;
555- return kj::READY_NOW;
556- }
557-
558- private:
559- kj::Maybe<kj::Own<kj::AsyncOutputStream>> inner;
560- kj::Own<kj::RefcountedWrapper<bool >> ended;
561- };
562-
563- class ExplicitEndInputPipeAdapter final : public kj::AsyncInputStream {
564- public:
565- ExplicitEndInputPipeAdapter (kj::Own<kj::AsyncInputStream> inner,
566- kj::Own<kj::RefcountedWrapper<bool >> ended,
567- kj::Maybe<uint64_t > expectedLength)
568- : inner(kj::mv(inner)),
569- ended (kj::mv(ended)),
570- expectedLength(expectedLength) {}
571-
572- kj::Promise<size_t > tryRead (void * buffer, size_t minBytes, size_t maxBytes) override {
573- size_t result = co_await inner->tryRead (buffer, minBytes, maxBytes);
574-
575- KJ_IF_SOME (l, expectedLength) {
576- KJ_ASSERT (result <= l);
577- l -= result;
578- if (l == 0 ) {
579- // If we got all the bytes we expected, we treat this as a successful end, because the
580- // underlying KJ pipe is not actually going to wait for the other side to drop. This is
581- // consistent with the behavior of Content-Length in HTTP anyway.
582- ended->getWrapped () = true ;
583- }
584- }
585-
586- if (result < minBytes) {
587- // Verify that end() was called.
588- if (!ended->getWrapped ()) {
589- JSG_FAIL_REQUIRE (Error, " ReadableStream received over RPC disconnected prematurely." );
590- }
591- }
592- co_return result;
593- }
594-
595- kj::Maybe<uint64_t > tryGetLength () override {
596- return inner->tryGetLength ();
597- }
598-
599- kj::Promise<uint64_t > pumpTo (kj::AsyncOutputStream& output, uint64_t amount) override {
600- return inner->pumpTo (output, amount);
601- }
602-
603- private:
604- kj::Own<kj::AsyncInputStream> inner;
605- kj::Own<kj::RefcountedWrapper<bool >> ended;
606- kj::Maybe<uint64_t > expectedLength;
607- };
608-
609522// Wrapper around ReadableStreamSource that prevents deferred proxying. We need this for RPC
610523// streams because although they are "system streams", they become disconnected when the IoContext
611524// is destroyed, due to the JsRpcCustomEvent being canceled.
@@ -680,37 +593,21 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
680593 StreamEncoding encoding = controller.getPreferredEncoding ();
681594 auto expectedLength = controller.tryGetLength (encoding);
682595
683- capnp::ByteStream::Client streamCap = [&]() {
684- KJ_IF_SOME (pusher, externalHandler->getExternalPusher ()) {
685- auto req = pusher.pushByteStreamRequest (capnp::MessageSize{2 , 0 });
686- KJ_IF_SOME (el, expectedLength) {
687- req.setLengthPlusOne (el + 1 );
688- }
689- auto pipeline = req.sendForPipeline ();
690-
691- externalHandler->write ([encoding, expectedLength, source = pipeline.getSource ()](
692- rpc::JsValue::External::Builder builder) mutable {
693- auto rs = builder.initReadableStream ();
694- rs.setStream (kj::mv (source));
695- rs.setEncoding (encoding);
696- });
697-
698- return pipeline.getSink ();
699- } else {
700- return externalHandler
701- ->writeStream (
702- [encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
703- auto rs = builder.initReadableStream ();
704- rs.setEncoding (encoding);
705- KJ_IF_SOME (l, expectedLength) {
706- rs.getExpectedLength ().setKnown (l);
707- }
708- }).castAs <capnp::ByteStream>();
709- }
710- }();
596+ auto req = externalHandler->getExternalPusher ().pushByteStreamRequest (capnp::MessageSize{2 , 0 });
597+ KJ_IF_SOME (el, expectedLength) {
598+ req.setLengthPlusOne (el + 1 );
599+ }
600+ auto pipeline = req.sendForPipeline ();
601+
602+ externalHandler->write ([encoding, expectedLength, source = pipeline.getSource ()](
603+ rpc::JsValue::External::Builder builder) mutable {
604+ auto rs = builder.initReadableStream ();
605+ rs.setStream (kj::mv (source));
606+ rs.setEncoding (encoding);
607+ });
711608
712609 kj::Own<capnp::ExplicitEndOutputStream> kjStream =
713- ioctx.getByteStreamFactory ().capnpToKjExplicitEnd (kj::mv (streamCap ));
610+ ioctx.getByteStreamFactory ().capnpToKjExplicitEnd (pipeline. getSink ( ));
714611
715612 auto sink = newSystemStream (kj::mv (kjStream), encoding, ioctx);
716613
@@ -741,25 +638,7 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(
741638
742639 auto & ioctx = IoContext::current ();
743640
744- kj::Own<kj::AsyncInputStream> in;
745- if (rs.hasStream ()) {
746- in = ioctx.getExternalPusher ()->unwrapStream (rs.getStream ());
747- } else {
748- kj::Maybe<uint64_t > expectedLength;
749- auto el = rs.getExpectedLength ();
750- if (el.isKnown ()) {
751- expectedLength = el.getKnown ();
752- }
753-
754- auto pipe = kj::newOneWayPipe (expectedLength);
755-
756- auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool >>(false );
757-
758- auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv (pipe.out ), kj::addRef (*endedFlag));
759- in = kj::heap<ExplicitEndInputPipeAdapter>(kj::mv (pipe.in ), kj::mv (endedFlag), expectedLength);
760-
761- externalHandler->setLastStream (ioctx.getByteStreamFactory ().kjToCapnp (kj::mv (out)));
762- }
641+ auto in = ioctx.getExternalPusher ()->unwrapStream (rs.getStream ());
763642
764643 return js.alloc <ReadableStream>(ioctx,
765644 kj::heap<NoDeferredProxyReadableStream>(newSystemStream (kj::mv (in), encoding, ioctx), ioctx));
0 commit comments