Skip to content

Commit 349407a

Browse files
committed
WIP: Remove StreamSink
DO NOT MERGE until the autogate introduced in #5738 is fully rolled out and working. Until then I'm putting this up just to show what we'll be able to remove.
1 parent e7fe4d0 commit 349407a

File tree

8 files changed

+76
-612
lines changed

8 files changed

+76
-612
lines changed

src/workerd/api/basics.c++

Lines changed: 12 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -572,56 +572,6 @@ class AbortTriggerRpcClient final {
572572
rpc::AbortTrigger::Client client;
573573
};
574574

575-
namespace {
576-
// The jsrpc handler that receives aborts from the remote and triggers them locally
577-
//
578-
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
579-
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
580-
// StreamSink-related code. For now I'm not trying to avoid duplication.
581-
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
582-
public:
583-
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
584-
kj::Own<AbortSignal::PendingReason>&& pendingReason)
585-
: fulfiller(kj::mv(fulfiller)),
586-
pendingReason(kj::mv(pendingReason)) {}
587-
588-
kj::Promise<void> abort(AbortContext abortCtx) override {
589-
auto params = abortCtx.getParams();
590-
auto reason = params.getReason().getV8Serialized();
591-
592-
pendingReason->getWrapped() = kj::heapArray(reason.asBytes());
593-
fulfiller->fulfill();
594-
return kj::READY_NOW;
595-
}
596-
597-
kj::Promise<void> release(ReleaseContext releaseCtx) override {
598-
released = true;
599-
return kj::READY_NOW;
600-
}
601-
602-
~AbortTriggerRpcServer() noexcept(false) {
603-
if (pendingReason->getWrapped() != nullptr) {
604-
// Already triggered
605-
return;
606-
}
607-
608-
if (!released) {
609-
pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError,
610-
"An AbortSignal received over RPC was implicitly aborted because the connection back to "
611-
"its trigger was lost.");
612-
}
613-
614-
// Always fulfill the promise in case the AbortSignal was waiting
615-
fulfiller->fulfill();
616-
}
617-
618-
private:
619-
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
620-
kj::Own<AbortSignal::PendingReason> pendingReason;
621-
bool released = false;
622-
};
623-
} // namespace
624-
625575
AbortSignal::AbortSignal(kj::Maybe<kj::Exception> exception,
626576
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason,
627577
Flag flag)
@@ -862,28 +812,19 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
862812
return;
863813
}
864814

865-
auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
866-
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
867-
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();
868-
869-
externalHandler->write(
870-
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
871-
builder.setAbortSignal(kj::mv(signal));
872-
});
815+
auto pipeline = externalHandler->getExternalPusher()
816+
.pushAbortSignalRequest(capnp::MessageSize{2, 0})
817+
.sendForPipeline();
873818

874-
return pipeline.getTrigger();
875-
} else {
876-
return externalHandler
877-
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
878-
builder.setAbortTrigger();
879-
}).castAs<rpc::AbortTrigger>();
880-
}
881-
}();
819+
externalHandler->write(
820+
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
821+
builder.setAbortSignal(kj::mv(signal));
822+
});
882823

883824
auto& ioContext = IoContext::current();
884825
// Keep track of every AbortSignal cloned from this one.
885826
// If this->triggerAbort(...) is called, each rpcClient will be informed.
886-
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(triggerCap))));
827+
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(pipeline.getTrigger())));
887828
}
888829

889830
jsg::Ref<AbortSignal> AbortSignal::deserialize(
@@ -914,24 +855,12 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
914855
auto& ioctx = IoContext::current();
915856

916857
auto reader = externalHandler->read();
917-
if (reader.isAbortTrigger()) {
918-
// Old-style StreamSink.
919-
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
920-
auto paf = kj::newPromiseAndFulfiller<void>();
921-
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());
922-
923-
externalHandler->setLastStream(
924-
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
925-
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
926-
signal->pendingReason = kj::mv(pendingReason);
927-
} else {
928-
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
858+
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
929859

930-
auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());
860+
auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());
931861

