Skip to content

Commit da44139

Browse files
committed
Implement AbortSignal serialization via ExternalPusher.
1 parent d2f482e commit da44139

File tree

4 files changed

+147
-17
lines changed

4 files changed

+147
-17
lines changed

src/workerd/api/basics.c++

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,10 @@ class AbortTriggerRpcClient final {
574574

575575
namespace {
576576
// The jsrpc handler that receives aborts from the remote and triggers them locally
577+
//
578+
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
579+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
580+
// StreamSink-related code. For now I'm not trying to avoid duplication.
577581
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
578582
public:
579583
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
@@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
858862
return;
859863
}
860864

861-
auto streamCap = externalHandler
862-
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
863-
builder.setAbortTrigger();
864-
}).castAs<rpc::AbortTrigger>();
865+
auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
866+
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
867+
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();
868+
869+
externalHandler->write(
870+
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
871+
builder.setAbortSignal(kj::mv(signal));
872+
});
873+
874+
return pipeline.getTrigger();
875+
} else {
876+
return externalHandler
877+
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
878+
builder.setAbortTrigger();
879+
}).castAs<rpc::AbortTrigger>();
880+
}
881+
}();
865882

866883
auto& ioContext = IoContext::current();
867884
// Keep track of every AbortSignal cloned from this one.
868885
// If this->triggerAbort(...) is called, each rpcClient will be informed.
869-
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(streamCap))));
886+
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(triggerCap))));
870887
}
871888

872889
jsg::Ref<AbortSignal> AbortSignal::deserialize(
@@ -890,20 +907,31 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
890907
return js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
891908
}
892909

893-
auto reader = externalHandler->read();
894-
KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag");
895-
896910
// The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort
897911

898912
auto signal = js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
899913

900-
auto paf = kj::newPromiseAndFulfiller<void>();
901-
auto pendingReason = IoContext::current().addObject(kj::refcounted<PendingReason>());
914+
auto& ioctx = IoContext::current();
915+
916+
auto reader = externalHandler->read();
917+
if (reader.isAbortTrigger()) {
918+
// Old-style StreamSink.
919+
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
920+
auto paf = kj::newPromiseAndFulfiller<void>();
921+
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());
922+
923+
externalHandler->setLastStream(
924+
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
925+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
926+
signal->pendingReason = kj::mv(pendingReason);
927+
} else {
928+
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
929+
930+
auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());
902931

903-
externalHandler->setLastStream(
904-
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
905-
signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise)));
906-
signal->pendingReason = kj::mv(pendingReason);
932+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
933+
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
934+
}
907935

908936
return signal;
909937
}

src/workerd/api/basics.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// TODO(cleanup): Rename to events.h?
99

1010
#include <workerd/io/compatibility-date.capnp.h>
11+
#include <workerd/io/external-pusher.h>
1112
#include <workerd/io/io-own.h>
1213
#include <workerd/io/worker-interface.capnp.h>
1314
#include <workerd/jsg/jsg.h>
@@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget {
571572
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason = kj::none,
572573
Flag flag = Flag::NONE);
573574

574-
using PendingReason = kj::RefcountedWrapper<
575-
kj::OneOf<kj::Array<kj::byte> /* v8Serialized */, kj::Exception /* if capability is dropped */
576-
>>;
575+
using PendingReason = ExternalPusherImpl::PendingAbortReason;
577576

578577
// The AbortSignal explicitly does not expose a constructor(). It is
579578
// illegal for user code to create an AbortSignal directly.

src/workerd/io/external-pusher.c++

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,89 @@ kj::Own<kj::AsyncInputStream> ExternalPusherImpl::unwrapStream(
139139
"pushed byte stream has already been consumed");
140140
}
141141

