Skip to content

Commit b269651

Browse files
committed
Implement ReadableStream serialization via ExternalPusher.
1 parent 25c9bef commit b269651

File tree

3 files changed

+194
-23
lines changed

3 files changed

+194
-23
lines changed

src/workerd/api/streams/readable.c++

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,10 @@ jsg::Optional<uint32_t> ByteLengthQueuingStrategy::size(
519519

520520
namespace {
521521

522+
// TODO(cleanup): These classes have been copied to external-pusher.c++. The copies here can be
523+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
524+
// StreamSink-related code. For now I'm not trying to avoid duplication.
525+
522526
// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
523527
// wrap the two ends of the pipe in special adapters that track whether end() was called.
524528
class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream {
@@ -676,18 +680,37 @@ void ReadableStream::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
676680
StreamEncoding encoding = controller.getPreferredEncoding();
677681
auto expectedLength = controller.tryGetLength(encoding);
678682

679-
auto streamCap = externalHandler->writeStream(
680-
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
681-
auto rs = builder.initReadableStream();
682-
rs.setEncoding(encoding);
683-
KJ_IF_SOME(l, expectedLength) {
684-
rs.getExpectedLength().setKnown(l);
683+
capnp::ByteStream::Client streamCap = [&]() {
684+
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
685+
auto req = pusher.pushByteStreamRequest(capnp::MessageSize{2, 0});
686+
KJ_IF_SOME(el, expectedLength) {
687+
req.setLengthPlusOne(el + 1);
688+
}
689+
auto pipeline = req.sendForPipeline();
690+
691+
externalHandler->write([encoding, expectedLength, source = pipeline.getSource()](
692+
rpc::JsValue::External::Builder builder) mutable {
693+
auto rs = builder.initReadableStream();
694+
rs.setStream(kj::mv(source));
695+
rs.setEncoding(encoding);
696+
});
697+
698+
return pipeline.getSink();
699+
} else {
700+
return externalHandler
701+
->writeStream(
702+
[encoding, expectedLength](rpc::JsValue::External::Builder builder) mutable {
703+
auto rs = builder.initReadableStream();
704+
rs.setEncoding(encoding);
705+
KJ_IF_SOME(l, expectedLength) {
706+
rs.getExpectedLength().setKnown(l);
707+
}
708+
}).castAs<capnp::ByteStream>();
685709
}
686-
});
710+
}();
687711

688712
kj::Own<capnp::ExplicitEndOutputStream> kjStream =
689-
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(
690-
kj::mv(streamCap).castAs<capnp::ByteStream>());
713+
ioctx.getByteStreamFactory().capnpToKjExplicitEnd(kj::mv(streamCap));
691714

692715
auto sink = newSystemStream(kj::mv(kjStream), encoding, ioctx);
693716

@@ -718,21 +741,25 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(
718741

719742
auto& ioctx = IoContext::current();
720743

721-
kj::Maybe<uint64_t> expectedLength;
722-
auto el = rs.getExpectedLength();
723-
if (el.isKnown()) {
724-
expectedLength = el.getKnown();
725-
}
744+
kj::Own<kj::AsyncInputStream> in;
745+
if (rs.hasStream()) {
746+
in = ioctx.getExternalPusher()->unwrapStream(rs.getStream());
747+
} else {
748+
kj::Maybe<uint64_t> expectedLength;
749+
auto el = rs.getExpectedLength();
750+
if (el.isKnown()) {
751+
expectedLength = el.getKnown();
752+
}
726753

727-
auto pipe = kj::newOneWayPipe(expectedLength);
754+
auto pipe = kj::newOneWayPipe(expectedLength);
728755

729-
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
756+
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
730757

731-
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
732-
auto in =
733-
kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
758+
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
759+
in = kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
734760

735-
externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
761+
externalHandler->setLastStream(ioctx.getByteStreamFactory().kjToCapnp(kj::mv(out)));
762+
}
736763

737764
return js.alloc<ReadableStream>(ioctx,
738765
kj::heap<NoDeferredProxyReadableStream>(newSystemStream(kj::mv(in), encoding, ioctx), ioctx));

src/workerd/io/external-pusher.c++

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,136 @@
77

88
namespace workerd {
99

10-
// TODO(now): implement
10+
// =======================================================================================
11+
// ReadableStream handling
12+
13+
namespace {
14+
15+
// TODO(cleanup): These classes have been copied from streams/readable.c++. The copies there can be
16+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
17+
// StreamSink-related code. For now I'm not trying to avoid duplication.
18+
19+
// HACK: We need as async pipe, like kj::newOneWayPipe(), except supporting explicit end(). So we
20+
// wrap the two ends of the pipe in special adapters that track whether end() was called.
21+
class ExplicitEndOutputPipeAdapter final: public capnp::ExplicitEndOutputStream {
22+
public:
23+
ExplicitEndOutputPipeAdapter(
24+
kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::RefcountedWrapper<bool>> ended)
25+
: inner(kj::mv(inner)),
26+
ended(kj::mv(ended)) {}
27+
28+
kj::Promise<void> write(kj::ArrayPtr<const byte> buffer) override {
29+
return KJ_REQUIRE_NONNULL(inner)->write(buffer);
30+
}
31+
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
32+
return KJ_REQUIRE_NONNULL(inner)->write(pieces);
33+
}
34+
35+
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
36+
kj::AsyncInputStream& input, uint64_t amount) override {
37+
return KJ_REQUIRE_NONNULL(inner)->tryPumpFrom(input, amount);
38+
}
39+
40+
kj::Promise<void> whenWriteDisconnected() override {
41+
return KJ_REQUIRE_NONNULL(inner)->whenWriteDisconnected();
42+
}
43+
44+
kj::Promise<void> end() override {
45+
// Signal to the other side that end() was actually called.
46+
ended->getWrapped() = true;
47+
inner = kj::none;
48+
return kj::READY_NOW;
49+
}
50+
51+
private:
52+
kj::Maybe<kj::Own<kj::AsyncOutputStream>> inner;
53+
kj::Own<kj::RefcountedWrapper<bool>> ended;
54+
};
55+
56+
class ExplicitEndInputPipeAdapter final: public kj::AsyncInputStream {
57+
public:
58+
ExplicitEndInputPipeAdapter(kj::Own<kj::AsyncInputStream> inner,
59+
kj::Own<kj::RefcountedWrapper<bool>> ended,
60+
kj::Maybe<uint64_t> expectedLength)
61+
: inner(kj::mv(inner)),
62+
ended(kj::mv(ended)),
63+
expectedLength(expectedLength) {}
64+
65+
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
66+
size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes);
67+
68+
KJ_IF_SOME(l, expectedLength) {
69+
KJ_ASSERT(result <= l);
70+
l -= result;
71+
if (l == 0) {
72+
// If we got all the bytes we expected, we treat this as a successful end, because the
73+
// underlying KJ pipe is not actually going to wait for the other side to drop. This is
74+
// consistent with the behavior of Content-Length in HTTP anyway.
75+
ended->getWrapped() = true;
76+
}
77+
}
78+
79+
if (result < minBytes) {
80+
// Verify that end() was called.
81+
if (!ended->getWrapped()) {
82+
JSG_FAIL_REQUIRE(Error, "ReadableStream received over RPC disconnected prematurely.");
83+
}
84+
}
85+
co_return result;
86+
}
87+
88+
kj::Maybe<uint64_t> tryGetLength() override {
89+
return inner->tryGetLength();
90+
}
91+
92+
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
93+
return inner->pumpTo(output, amount);
94+
}
95+
96+
private:
97+
kj::Own<kj::AsyncInputStream> inner;
98+
kj::Own<kj::RefcountedWrapper<bool>> ended;
99+
kj::Maybe<uint64_t> expectedLength;
100+
};
101+
102+
} // namespace
103+
104+
class ExternalPusherImpl::InputStreamImpl final: public ExternalPusher::InputStream::Server {
105+
public:
106+
InputStreamImpl(kj::Own<kj::AsyncInputStream> stream): stream(kj::mv(stream)) {}
107+
108+
kj::Maybe<kj::Own<kj::AsyncInputStream>> stream;
109+
};
110+
111+
kj::Promise<void> ExternalPusherImpl::pushByteStream(PushByteStreamContext context) {
112+
kj::Maybe<uint64_t> expectedLength;
113+
auto lp1 = context.getParams().getLengthPlusOne();
114+
if (lp1 > 0) {
115+
expectedLength = lp1 - 1;
116+
}
117+
118+
auto pipe = kj::newOneWayPipe(expectedLength);
119+
120+
auto endedFlag = kj::refcounted<kj::RefcountedWrapper<bool>>(false);
121+
122+
auto out = kj::heap<ExplicitEndOutputPipeAdapter>(kj::mv(pipe.out), kj::addRef(*endedFlag));
123+
auto in =
124+
kj::heap<ExplicitEndInputPipeAdapter>(kj::mv(pipe.in), kj::mv(endedFlag), expectedLength);
125+
126+
auto results = context.initResults(capnp::MessageSize{4, 2});
127+
128+
results.setSource(inputStreamSet.add(kj::heap<InputStreamImpl>(kj::mv(in))));
129+
results.setSink(byteStreamFactory.kjToCapnp(kj::mv(out)));
130+
return kj::READY_NOW;
131+
}
132+
133+
kj::Own<kj::AsyncInputStream> ExternalPusherImpl::unwrapStream(
134+
ExternalPusher::InputStream::Client cap) {
135+
auto& unwrapped = KJ_REQUIRE_NONNULL(
136+
inputStreamSet.tryGetLocalServerSync(cap), "pushed external is not a byte stream");
137+
138+
return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast<InputStreamImpl>(unwrapped).stream),
139+
"pushed byte stream has already been consumed");
140+
}
11141

