Skip to content

Commit c7a8649

Browse files
committed
Implement AbortSignal serialization via ExternalPusher.
1 parent f7efc20 commit c7a8649

File tree

4 files changed

+157
-17
lines changed

4 files changed

+157
-17
lines changed

src/workerd/api/basics.c++

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,10 @@ class AbortTriggerRpcClient final {
574574

575575
namespace {
576576
// The jsrpc handler that receives aborts from the remote and triggers them locally
577+
//
578+
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
579+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
580+
// StreamSink-related code. For now I'm not trying to avoid duplication.
577581
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
578582
public:
579583
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
@@ -858,15 +862,28 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
858862
return;
859863
}
860864

861-
auto streamCap = externalHandler
862-
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
863-
builder.setAbortTrigger();
864-
}).castAs<rpc::AbortTrigger>();
865+
auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
866+
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
867+
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();
868+
869+
externalHandler->write(
870+
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
871+
builder.setAbortSignal(kj::mv(signal));
872+
});
873+
874+
return pipeline.getTrigger();
875+
} else {
876+
return externalHandler
877+
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
878+
builder.setAbortTrigger();
879+
}).castAs<rpc::AbortTrigger>();
880+
}
881+
}();
865882

866883
auto& ioContext = IoContext::current();
867884
// Keep track of every AbortSignal cloned from this one.
868885
// If this->triggerAbort(...) is called, each rpcClient will be informed.
869-
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(streamCap))));
886+
rpcClients.add(ioContext.addObject(kj::heap<AbortTriggerRpcClient>(kj::mv(triggerCap))));
870887
}
871888

872889
jsg::Ref<AbortSignal> AbortSignal::deserialize(
@@ -890,20 +907,41 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
890907
return js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
891908
}
892909

893-
auto reader = externalHandler->read();
894-
KJ_REQUIRE(reader.isAbortTrigger(), "external table slot type does't match serialization tag");
895-
896910
// The AbortSignalImpl will receive any remote triggerAbort requests and fulfill the promise with the reason for abort
897911

898912
auto signal = js.alloc<AbortSignal>(/* exception */ kj::none, /* maybeReason */ kj::none, flag);
899913

900-
auto paf = kj::newPromiseAndFulfiller<void>();
901-
auto pendingReason = IoContext::current().addObject(kj::refcounted<PendingReason>());
914+
auto& ioctx = IoContext::current();
902915

903-
externalHandler->setLastStream(
904-
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
905-
signal->rpcAbortPromise = IoContext::current().addObject(kj::heap(kj::mv(paf.promise)));
906-
signal->pendingReason = kj::mv(pendingReason);
916+
auto reader = externalHandler->read();
917+
if (reader.isAbortTrigger()) {
918+
// Old-style StreamSink.
919+
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
920+
auto paf = kj::newPromiseAndFulfiller<void>();
921+
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());
922+
923+
externalHandler->setLastStream(
924+
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
925+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
926+
signal->pendingReason = kj::mv(pendingReason);
927+
} else {
928+
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
929+
930+
// TODO(now): This is very ugly, but it'll get better in a later commit in this PR when we
931+
// make ExternalPusher resolve all promises before delivery.
932+
auto promise =
933+
ioctx.getExternalPusher()
934+
->unwrapAbortSignal(reader.getAbortSignal())
935+
.then([&ioctx, &signal = *signal](ExternalPusherImpl::AbortSignal resolvedSignal) {
936+
return ioctx.run(
937+
[&ioctx, &signal, resolvedSignal = kj::mv(resolvedSignal)](auto& lock) mutable {
938+
signal.pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
939+
return kj::mv(resolvedSignal.signal);
940+
});
941+
});
942+
943+
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(promise)));
944+
}
907945

908946
return signal;
909947
}

src/workerd/api/basics.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
// TODO(cleanup): Rename to events.h?
99

1010
#include <workerd/io/compatibility-date.capnp.h>
11+
#include <workerd/io/external-pusher.h>
1112
#include <workerd/io/io-own.h>
1213
#include <workerd/io/worker-interface.capnp.h>
1314
#include <workerd/jsg/jsg.h>
@@ -571,9 +572,7 @@ class AbortSignal final: public EventTarget {
571572
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason = kj::none,
572573
Flag flag = Flag::NONE);
573574

574-
using PendingReason = kj::RefcountedWrapper<
575-
kj::OneOf<kj::Array<kj::byte> /* v8Serialized */, kj::Exception /* if capability is dropped */
576-
>>;
575+
using PendingReason = ExternalPusherImpl::PendingAbortReason;
577576

578577
// The AbortSignal explicitly does not expose a constructor(). It is
579578
// illegal for user code to create an AbortSignal directly.

src/workerd/io/external-pusher.c++

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,89 @@ kj::Promise<kj::Own<kj::AsyncInputStream>> ExternalPusherImpl::unwrapStream(
139139
"pushed byte stream has already been consumed");
140140
}
141141