142+
// =======================================================================================
143+
// AbortSignal handling
144+
145+
namespace {
146+
147+
// The jsrpc handler that receives aborts from the remote and triggers them locally
148+
//
149+
// TODO(cleanup): This class has been copied from external-pusher.c++. The copy there can be
150+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
151+
// StreamSink-related code. For now I'm not trying to avoid duplication.
152+
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
153+
public:
154+
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
155+
kj::Own<ExternalPusherImpl::PendingAbortReason>&& pendingReason)
156+
: fulfiller(kj::mv(fulfiller)),
157+
pendingReason(kj::mv(pendingReason)) {}
158+
159+
kj::Promise<void> abort(AbortContext abortCtx) override {
160+
auto params = abortCtx.getParams();
161+
auto reason = params.getReason().getV8Serialized();
162+
163+
pendingReason->getWrapped() = kj::heapArray(reason.asBytes());
164+
fulfiller->fulfill();
165+
return kj::READY_NOW;
166+
}
167+
168+
kj::Promise<void> release(ReleaseContext releaseCtx) override {
169+
released = true;
170+
return kj::READY_NOW;
171+
}
172+
173+
~AbortTriggerRpcServer() noexcept(false) {
174+
if (pendingReason->getWrapped() != nullptr) {
175+
// Already triggered
176+
return;
177+
}
178+
179+
if (!released) {
180+
pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError,
181+
"An AbortSignal received over RPC was implicitly aborted because the connection back to "
182+
"its trigger was lost.");
183+
}
184+
185+
// Always fulfill the promise in case the AbortSignal was waiting
186+
fulfiller->fulfill();
187+
}
188+
189+
private:
190+
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
191+
kj::Own<ExternalPusherImpl::PendingAbortReason> pendingReason;
192+
bool released = false;
193+
};
194+
195+
} // namespace
196+
197+
class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server {
198+
public:
199+
AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {}
200+
201+
kj::Maybe<AbortSignal> content;
202+
};
203+
204+
kj::Promise<void> ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) {
205+
auto paf = kj::newPromiseAndFulfiller<void>();
206+
auto pendingReason = kj::refcounted<PendingAbortReason>();
207+
208+
auto results = context.initResults(capnp::MessageSize{4, 2});
209+
210+
results.setTrigger(
211+
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
212+
results.setSignal(abortSignalSet.add(
213+
kj::heap<AbortSignalImpl>(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)})));
214+
215+
return kj::READY_NOW;
216+
}
217+
218+
ExternalPusherImpl::AbortSignal ExternalPusherImpl::unwrapAbortSignal(
219+
ExternalPusher::AbortSignal::Client cap) {
220+
auto& unwrapped = KJ_REQUIRE_NONNULL(
221+
abortSignalSet.tryGetLocalServerSync(cap), "pushed external is not an AbortSignal");
222+
223+
return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast<AbortSignalImpl>(unwrapped).content),
224+
"pushed AbortSignal has already been consumed");
225+
}
226+
142227
} // namespace workerd

src/workerd/io/external-pusher.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,32 @@ class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj
2626

2727
kj::Own<kj::AsyncInputStream> unwrapStream(ExternalPusher::InputStream::Client cap);
2828

29+
// Box which holds the reason why an AbortSignal was aborted. May be either:
30+
// - A serialized V8 value if the signal was aborted from JavaScript.
31+
// - A KJ exception if the connection from the trigger was lost.
32+
using PendingAbortReason = kj::RefcountedWrapper<kj::OneOf<kj::Array<kj::byte>, kj::Exception>>;
33+
34+
struct AbortSignal {
35+
// Resolves when `reason` has been filled in.
36+
kj::Promise<void> signal;
37+
38+
// The abort reason box, will be uninitialized until `signal` resolves.
39+
kj::Own<PendingAbortReason> reason;
40+
};
41+
42+
AbortSignal unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap);
43+
2944
kj::Promise<void> pushByteStream(PushByteStreamContext context) override;
45+
kj::Promise<void> pushAbortSignal(PushAbortSignalContext context) override;
3046

3147
private:
3248
capnp::ByteStreamFactory& byteStreamFactory;
3349

3450
capnp::CapabilityServerSet<ExternalPusher::InputStream> inputStreamSet;
51+
capnp::CapabilityServerSet<ExternalPusher::AbortSignal> abortSignalSet;
3552

3653
class InputStreamImpl;
54+
class AbortSignalImpl;
3755
};
3856

3957
} // namespace workerd

0 commit comments

Comments
 (0)