Skip to content

Commit 27b2baf

Browse files
committed
Wait for pushed externals upfront.
This commit refactors ExternalPusher a bit such that we wait for all the pushed externals in an external set upfront, before deserializing the value. This has two benefits:

* It removes some complexity from both the ReadableStream and AbortSignal deserializers.
* It sets us up to support embedded promise pipelining later -- that is, embedding promises into another message, and having the promises awaited and replaced with their resolution before delivery. This is already supported by Cap'n Web, and we want to support it in workerd's built-in RPC as well to match.
1 parent d9b71f2 commit 27b2baf

File tree

6 files changed

+196
-43
lines changed

6 files changed

+196
-43
lines changed

src/workerd/api/basics.c++

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -927,20 +927,11 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
927927
} else {
928928
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
929929

930-
// TODO(now): This is very ugly, but it'll get better in a later commit in this PR when we
931-
// make ExternalPusher resolve all promises before delivery.
932-
auto promise =
933-
ioctx.getExternalPusher()
934-
->unwrapAbortSignal(reader.getAbortSignal())
935-
.then([&ioctx, &signal = *signal](ExternalPusherImpl::AbortSignal resolvedSignal) {
936-
return ioctx.run(
937-
[&ioctx, &signal, resolvedSignal = kj::mv(resolvedSignal)](auto& lock) mutable {
938-
signal.pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
939-
return kj::mv(resolvedSignal.signal);
940-
});
941-
});
930+
auto resolvedSignal =
931+
externalHandler->getPushedExternals().unwrapAbortSignal(reader.getAbortSignal());
942932

943-
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(promise)));
933+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
934+
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
944935
}
945936

946937
return signal;