142+
// =======================================================================================
143+
// AbortSignal handling
144+
145+
namespace {
146+
147+
// The jsrpc handler that receives aborts from the remote and triggers them locally
148+
//
149+
// TODO(cleanup): This class has been copied from external-pusher.c++. The copy there can be
150+
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
151+
// StreamSink-related code. For now I'm not trying to avoid duplication.
152+
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
153+
public:
154+
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
155+
kj::Own<ExternalPusherImpl::PendingAbortReason>&& pendingReason)
156+
: fulfiller(kj::mv(fulfiller)),
157+
pendingReason(kj::mv(pendingReason)) {}
158+
159+
kj::Promise<void> abort(AbortContext abortCtx) override {
160+
auto params = abortCtx.getParams();
161+
auto reason = params.getReason().getV8Serialized();
162+
163+
pendingReason->getWrapped() = kj::heapArray(reason.asBytes());
164+
fulfiller->fulfill();
165+
return kj::READY_NOW;
166+
}
167+
168+
kj::Promise<void> release(ReleaseContext releaseCtx) override {
169+
released = true;
170+
return kj::READY_NOW;
171+
}
172+
173+
~AbortTriggerRpcServer() noexcept(false) {
174+
if (pendingReason->getWrapped() != nullptr) {
175+
// Already triggered
176+
return;
177+
}
178+
179+
if (!released) {
180+
pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError,
181+
"An AbortSignal received over RPC was implicitly aborted because the connection back to "
182+
"its trigger was lost.");
183+
}
184+
185+
// Always fulfill the promise in case the AbortSignal was waiting
186+
fulfiller->fulfill();
187+
}
188+
189+
private:
190+
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
191+
kj::Own<ExternalPusherImpl::PendingAbortReason> pendingReason;
192+
bool released = false;
193+
};
194+
195+
} // namespace
196+
197+
class ExternalPusherImpl::AbortSignalImpl final: public ExternalPusher::AbortSignal::Server {
198+
public:
199+
AbortSignalImpl(AbortSignal content): content(kj::mv(content)) {}
200+
201+
kj::Maybe<AbortSignal> content;
202+
};
203+
204+
kj::Promise<void> ExternalPusherImpl::pushAbortSignal(PushAbortSignalContext context) {
205+
auto paf = kj::newPromiseAndFulfiller<void>();
206+
auto pendingReason = kj::refcounted<PendingAbortReason>();
207+
208+
auto results = context.initResults(capnp::MessageSize{4, 2});
209+
210+
results.setTrigger(
211+
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
212+
results.setSignal(abortSignalSet.add(
213+
kj::heap<AbortSignalImpl>(AbortSignal{kj::mv(paf.promise), kj::mv(pendingReason)})));
214+
215+
return kj::READY_NOW;
216+
}
217+
218+
kj::Promise<ExternalPusherImpl::AbortSignal> ExternalPusherImpl::unwrapAbortSignal(
219+
ExternalPusher::AbortSignal::Client cap) {
220+
auto& unwrapped = KJ_REQUIRE_NONNULL(
221+
co_await abortSignalSet.getLocalServer(cap), "pushed external is not an AbortSignal");
222+
223+
co_return KJ_REQUIRE_NONNULL(kj::mv(kj::downcast<AbortSignalImpl>(unwrapped).content),
224+
"pushed AbortSignal has already been consumed");
225+
}
226+
142227
} // namespace workerd

src/workerd/io/external-pusher.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,33 @@ class ExternalPusherImpl: public rpc::JsValue::ExternalPusher::Server, public kj
2626

2727
kj::Promise<kj::Own<kj::AsyncInputStream>> unwrapStream(ExternalPusher::InputStream::Client cap);
2828

29+
// Box which holds the reason why an AbortSignal was aborted. May be either:
30+
// - A serialized V8 value if the signal was aborted from JavaScript.
31+
// - A KJ exception if the connection from the trigger was lost.
32+
using PendingAbortReason = kj::RefcountedWrapper<kj::OneOf<kj::Array<kj::byte>, kj::Exception>>;
33+
34+
struct AbortSignal {
35+
// Resolves when `reason` has been filled in.
36+
kj::Promise<void> signal;
37+
38+
// The abort reason box, will be uninitialized until `signal` resolves.
39+
kj::Own<PendingAbortReason> reason;
40+
};
41+
42+
kj::Promise<AbortSignal> unwrapAbortSignal(ExternalPusher::AbortSignal::Client cap);
43+
2944
protected:
3045
kj::Promise<void> pushByteStream(PushByteStreamContext context) override;
46+
kj::Promise<void> pushAbortSignal(PushAbortSignalContext context) override;
3147

3248
private:
3349
capnp::ByteStreamFactory& byteStreamFactory;
3450

3551
capnp::CapabilityServerSet<ExternalPusher::InputStream> inputStreamSet;
52+
capnp::CapabilityServerSet<ExternalPusher::AbortSignal> abortSignalSet;
3653

3754
class InputStreamImpl;
55+
class AbortSignalImpl;
3856
};
3957

4058
} // namespace workerd

0 commit comments

Comments
 (0)