932-
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
933-
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
934-
}
862+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
863+
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
935864

936865
return signal;
937866
}

src/workerd/api/streams/readable.c++

Lines changed: 14 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -519,93 +519,6 @@ jsg::Optional<uint32_t> ByteLengthQueuingStrategy::size(
519519

520520
namespace {
521521

522-
// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be
523-
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
524-
// StreamSink-related code. For now I'm not trying to avoid duplication.
525-
526-
// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
527-
// wrap the two ends of the pipe in special adapters that track whether end() was called.
528-
class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream {
529-
public:
530-
ExplicitEndOutputPipeAdapter(
531-
kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::RefcountedWrapper<bool>> ended)
532-
: inner(kj::mv(inner)),
533-
ended(kj::mv(ended)) {}
534-
535-
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
536-
return KJ_REQUIRE_NONNULL(inner)->write(buffer);
537-
}
538-
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
539-
return KJ_REQUIRE_NONNULL(inner)->write(pieces);
540-
}
541-
542-
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
543-
kj::AsyncInputStream& input, uint64_t amount) override {
544-
return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount);
545-
}
546-
547-
kj::Promise<void> whenWriteDisconnected() override {
548-
return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected();
549-
}
550-
551-
kj::Promise<void> end() override {
552-
// Signal to the other side that end() was actually called.
553-
ended->getWrapped() = true;
554-
inner = kj::none;
555-
return kj::READY_NOW;
556-
}
557-
558-
private:
559-
kj::Maybe<kj::Own<kj::AsyncOutputStream>> inner;
560-
kj::Own<kj::RefcountedWrapper<bool>> ended;
561-
};
562-
563-
class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream {
564-
public:
565-
ExplicitEndInputPipeAdapter(kj::Own<kj::AsyncInputStream> inner,
566-
kj::Own<kj::RefcountedWrapper<bool>> ended,
567-
kj::Maybe<uint64_t> expectedLength)
568-
: inner(kj::mv(inner)),
569-
ended(kj::mv(ended)),
570-
expectedLength(expectedLength) {}
571-
572-
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
573-
size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes);
574-
575-
KJ_IF_SOME(l, expectedLength) {
576-
KJ_ASSERT(result <= l);
577-
l -= result;
578-
if (l == 0) {
579-
// If we got all the bytes we expected, we treat this as a successful end, because the
580-
// underlying KJ pipe is not actually going to wait for the other side to drop. This is
581-
// consistent with the behavior of Content-Length in HTTP anyway.
582-
ended->getWrapped() = true;
583-
}
584-
}
585-
586-
if (result < minBytes) {
587-
// Verify that end() was called.
588-
if (!ended->getWrapped()) {
589-
JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely.");
590-
}
591-
}
592-
co_return result;
593-
}
594-
595-
kj::Maybe<uint64_t> tryGetLength() override {
596-
return inner->tryGetLength();
597-
}
598-
599-
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
600-
return inner->pumpTo(output, amount);
601-
}
602-
603-
private:
604-
kj::Own<kj::AsyncInputStream> inner;
605-
kj::Own<kj::RefcountedWrapper<bool>> ended;
606-
kj::Maybe<uint64_t> expectedLength;
607-
};
608-
609522
// Wrapper around ReadableStreamSource that prevents deferred proxying. We need this for RPC
610523
// streams because although they are "system streams", they become disconnected when the IoContext
611524
// is destroyed, due to the JsRpcCustomEvent being canceled.
@@ -680,37 +593,21 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
680593
StreamEncoding encoding = controller.getPreferredEncoding();
681594
auto expectedLength = controller.tryGetLength(encoding);
682595