src/workerd/api/streams/readable.c++

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -743,9 +743,7 @@ jsg::Ref<ReadableStream> ReadableStream::deserialize(
743743

744744
kj::Own<kj::AsyncInputStream> in;
745745
if (rs.hasStream()) {
746-
// TODO(now): Create a mechanism to await all the external promises *before* deserializing.
747-
// We'll need it anyway to support Promise serialization.
748-
in = kj::newPromisedStream(ioctx.getExternalPusher()->unwrapStream(rs.getStream()));
746+
in = externalHandler->getPushedExternals().unwrapStream(rs.getStream());
749747
} else {
750748
kj::Maybe<uint64_t> expectedLength;
751749
auto el = rs.getExpectedLength();

src/workerd/api/worker-rpc.c++

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -217,11 +217,14 @@ struct DeserializeResult {
217217
};
218218

219219
// Call to construct a JS value from an `rpc::JsValue`.
220-
DeserializeResult deserializeJsValue(
221-
jsg::Lock& js, rpc::JsValue::Reader reader, kj::Maybe<StreamSinkImpl&> streamSink = kj::none) {
220+
DeserializeResult deserializeJsValue(jsg::Lock& js,
221+
rpc::JsValue::Reader reader,
222+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals,
223+
kj::Maybe<StreamSinkImpl&> streamSink = kj::none) {
222224
auto disposalGroup = kj::heap<RpcStubDisposalGroup>();
223225

224-
RpcDeserializerExternalHandler externalHandler(reader.getExternals(), *disposalGroup, streamSink);
226+
RpcDeserializerExternalHandler externalHandler(
227+
reader.getExternals(), kj::mv(pushedExternals), *disposalGroup, streamSink);
225228

226229
jsg::Deserializer deserializer(js, reader.getV8Serialized(), kj::none, kj::none,
227230
jsg::Deserializer::Options{
@@ -249,8 +252,10 @@ DeserializeResult deserializeJsValue(
249252
// an object) which disposes all stubs therein.
250253
jsg::JsValue deserializeRpcReturnValue(jsg::Lock& js,
251254
rpc::JsRpcTarget::CallResults::Reader callResults,
255+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals,
252256
kj::Maybe<StreamSinkImpl&> streamSink) {
253-
auto [value, disposalGroup, ss] = deserializeJsValue(js, callResults.getResult(), streamSink);
257+
auto [value, disposalGroup, ss] =
258+
deserializeJsValue(js, callResults.getResult(), kj::mv(pushedExternals), streamSink);
254259

255260
if (streamSink == kj::none) {
256261
KJ_REQUIRE(ss == kj::none,
@@ -549,12 +554,13 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
549554
}
550555

551556
kj::Maybe<kj::Own<StreamSinkImpl>> resultStreamSink;
557+
auto externalPusher = ioContext.getExternalPusher();
552558
if (useExternalPusher) {
553559
// Unfortunately, we always have to send the ExternalPusher since we don't know whether the
554560
// call will return any streams (or other pushed externals). Luckily, it's a
555561
// one-per-IoContext object, not a big deal. (It'll take a slot on the capnp export table
556562
// though.)
557-
builder.getResultsStreamHandler().setExternalPusher(ioContext.getExternalPusher());
563+
builder.getResultsStreamHandler().setExternalPusher(externalPusher.addRef());
558564
} else {
559565
// Unfortunately, we always have to send a `resultsStreamSink` because we don't know until
560566
// after the call completes whether or not it will return any streams. If it's unused,
@@ -569,6 +575,31 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
569575
ssf->fulfill(callResult.getParamsStreamSink());
570576
}
571577

578+
struct ResponseAndPushedExternals {
579+
capnp::Response<rpc::JsRpcTarget::CallResults> response;
580+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals;
581+
};
582+
583+
auto responsePromise =
584+
callResult.then([externalPusher = kj::mv(externalPusher)](
585+
capnp::Response<rpc::JsRpcTarget::CallResults> response) mutable
586+
-> kj::Promise<ResponseAndPushedExternals> {
587+
KJ_IF_SOME(promise, externalPusher->makeBundle(response.getResult().getExternals())) {
588+
return promise.then([response = kj::mv(response)](
589+
kj::Own<ExternalPusherImpl::Bundle> pushedExternals) mutable {
590+
return ResponseAndPushedExternals{
591+
.response = kj::mv(response),
592+
.pushedExternals = kj::mv(pushedExternals),
593+
};
594+
});
595+
} else {
596+
return ResponseAndPushedExternals{
597+
.response = kj::mv(response),
598+
.pushedExternals = kj::none,
599+
};
600+
}
601+
});
602+
572603
// We need to arrange that our JsRpcPromise will be updated in-place with the final settlement
573604
// of this RPC promise. However, we can't actually construct the JsRpcPromise until we have
574605
// the final promise to give it. To resolve the cycle, we only create a JsRpcPromise::WeakRef
@@ -577,11 +608,11 @@ JsRpcPromiseAndPipeline callImpl(jsg::Lock& js,
577608

578609
// RemotePromise lets us consume its pipeline and promise portions independently; we consume
579610
// the promise here and we consume the pipeline below, both via kj::mv().
580-
auto jsPromise = ioContext.awaitIo(js, kj::mv(callResult),
611+
auto jsPromise = ioContext.awaitIo(js, kj::mv(responsePromise),
581612
[weakRef = kj::atomicAddRef(*weakRef), resultStreamSink = kj::mv(resultStreamSink)](
582-
jsg::Lock& js,
583-
capnp::Response<rpc::JsRpcTarget::CallResults> response) mutable -> jsg::Value {
584-
auto jsResult = deserializeRpcReturnValue(js, response, resultStreamSink);
613+
jsg::Lock& js, ResponseAndPushedExternals response) mutable -> jsg::Value {
614+
auto jsResult = deserializeRpcReturnValue(
615+
js, kj::mv(response.response), kj::mv(response.pushedExternals), resultStreamSink);
585616

586617
if (weakRef->disposed) {
587618
// The promise was explicitly disposed before it even resolved. This means we must dispose
@@ -973,19 +1004,23 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
9731004
// or during a call.
9741005
JsRpcTargetBase(IoContext& ctx, MayOutliveIncomingRequest)
9751006
: enterIsolateAndCall(ctx.makeReentryCallback<IoContext::TOP_UP>(
976-
[this, &ctx](Worker::Lock& lock, CallContext callContext) {
977-
return callImpl(lock, ctx, callContext);
1007+
[this, &ctx](Worker::Lock& lock,
1008+
CallContext callContext,
1009+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals) {
1010+
return callImpl(lock, ctx, callContext, kj::mv(pushedExternals));
9781011
})),
9791012
externalPusher(ctx.getExternalPusher()) {}
9801013

9811014
// Constructor used by EntrypointJsRpcTarget, which is revoked and destroyed before the IoContext
9821015
// can possibly be canceled. It can just use ctx.run().
9831016
JsRpcTargetBase(IoContext& ctx, CantOutliveIncomingRequest)
984-
: enterIsolateAndCall([this, &ctx](CallContext callContext) {
1017+
: enterIsolateAndCall([this, &ctx](CallContext callContext,
1018+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals) {
9851019
// Note: No need to topUpActor() since this is the start of a top-level request, so the
9861020
// actor will already have been topped up by IncomingRequest::delivered().
987-
return ctx.run([this, &ctx, callContext](Worker::Lock& lock) mutable {
988-
return callImpl(lock, ctx, callContext);
1021+
return ctx.run([this, &ctx, callContext, pushedExternals = kj::mv(pushedExternals)](
1022+
Worker::Lock& lock) mutable {
1023+
return callImpl(lock, ctx, callContext, kj::mv(pushedExternals));
9891024
});
9901025
}),
9911026
externalPusher(ctx.getExternalPusher()) {}
@@ -1012,8 +1047,25 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
10121047

10131048
// Handles the delivery of JS RPC method calls.
10141049
kj::Promise<void> call(CallContext callContext) override {
1015-
// Try to execute the requested method.
1016-
return enterIsolateAndCall(callContext).catch_([](kj::Exception&& e) {
1050+
kj::Promise<void> promise = [&]() {
1051+
auto op = callContext.getParams().getOperation();
1052+
if (op.hasCallWithArgs()) {
1053+
auto args = op.getCallWithArgs();
1054+
if (args.hasExternals()) {
1055+
KJ_IF_SOME(bundlePromise, externalPusher->makeBundle(args.getExternals())) {
1056+
// The call params contain one or more pushed externals.
1057+
return bundlePromise.then(
1058+
[this, callContext](kj::Own<ExternalPusherImpl::Bundle> pushedExternals) mutable {
1059+
return enterIsolateAndCall(callContext, kj::mv(pushedExternals));
1060+
});
1061+
}
1062+
}
1063+
}
1064+
1065+
return enterIsolateAndCall(callContext, kj::none);
1066+
}();
1067+
1068+
return promise.catch_([](kj::Exception&& e) {
10171069
if (jsg::isTunneledException(e.getDescription())) {
10181070
// Annotate exceptions in RPC worker calls as remote exceptions.
10191071
auto description = jsg::stripRemoteExceptionPrefix(e.getDescription());
@@ -1039,14 +1091,18 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
10391091

10401092
// Function which enters the isolate lock and IoContext and then invokes callImpl(). Created
10411093
// using IoContext::makeReentryCallback().
1042-
kj::Function<kj::Promise<void>(CallContext callContext)> enterIsolateAndCall;
1094+
kj::Function<kj::Promise<void>(CallContext, kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>>)>
1095+
enterIsolateAndCall;
10431096

10441097
kj::Rc<ExternalPusherImpl> externalPusher;
10451098

10461099
// Returns true if the given name cannot be used as a method on this type.
10471100
virtual bool isReservedName(kj::StringPtr name) = 0;
10481101

1049-
kj::Promise<void> callImpl(Worker::Lock& lock, IoContext& ctx, CallContext callContext) {
1102+
kj::Promise<void> callImpl(Worker::Lock& lock,
1103+
IoContext& ctx,
1104+
CallContext callContext,
1105+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals) {
10501106
jsg::Lock& js = lock;
10511107
auto params = callContext.getParams();
10521108
// Method name suitable for use in trace and error messages. May be a pointer into the RPC
@@ -1236,10 +1292,10 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
12361292

12371293
InvocationResult invocationResult;
12381294
KJ_IF_SOME(envCtx, targetInfo.envCtx) {
1239-
invocationResult = invokeFnInsertingEnvCtx(
1240-
js, methodNameForTrace, fn, thisArg, args, envCtx.env, envCtx.ctx);
1295+
invocationResult = invokeFnInsertingEnvCtx(js, methodNameForTrace, fn, thisArg, args,
1296+
kj::mv(pushedExternals), envCtx.env, envCtx.ctx);
12411297
} else {
1242-
invocationResult = invokeFn(js, fn, thisArg, args);
1298+
invocationResult = invokeFn(js, fn, thisArg, args, kj::mv(pushedExternals));
12431299
}
12441300

12451301
// We have a function, so let's call it and serialize the result for RPC.
@@ -1406,10 +1462,11 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
14061462
static InvocationResult invokeFn(jsg::Lock& js,
14071463
v8::Local<v8::Function> fn,
14081464
v8::Local<v8::Object> thisArg,
1409-
kj::Maybe<rpc::JsValue::Reader> args) {
1465+
kj::Maybe<rpc::JsValue::Reader> args,
1466+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals) {
14101467
// We received arguments from the client, deserialize them back to JS.
14111468
KJ_IF_SOME(a, args) {
1412-
auto [value, disposalGroup, streamSink] = deserializeJsValue(js, a);
1469+
auto [value, disposalGroup, streamSink] = deserializeJsValue(js, a, kj::mv(pushedExternals));
14131470
auto args = KJ_REQUIRE_NONNULL(
14141471
value.tryCast<jsg::JsArray>(), "expected JsArray when deserializing arguments.");
14151472
// Call() expects a `Local<Value> []`... so we populate an array.
@@ -1440,6 +1497,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
14401497
v8::Local<v8::Function> fn,
14411498
v8::Local<v8::Object> thisArg,
14421499
kj::Maybe<rpc::JsValue::Reader> args,
1500+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals,
14431501
v8::Local<v8::Value> env,
14441502
jsg::JsObject ctx) {
14451503
// Determine the function arity (how many parameters it was declared to accept) by reading the
@@ -1469,7 +1527,7 @@ class JsRpcTargetBase: public rpc::JsRpcTarget::Server {
14691527
kj::Maybe<jsg::JsArray> argsArrayFromClient;
14701528
size_t argCountFromClient = 0;
14711529
KJ_IF_SOME(a, args) {
1472-
auto [value, disposalGroup, ss] = deserializeJsValue(js, a);
1530+
auto [value, disposalGroup, ss] = deserializeJsValue(js, a, kj::mv(pushedExternals));
14731531
streamSink = kj::mv(ss);
14741532

14751533
auto array = KJ_REQUIRE_NONNULL(

src/workerd/api/worker-rpc.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,25 @@ class RpcDeserializerExternalHandler final: public jsg::Deserializer::ExternalHa
127127
// The `streamSink` parameter should be provided if a StreamSink already exists, e.g. when
128128
// deserializing results. If omitted, it will be constructed on-demand.
129129
RpcDeserializerExternalHandler(capnp::List<rpc::JsValue::External>::Reader externals,
130+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals,
130131
RpcStubDisposalGroup& disposalGroup,
131132
kj::Maybe<StreamSinkImpl&> streamSink)
132133
: externals(externals),
134+
pushedExternals(kj::mv(pushedExternals)),
133135
disposalGroup(disposalGroup),
134136
streamSink(streamSink) {}
135137
~RpcDeserializerExternalHandler() noexcept(false);
136138

137139
// Read and return the next external.
138140
rpc::JsValue::External::Reader read();
139141

142+
// Call to get the bundle of externals pushed to this isolate using ExternalPusher. Throws an
143+
// exception if there were none -- but this should only be called when a specific pushed
144+
// external was encountered during deserialization.
145+
ExternalPusherImpl::Bundle& getPushedExternals() {
146+
return *KJ_REQUIRE_NONNULL(pushedExternals);
147+
}
148+
140149
// Call immediately after `read()` when reading an external that is associated with a stream.
141150
// `stream` is published back to the sender via StreamSink.
142151
void setLastStream(capnp::Capability::Client stream);
@@ -158,6 +167,8 @@ class RpcDeserializerExternalHandler final: public jsg::Deserializer::ExternalHa
158167
capnp::List<rpc::JsValue::External>::Reader externals;
159168
uint i = 0;
160169

170+
kj::Maybe<kj::Own<ExternalPusherImpl::Bundle>> pushedExternals;
171+
161172
kj::UnwindDetector unwindDetector;
162173
RpcStubDisposalGroup& disposalGroup;
163174

src/workerd/io/external-pusher.c++

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,70 @@
77

88
namespace workerd {
99

10+
// HACK: If `CapabilityServerSet` successfully unwraps a `ClientHook`, this either means the hook
11+
// itself pointed to a local object, or it resolved to something that did. We want to populate
12+
// a HashMap based on hook pointers, so we'd better use the fully-resolved hook, otherwise things
13+
// like membrane layers can cause confusion.
14+
static capnp::ClientHook* fullyResolvedPtr(capnp::ClientHook& hook) {
15+
KJ_IF_SOME(r, hook.getResolved()) {
16+
return &r;
17+
} else {
18+
return &hook;
19+
}
20+
}
21+
22+
kj::Maybe<kj::Promise<kj::Own<ExternalPusherImpl::Bundle>>> ExternalPusherImpl::makeBundle(
23+
capnp::List<rpc::JsValue::External>::Reader externals) {
24+
kj::Vector<kj::Promise<void>> promises;
25+
kj::Maybe<kj::Own<Bundle>> maybeBundle;
26+
27+
auto add = [&](auto cap, auto func) {
28+
auto hook = capnp::ClientHook::from(cap);
29+
Bundle* bundle;
30+
KJ_IF_SOME(b, maybeBundle) {
31+
bundle = b.get();
32+
} else {
33+
bundle = maybeBundle.emplace(kj::heap<Bundle>());
34+
}
35+
promises.add((this->*func)(cap).then([bundle, hook = kj::mv(hook)](auto&& unwrapped) mutable {
36+
bundle->items.insert(fullyResolvedPtr(*hook), kj::mv(unwrapped));
37+
}));
38+
};
39+
40+
for (auto ext: externals) {
41+
switch (ext.which()) {
42+
case rpc::JsValue::External::READABLE_STREAM: {
43+
auto rs = ext.getReadableStream();
44+
if (rs.hasStream()) {
45+
add(rs.getStream(), &ExternalPusherImpl::unwrapStream);
46+
}
47+
break;
48+
}
49+
50+
case rpc::JsValue::External::ABORT_SIGNAL:
51+
add(ext.getAbortSignal(), &ExternalPusherImpl::unwrapAbortSignal);
52+
break;
53+
54+
default:
55+
break;
56+
}
57+
}
58+
59+
return maybeBundle.map([&](kj::Own<Bundle>& bundle) -> kj::Promise<kj::Own<Bundle>> {
60+
return kj::joinPromisesFailFast(promises.releaseAsArray())
61+
.then([bundle = kj::mv(bundle)]() mutable { return kj::mv(bundle); });
62+
});
63+
}
64+
65+
template <typename T>
66+
T ExternalPusherImpl::Bundle::unwrap(capnp::Capability::Client cap) {
67+
auto hook = capnp::ClientHook::from(kj::mv(cap));
68+
auto& item = KJ_REQUIRE_NONNULL(items.find(fullyResolvedPtr(*hook)));
69+
auto result = kj::mv(KJ_REQUIRE_NONNULL(item.tryGet<T>()));
70+
item = {};
71+
return result;
72+
}
73+
1074
// =======================================================================================
1175
// ReadableStream handling
1276

@@ -139,6 +203,11 @@ kj::Promise<kj::Own<kj::AsyncInputStream>> ExternalPusherImpl::unwrapStream(
139203
"pushed byte stream has already been consumed");
140204
}
141205

206+
kj::Own<kj::AsyncInputStream> ExternalPusherImpl::Bundle::unwrapStream(
207+
ExternalPusher::InputStream::Client cap) {
208+
return unwrap<kj::Own<kj::AsyncInputStream>>(kj::mv(cap));
209+
}
210+
142211
// =======================================================================================
143212
// AbortSignal handling
144213

@@ -224,4 +293,9 @@ kj::Promise<ExternalPusherImpl::AbortSignal> ExternalPusherImpl::unwrapAbortSign
224293
"pushed AbortSignal has already been consumed");
225294
}
226295

296+
ExternalPusherImpl::AbortSignal ExternalPusherImpl::Bundle::unwrapAbortSignal(
297+
ExternalPusher::AbortSignal::Client cap) {
298+
return unwrap<AbortSignal>(kj::mv(cap));
299+
}
300+
227301
} // namespace workerd

0 commit comments

Comments
 (0)