12142
} // namespace workerd

src/workerd/io/external-pusher.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,29 @@
1111

1212
namespace workerd {
1313

14+
using kj::byte;
15+
1416
// Implements JsValue.ExternalPusher from worker-interface.capnp.
1517
//
1618
// ExternalPusher allows a remote peer to "push" certain kinds of objects into our address space
1719
// so that they can then be embedded in `JsValue` as `External` values.
1820
class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj::Refcounted {
1921
public:
20-
ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory) {}
22+
ExternalPusherImpl(capnp::ByteStreamFactory& byteStreamFactory)
23+
: byteStreamFactory(byteStreamFactory) {}
24+
25+
using ExternalPusher = rpc::JsValue::ExternalPusher;
26+
27+
kj::Own<kj::AsyncInputStream> unwrapStream(ExternalPusher::InputStream::Client cap);
28+
29+
kj::Promise<void> pushByteStream(PushByteStreamContext context) override;
30+
31+
private:
32+
capnp::ByteStreamFactory& byteStreamFactory;
33+
34+
capnp::CapabilityServerSet<ExternalPusher::InputStream> inputStreamSet;
2135

22-
// TODO(now): Implement methods.
36+
class InputStreamImpl;
2337
};
2438

2539
} // namespace workerd

0 commit comments

Comments
 (0)