683-
capnp::ByteStream::Client streamCap = [&]() {
684-
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
685-
auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0});
686-
KJ_IF_SOME(el, expectedLength) {
687-
req.setLengthPlusOne(el + 1);
688-
}
689-
auto pipeline = req.sendForPipeline();
690-
691-
externalHandler->write([encoding, expectedLength, source = pipeline.getSource()](
692-
rpc::JsValue::External::Builder builder) mutable {
693-
auto rs = builder.initReadableStream();
694-
rs.setStream(kj::mv(source));
695-
rs.setEncoding(encoding);
696-
});
697-
698-
return pipeline.getSink();
699-
} else {
700-
return externalHandler
701-
->writeStream(
702-
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
703-
auto rs = builder.initReadableStream();
704-
rs.setEncoding(encoding);
705-
KJ_IF_SOME(l, expectedLength) {
706-
rs.getExpectedLength().setKnown(l);
707-
}
708-
}).castAs<capnp::ByteStream>();
709-
}
710-
}();
596+
auto req = externalHandler->getExternalPusher().pushByteStreamRequest(capnp::MessageSize{2, 0});
597+
KJ_IF_SOME(el, expectedLength) {
598+
req.setLengthPlusOne(el + 1);
599+
}
600+
auto pipeline = req.sendForPipeline();
601+
602+
externalHandler->write([encoding, expectedLength, source = pipeline.getSource()](
603+
rpc::JsValue::External::Builder builder) mutable {
604+
auto rs = builder.initReadableStream();
605+
rs.setStream(kj::mv(source));
606+
rs.setEncoding(encoding);
607+
});
711608

712609
kj::Own<capnp::ExplicitEndOutputStream> kjStream =
713-
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap));
610+
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(pipeline.getSink());
714611

715612
auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx);
716613

@@ -741,25 +638,7 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(
741638

742639
auto& ioctx = IoContext::current();
743640

744-
kj::Own<kj::AsyncInputStream> in;
745-
if (rs.hasStream()) {
746-
in = ioctx.getExternalPusher()->unwrapStream(rs.getStream());
747-
} else {
748-
kj::Maybe<uint64_t> expectedLength;
749-
auto el = rs.getExpectedLength();
750-
if (el.isKnown()) {
751-
expectedLength = el.getKnown();
752-
}
753-
754-
auto pipe = kj::newOneWayPipe(expectedLength);
755-
756-
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
757-
758-
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
759-
in = kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
760-
761-
externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
762-
}
641+
auto in = ioctx.getExternalPusher()->unwrapStream(rs.getStream());
763642

764643
return js.alloc<ReadableStream>(ioctx,
765644
kj::heap<NoDeferredProxyReadableStream>(newSystemStream(kj::mv(in), encoding, ioctx), ioctx));

src/workerd/api/tests/BUILD.bazel

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -157,15 +157,6 @@ wd_test(
157157
data = ["abortsignal-test.js"],
158158
)
159159

160-
wd_test(
161-
src = "abortsignal-external-pusher-test.wd-test",
162-
args = ["--experimental"],
163-
data = [
164-
"abortsignal-test.js",
165-
"abortsignal-test.wd-test",
166-
],
167-
)
168-
169160
wd_test(
170161
src = "actor-stub-test.wd-test",
171162
args = ["--experimental"],
@@ -284,15 +275,6 @@ wd_test(
284275
data = ["js-rpc-test.js"],
285276
)
286277

287-
wd_test(
288-
src = "js-rpc-external-pusher-test.wd-test",
289-
args = ["--experimental"],
290-
data = [
291-
"js-rpc-test.js",
292-
"js-rpc-test.wd-test",
293-
],
294-
)
295-
296278
wd_test(
297279
src = "js-rpc-params-ownership-test.wd-test",
298280
args = ["--experimental"],

src/workerd/api/tests/abortsignal-external-pusher-test.wd-test

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/workerd/api/tests/js-rpc-external-pusher-test.wd-test

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)