@@ -598,10 +598,13 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
598598 ioContext.getLimitEnforcer ().topUpActor ();
599599
600600 auto ownReportContext = capnp::CallContextHook::from (reportContext).addRef ();
601+ // We need to be able to access the results builder from both the promise below and its
602+ // exception handler.
603+ auto sharedResults = kj::rc<SharedResults>(reportContext.initResults ());
601604
602- auto promise = ioContext.run (
603- [ this , &ioContext, reportContext, ownReportContext = kj::mv ( ownReportContext)](
604- Worker::Lock& lock) mutable -> kj::Promise<void > {
605+ auto promise = ioContext.run ([ this , &ioContext, sharedResults = sharedResults. addRef (),
606+ reportContext, ownReportContext = ownReportContext-> addRef ( )](
607+ Worker::Lock& lock) mutable -> kj::Promise<void > {
605608 auto params = reportContext.getParams ();
606609 KJ_ASSERT (params.hasEvents (), " Events are required." );
607610 auto eventReaders = params.getEvents ();
@@ -616,10 +619,9 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
616619 auto result = ([&]() -> kj::Promise<void > {
617620 KJ_IF_SOME (handler, maybeHandler) {
618621 auto h = handler.getHandle (lock);
619- return handleEvents (
620- lock, h, ioContext, events.releaseAsArray (), reportContext.initResults ());
622+ return handleEvents (lock, h, ioContext, events.releaseAsArray (), kj::mv (sharedResults));
621623 } else {
622- return handleOnset (lock, ioContext, events.releaseAsArray (), reportContext. initResults ( ));
624+ return handleOnset (lock, ioContext, events.releaseAsArray (), kj::mv (sharedResults ));
623625 }
624626 })();
625627
@@ -634,7 +636,8 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
634636
635637 auto paf = kj::newPromiseAndFulfiller<void >();
636638 promise = promise.then ([&fulfiller = *paf.fulfiller ]() { fulfiller.fulfill (); },
637- [&, &fulfiller = *paf.fulfiller ](kj::Exception&& e) {
639+ [&, &fulfiller = *paf.fulfiller , ownReportContext = kj::mv (ownReportContext),
640+ results = kj::mv (sharedResults)](kj::Exception&& e) mutable {
638641 // This is the top level exception catcher for tail events being delivered. We do not want to
639642 // propagate JS exceptions to the client side here, all exceptions should stay within this
640643 // customEvent. Instead, we propagate the exception to the doneFulfiller, where it is used to
@@ -648,6 +651,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
648651 }
649652 // We still fulfill this fulfiller to disarm the cancellation check below
650653 fulfiller.fulfill ();
654+ results->setStop (true );
651655 doneReceiving = true ;
652656 doneFulfiller->reject (kj::mv (e));
653657 });
@@ -664,6 +668,12 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
664668 }
665669
666670 private:
671+ // Used to share the results builder (and send the stop signal) from both the main code path and
672+ // the exception handler.
673+ struct SharedResults : public kj ::Refcounted, rpc::TailStreamTarget::TailStreamResults::Builder {
674+ SharedResults (rpc::TailStreamTarget::TailStreamResults::Builder results)
675+ : rpc::TailStreamTarget::TailStreamResults::Builder(kj::mv(results)) {}
676+ };
667677 // Handles the very first (onset) event in the tail stream. This will cause
668678 // the exported tailStream handler to be called, passing the onset event
669679 // as the initial argument. If the tail stream wishes to continue receiving
@@ -673,7 +683,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
673683 kj::Promise<void > handleOnset (Worker::Lock& lock,
674684 IoContext& ioContext,
675685 kj::Array<tracing::TailEvent> events,
676- rpc::TailStreamTarget::TailStreamResults::Builder results) {
686+ kj::Rc<SharedResults> results) {
677687 // There should be only a single onset event in this batch.
678688 KJ_ASSERT (events.size () == 1 && events[0 ].event .is <tracing::Onset>(),
679689 " Expected only a single onset event" );
@@ -694,7 +704,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
694704 if (!maybeFn->IsFunction ()) {
695705 ioContext.logWarningOnce (" A worker configured to act as a streaming tail worker does "
696706 " not export a tailStream() handler." );
697- results. setStop (true );
707+ results-> setStop (true );
698708 doneReceiving = true ;
699709 doneFulfiller->fulfill ();
700710 return kj::READY_NOW;
@@ -717,24 +727,12 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
717727 auto result =
718728 jsg::check (fn->Call (js.v8Context (), target, handlerArgs.size (), handlerArgs.data ()));
719729
720- // We need to be able to access the results builder from both the
721- // success and failure branches of the promise we set up below.
722- struct SharedResults : public kj ::Refcounted {
723- rpc::TailStreamTarget::TailStreamResults::Builder results;
724- rpc::TailStreamTarget::TailStreamResults::Builder& get () {
725- return results;
726- }
727- SharedResults (rpc::TailStreamTarget::TailStreamResults::Builder results)
728- : results(kj::mv(results)) {}
729- };
730- auto sharedResults = kj::rc<SharedResults>(kj::mv (results));
731-
732730 // The handler can return a function, an object, undefined, or a promise
733731 // for any of these. We will convert the result to a promise for consistent
734732 // handling...
735733 return ioContext.awaitJs (js,
736734 js.toPromise (result).then (js,
737- ioContext.addFunctor ([this , results = sharedResults .addRef (), &ioContext](
735+ ioContext.addFunctor ([this , results = results .addRef (), &ioContext](
738736 jsg::Lock& js, jsg::Value value) mutable {
739737 // The value here can be one of a function, an object, or undefined.
740738 // Any value other than these will result in a warning but will otherwise
@@ -762,22 +760,22 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
762760 }
763761 // And finally, we'll stop the stream since the tail worker did not return
764762 // a handler for us to continue with.
765- results->get (). setStop (true );
763+ results->setStop (true );
766764 doneReceiving = true ;
767765 doneFulfiller->fulfill ();
768766 }),
769767 ioContext.addFunctor (
770- [&, results = sharedResults .addRef ()](jsg::Lock& js, jsg::Value&& error) mutable {
768+ [&, results = results .addRef ()](jsg::Lock& js, jsg::Value&& error) mutable {
771769 // Received a JS error. Do not reject doneFulfiller yet, this will be handled when we catch
772770 // the exception later.
773- results->get (). setStop (true );
771+ results->setStop (true );
774772 doneReceiving = true ;
775773 js.throwException (kj::mv (error));
776774 })));
777775 } catch (...) {
778776 ioContext.logWarningOnce (" A worker configured to act as a streaming tail worker did "
779777 " not return a valid tailStream() handler." );
780- results. setStop (true );
778+ results-> setStop (true );
781779 doneReceiving = true ;
782780 doneFulfiller->fulfill ();
783781 return kj::READY_NOW;
@@ -789,7 +787,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
789787 const jsg::JsValue& handler,
790788 IoContext& ioContext,
791789 kj::Array<tracing::TailEvent> events,
792- rpc::TailStreamTarget::TailStreamResults::Builder results) {
790+ kj::Rc<SharedResults> results) {
793791 jsg::Lock& js = lock;
794792
795793 // Should not ever happen but let's handle it anyway.
@@ -816,7 +814,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server {
816814 if (finishing) break ;
817815 if (event.event .is <tracing::Outcome>()) {
818816 finishing = true ;
819- results. setStop (true );
817+ results-> setStop (true );
820818 doneReceiving = true ;
821819 // We set doFulfill to indicate that the outcome event has been received via RPC and no more
822820 // events are expected.
0 commit comments