@@ -990,105 +990,147 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEventImpl::sen
990990 }
991991}
992992
993- void TailStreamWriterState::reportImpl (tracing::TailEvent&& event) {
994- // In reportImpl, our inner state must be active.
995- auto & actives = KJ_ASSERT_NONNULL (inner.tryGet <kj::Array<kj::Own<Active>>>());
996-
997- // We only care about sessions that are currently active.
998- kj::Vector<kj::Own<Active>> alive (actives.size ());
999- for (auto & active: actives) {
1000- if (active->capability != kj::none) {
1001- alive.add (kj::mv (active));
993+ namespace {
994+ // The TailStreamWriterState holds the current client-side state for a collection
995+ // of streaming tail workers that a worker is reporting events to.
996+ struct TailStreamWriterState {
997+ // The initial state of our tail worker writer is that it is pending the first
998+ // onset event. During this time we will only have a collection of WorkerInterface
999+ // instances. When our first event is reported (the onset) we will arrange to acquire
1000+ // tailStream capabilities from each then use those to report the initial onset.
1001+ using Pending = kj::Array<kj::Own<WorkerInterface>>;
1002+
1003+ // Instances of Active are refcounted. The TailStreamWriterState itself
1004+ // holds the initial ref. Whenever events are being dispatched, an additional
1005+ // ref will be held by the outstanding pump promise in order to keep the
1006+ // client stub alive long enough for the rpc calls to complete. It is possible
1007+ // that the TailStreamWriterState will be dropped while pump promises are still
1008+ // pending.
1009+ struct Active : public kj ::Refcounted {
1010+ // Reference to keep the worker interface instance alive.
1011+ kj::Maybe<rpc::TailStreamTarget::Client> capability;
1012+ bool pumping = false ;
1013+ bool onsetSeen = false ;
1014+ workerd::util::Queue<tracing::TailEvent> queue;
1015+
1016+ Active (rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {}
1017+ };
1018+
1019+ struct Closed {};
1020+
1021+ // The closing flag will be set when the Outcome event has been reported.
1022+ // Once closing is true, no further events will be accepted and the state
1023+ // will transition to closed once the currently active pump completes.
1024+ bool closing = false ;
1025+ kj::OneOf<Pending, kj::Array<kj::Own<Active>>, Closed> inner;
1026+ kj::TaskSet& waitUntilTasks;
1027+
1028+ TailStreamWriterState (Pending pending, kj::TaskSet& waitUntilTasks)
1029+ : inner(kj::mv(pending)),
1030+ waitUntilTasks (waitUntilTasks) {}
1031+ KJ_DISALLOW_COPY_AND_MOVE (TailStreamWriterState);
1032+
1033+ void reportImpl (tracing::TailEvent&& event) {
1034+ // In reportImpl, our inner state must be active.
1035+ auto & actives = KJ_ASSERT_NONNULL (inner.tryGet <kj::Array<kj::Own<Active>>>());
1036+
1037+ // We only care about sessions that are currently active.
1038+ kj::Vector<kj::Own<Active>> alive (actives.size ());
1039+ for (auto & active: actives) {
1040+ if (active->capability != kj::none) {
1041+ alive.add (kj::mv (active));
1042+ }
10021043 }
1003- }
10041044
1005- if (alive.size () == 0 ) {
1006- // Oh! We have no active sessions. Well, never mind then, let's
1007- // transition to a closed state and drop everything on the floor.
1008- inner = Closed{};
1045+ if (alive.size () == 0 ) {
1046+ // Oh! We have no active sessions. Well, never mind then, let's
1047+ // transition to a closed state and drop everything on the floor.
1048+ inner = Closed{};
10091049
1010- // Since we have no more living sessions (e.g. because all tail workers failed to return a valid
1011- // handler), mark the state as closing as we can't handle future events anyway.
1012- closing = true ;
1013- return ;
1014- }
1050+ // Since we have no more living sessions (e.g. because all tail workers failed to return a valid
1051+ // handler), mark the state as closing as we can't handle future events anyway.
1052+ closing = true ;
1053+ return ;
1054+ }
10151055
1016- // If we're already closing, no further events should be reported.
1017- if (closing) return ;
1018- if (event.event .is <tracing::Outcome>()) {
1019- closing = true ;
1020- }
1056+ // If we're already closing, no further events should be reported.
1057+ if (closing) return ;
1058+ if (event.event .is <tracing::Outcome>()) {
1059+ closing = true ;
1060+ }
10211061
1022- // Deliver the event to the queue and make sure we are processing.
1023- for (auto & active: alive) {
1024- active->queue .push (event.clone ());
1025- if (!active->pumping ) {
1026- waitUntilTasks.add (pump (kj::addRef (*active)));
1062+ // Deliver the event to the queue and make sure we are processing.
1063+ for (auto & active: alive) {
1064+ active->queue .push (event.clone ());
1065+ if (!active->pumping ) {
1066+ waitUntilTasks.add (pump (kj::addRef (*active)));
1067+ }
10271068 }
1028- }
10291069
1030- inner = alive.releaseAsArray ();
1031- }
1070+ inner = alive.releaseAsArray ();
1071+ }
10321072
1033- // Delivers the queued tail events to a streaming tail worker.
1034- kj::Promise<void > TailStreamWriterState::pump (kj::Own<Active> current) {
1035- current->pumping = true ;
1036- KJ_DEFER (current->pumping = false );
1037-
1038- if (!current->onsetSeen ) {
1039- // Our first event... yay! Our first job here will be to dispatch
1040- // the onset event to the tail worker. If the tail worker wishes
1041- // to handle the remaining events in the stream, then it will return
1042- // a new capability to which those would be reported. This is done
1043- // via the "result.getPipeline()" API below. If hasPipeline()
1044- // returns false then that means the tail worker did not return
1045- // a handler for this stream and no further attempts to deliver
1046- // events should be made for this stream.
1047- current->onsetSeen = true ;
1048- auto onsetEvent = KJ_ASSERT_NONNULL (current->queue .pop ());
1049- auto builder = KJ_ASSERT_NONNULL (current->capability ).reportRequest ();
1050- auto eventsBuilder = builder.initEvents (1 );
1051- // When sending the onset event to the tail worker, the receiving end
1052- // requires that the onset event be delivered separately, without any
1053- // other events in the bundle. So here we'll separate it out and deliver
1054- // just the one event...
1055- onsetEvent.copyTo (eventsBuilder[0 ]);
1056- auto result = co_await builder.send ();
1057- if (result.getStop ()) {
1058- // If our call to send returns a stop signal, then we'll clear
1059- // the capability and be done.
1060- current->queue .clear ();
1061- current->capability = kj::none;
1062- co_return ;
1073+ // Delivers the queued tail events to a streaming tail worker.
1074+ kj::Promise<void > pump (kj::Own<Active> current) {
1075+ current->pumping = true ;
1076+ KJ_DEFER (current->pumping = false );
1077+
1078+ if (!current->onsetSeen ) {
1079+ // Our first event... yay! Our first job here will be to dispatch
1080+ // the onset event to the tail worker. If the tail worker wishes
1081+ // to handle the remaining events in the stream, then it will return
1082+ // a new capability to which those would be reported. This is done
1083+ // via the "result.getPipeline()" API below. If hasPipeline()
1084+ // returns false then that means the tail worker did not return
1085+ // a handler for this stream and no further attempts to deliver
1086+ // events should be made for this stream.
1087+ current->onsetSeen = true ;
1088+ auto onsetEvent = KJ_ASSERT_NONNULL (current->queue .pop ());
1089+ auto builder = KJ_ASSERT_NONNULL (current->capability ).reportRequest ();
1090+ auto eventsBuilder = builder.initEvents (1 );
1091+ // When sending the onset event to the tail worker, the receiving end
1092+ // requires that the onset event be delivered separately, without any
1093+ // other events in the bundle. So here we'll separate it out and deliver
1094+ // just the one event...
1095+ onsetEvent.copyTo (eventsBuilder[0 ]);
1096+ auto result = co_await builder.send ();
1097+ if (result.getStop ()) {
1098+ // If our call to send returns a stop signal, then we'll clear
1099+ // the capability and be done.
1100+ current->queue .clear ();
1101+ current->capability = kj::none;
1102+ co_return ;
1103+ }
10631104 }
1064- }
10651105
1066- // If we got this far then we have a handler for all of our events.
1067- // Deliver remaining streaming tail events in batches if possible.
1068- while (!current->queue .empty ()) {
1069- auto builder = KJ_ASSERT_NONNULL (current->capability ).reportRequest ();
1070- auto eventsBuilder = builder.initEvents (current->queue .size ());
1071- size_t n = 0 ;
1072- current->queue .drainTo ([&](tracing::TailEvent&& event) { event.copyTo (eventsBuilder[n++]); });
1073-
1074- auto result = co_await builder.send ();
1075-
1076- // Note that although we cleared the current.queue above, it is
1077- // possible/likely that additional events were added to the queue
1078- // while the above builder.send() was being awaited. If the result
1079- // comes back indicating that we should stop, then we'll stop here
1080- // without any further processing. We'll defensively clear the
1081- // queue again and drop the client stub. Otherwise, if result.getStop()
1082- // is false, we'll loop back around to send any items that have since
1083- // been added to the queue or exit this loop if there are no additional
1084- // events waiting to be sent.
1085- if (result.getStop ()) {
1086- current->queue .clear ();
1087- current->capability = kj::none;
1088- co_return ;
1106+ // If we got this far then we have a handler for all of our events.
1107+ // Deliver remaining streaming tail events in batches if possible.
1108+ while (!current->queue .empty ()) {
1109+ auto builder = KJ_ASSERT_NONNULL (current->capability ).reportRequest ();
1110+ auto eventsBuilder = builder.initEvents (current->queue .size ());
1111+ size_t n = 0 ;
1112+ current->queue .drainTo ([&](tracing::TailEvent&& event) { event.copyTo (eventsBuilder[n++]); });
1113+
1114+ auto result = co_await builder.send ();
1115+
1116+ // Note that although we cleared the current.queue above, it is
1117+ // possible/likely that additional events were added to the queue
1118+ // while the above builder.send() was being awaited. If the result
1119+ // comes back indicating that we should stop, then we'll stop here
1120+ // without any further processing. We'll defensively clear the
1121+ // queue again and drop the client stub. Otherwise, if result.getStop()
1122+ // is false, we'll loop back around to send any items that have since
1123+ // been added to the queue or exit this loop if there are no additional
1124+ // events waiting to be sent.
1125+ if (result.getStop ()) {
1126+ current->queue .clear ();
1127+ current->capability = kj::none;
1128+ co_return ;
1129+ }
10891130 }
10901131 }
1091- }
1132+ };
1133+ } // namespace
10921134
10931135// If we are using streaming tail workers, initialize the mechanism that will deliver events
10941136// to that collection of tail workers.
0 commit comments