Skip to content

Commit 1adf997

Browse files
authored
Merge pull request #5377 from cloudflare/felix/102025-stw-cleanup
[nfc] STW refactor and cleanup
2 parents 8e31352 + 9d571e2 commit 1adf997

File tree

8 files changed

+530
-553
lines changed

8 files changed

+530
-553
lines changed

src/workerd/io/trace-stream.c++

Lines changed: 199 additions & 160 deletions
Large diffs are not rendered by default.

src/workerd/io/trace-stream.h

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,51 +53,6 @@ class TailStreamCustomEventImpl final: public WorkerInterface::CustomEvent {
5353
uint16_t typeId;
5454
};
5555

56-
// The TailStreamWriterState holds the current client-side state for a collection
57-
// of streaming tail workers that a worker is reporting events to.
58-
struct TailStreamWriterState {
59-
// The initial state of our tail worker writer is that it is pending the first
60-
// onset event. During this time we will only have a collection of WorkerInterface
61-
// instances. When our first event is reported (the onset) we will arrange to acquire
62-
// tailStream capabilities from each then use those to report the initial onset.
63-
using Pending = kj::Array<kj::Own<WorkerInterface>>;
64-
65-
// Instances of Active are refcounted. The TailStreamWriterState itself
66-
// holds the initial ref. Whenever events are being dispatched, an additional
67-
// ref will be held by the outstanding pump promise in order to keep the
68-
// client stub alive long enough for the rpc calls to complete. It is possible
69-
// that the TailStreamWriterState will be dropped while pump promises are still
70-
// pending.
71-
struct Active: public kj::Refcounted {
72-
// Reference to keep the worker interface instance alive.
73-
kj::Maybe<rpc::TailStreamTarget::Client> capability;
74-
bool pumping = false;
75-
bool onsetSeen = false;
76-
workerd::util::Queue<tracing::TailEvent> queue;
77-
78-
Active(rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {}
79-
};
80-
81-
struct Closed {};
82-
83-
// The closing flag will be set when the Outcome event has been reported.
84-
// Once closing is true, no further events will be accepted and the state
85-
// will transition to closed once the currently active pump completes.
86-
bool closing = false;
87-
kj::OneOf<Pending, kj::Array<kj::Own<Active>>, Closed> inner;
88-
kj::TaskSet& waitUntilTasks;
89-
90-
TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks)
91-
: inner(kj::mv(pending)),
92-
waitUntilTasks(waitUntilTasks) {}
93-
KJ_DISALLOW_COPY_AND_MOVE(TailStreamWriterState);
94-
95-
void reportImpl(tracing::TailEvent&& event);
96-
97-
// Delivers the queued tail events to a streaming tail worker.
98-
kj::Promise<void> pump(kj::Own<Active> current);
99-
};
100-
10156
kj::Maybe<kj::Own<tracing::TailStreamWriter>> initializeTailStreamWriter(
10257
kj::Array<kj::Own<WorkerInterface>> streamingTailWorkers, kj::TaskSet& waitUntilTasks);
10358

0 commit comments

Comments
 (0)