|
12 | 12 | namespace workerd::tracing { |
13 | 13 | namespace { |
14 | 14 |
|
15 | | -// Uniquely identifies js tail session failures |
16 | | -constexpr kj::Exception::DetailTypeId TAIL_STREAM_JS_FAILURE = 0xcde53d65a46183f7; |
17 | | - |
18 | 15 | #define STRS(V) \ |
19 | 16 | V(ALARM, "alarm") \ |
20 | 17 | V(ATTRIBUTES, "attributes") \ |
@@ -606,10 +603,9 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { |
606 | 603 |
|
607 | 604 | auto ownReportContext = capnp::CallContextHook::from(reportContext).addRef(); |
608 | 605 |
|
609 | | - auto promise = |
610 | | - ioContext |
611 | | - .run([this, &ioContext, reportContext, ownReportContext = kj::mv(ownReportContext)]( |
612 | | - Worker::Lock& lock) mutable -> kj::Promise<void> { |
| 606 | + auto promise = ioContext.run( |
| 607 | + [this, &ioContext, reportContext, ownReportContext = kj::mv(ownReportContext)]( |
| 608 | + Worker::Lock& lock) mutable -> kj::Promise<void> { |
613 | 609 | auto params = reportContext.getParams(); |
614 | 610 | KJ_ASSERT(params.hasEvents(), "Events are required."); |
615 | 611 | auto eventReaders = params.getEvents(); |
@@ -638,19 +634,26 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { |
638 | 634 | } else { |
639 | 635 | return kj::mv(result); |
640 | 636 | } |
641 | | - }).catch_([](kj::Exception&& e) { |
| 637 | + }); |
| 638 | + |
| 639 | + auto paf = kj::newPromiseAndFulfiller<void>(); |
| 640 | + promise = promise.then([&fulfiller = *paf.fulfiller]() { fulfiller.fulfill(); }, |
| 641 | + [&, &fulfiller = *paf.fulfiller](kj::Exception&& e) { |
| 642 | + // This is the top level exception catcher for tail events being delivered. We do not want to |
| 643 | + // propagate JS exceptions to the client side here, all exceptions should stay within this |
| 644 | + // customEvent. Instead, we propagate the exception to the doneFulfiller, where it is used to |
| 645 | + // set the right outcome code and re-thrown if appropriate. By rejecting the doneFulfiller, we |
| 646 | + // also ensure that no more tail events get delivered. |
642 | 647 | if (jsg::isTunneledException(e.getDescription())) { |
643 | 648 | auto description = jsg::stripRemoteExceptionPrefix(e.getDescription()); |
644 | 649 | if (!description.startsWith("remote.")) { |
645 | 650 | e.setDescription(kj::str("remote.", description)); |
646 | 651 | } |
647 | 652 | } |
648 | | - kj::throwFatalException(kj::mv(e)); |
| 653 | + // We still fulfill this fulfiller to disarm the cancellation check below |
| 654 | + fulfiller.fulfill(); |
| 655 | + doneFulfiller->reject(kj::mv(e)); |
649 | 656 | }); |
650 | | - |
651 | | - auto paf = kj::newPromiseAndFulfiller<void>(); |
652 | | - promise = promise.then([&fulfiller = *paf.fulfiller]() { fulfiller.fulfill(); }, |
653 | | - [&fulfiller = *paf.fulfiller](kj::Exception&& e) { fulfiller.reject(kj::mv(e)); }); |
654 | 657 | promise = promise.attach(kj::defer([fulfiller = kj::mv(paf.fulfiller)]() mutable { |
655 | 658 | if (fulfiller->isWaiting()) { |
656 | 659 | fulfiller->reject(JSG_KJ_EXCEPTION(FAILED, Error, |
@@ -764,10 +767,11 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { |
764 | 767 | results->get().setStop(true); |
765 | 768 | doneFulfiller->fulfill(); |
766 | 769 | }), |
767 | | - ioContext.addFunctor([this, results = sharedResults.addRef()]( |
768 | | - jsg::Lock& js, jsg::Value&& error) mutable { |
| 770 | + ioContext.addFunctor( |
| 771 | + [results = sharedResults.addRef()](jsg::Lock& js, jsg::Value&& error) mutable { |
| 772 | + // Received a JS error. Do not reject doneFulfiller yet, this will be handled when we catch |
| 773 | + // the exception later. |
769 | 774 | results->get().setStop(true); |
770 | | - doneFulfiller->fulfill(); |
771 | 775 | js.throwException(kj::mv(error)); |
772 | 776 | }))); |
773 | 777 | } catch (...) { |
@@ -860,15 +864,11 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { |
860 | 864 | // the doneFulfiller afterwards, indicating that TailStreamTarget has received all events over |
861 | 865 | // the stream and has done all its work, that the stream self-evidently did not get canceled |
862 | 866 | // prematurely. This applies even if promises were rejected. |
| 867 | + // No need to catch exceptions here: They will be handled in report() alongside exceptions |
| 868 | + // from the onset event etc. JSG knows how JS exceptions look like, so we don't need an |
| 869 | + // identifier for them. |
863 | 870 | if (doFulfill) { |
864 | | - p = p.then(js, [&](jsg::Lock& js) { doneFulfiller->fulfill(); }, |
865 | | - [&](jsg::Lock& js, jsg::Value&& value) { |
866 | | - // Convert the JS exception to a KJ exception, preserving all details |
867 | | - kj::Exception exception = js.exceptionToKj(kj::mv(value)); |
868 | | - // Mark this as a tail stream failure for proper classification |
869 | | - exception.setDetail(TAIL_STREAM_JS_FAILURE, kj::heapArray<kj::byte>(0)); |
870 | | - doneFulfiller->reject(kj::mv(exception)); |
871 | | - }); |
| 871 | + p = p.then(js, [&](jsg::Lock& js) { doneFulfiller->fulfill(); }); |
872 | 872 | } |
873 | 873 | return ioContext.awaitJs(js, kj::mv(p)); |
874 | 874 | } |
@@ -916,8 +916,12 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEventImpl::run |
916 | 916 |
|
917 | 917 | auto eventOutcome = co_await donePromise.exclusiveJoin(ioContext.onAbort()).then([&]() { |
918 | 918 | return ioContext.waitUntilStatus(); |
919 | | - }, [](kj::Exception&& e) { |
920 | | - if (e.getDetail(TAIL_STREAM_JS_FAILURE) != kj::none) { |
| 919 | + }, [&incomingRequest](kj::Exception&& e) { |
| 920 | + // If we have a JSG exception, just set the appropriate return code – this will already have |
| 921 | + // been logged and we do not need to treat it like a KJ exception. Otherwise, re-throw the |
| 922 | + // exception. |
| 923 | + if (jsg::isTunneledException(e.getDescription())) { |
| 924 | + incomingRequest->getMetrics().reportFailure(e); |
921 | 925 | return EventOutcome::EXCEPTION; |
922 | 926 | } |
923 | 927 | kj::throwRecoverableException(kj::mv(e)); |
|
0 commit comments