Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 199 additions & 160 deletions src/workerd/io/trace-stream.c++

Large diffs are not rendered by default.

45 changes: 0 additions & 45 deletions src/workerd/io/trace-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,51 +55,6 @@ class TailStreamCustomEventImpl final: public WorkerInterface::CustomEvent {
uint16_t typeId;
};

// The TailStreamWriterState holds the current client-side state for a collection
// of streaming tail workers that a worker is reporting events to.
struct TailStreamWriterState {
// The initial state of our tail worker writer is that it is pending the first
// onset event. During this time we will only have a collection of WorkerInterface
// instances. When our first event is reported (the onset) we will arrange to acquire
// tailStream capabilities from each then use those to report the initial onset.
using Pending = kj::Array<kj::Own<WorkerInterface>>;

// Instances of Active are refcounted. The TailStreamWriterState itself
// holds the initial ref. Whenever events are being dispatched, an additional
// ref will be held by the outstanding pump promise in order to keep the
// client stub alive long enough for the rpc calls to complete. It is possible
// that the TailStreamWriterState will be dropped while pump promises are still
// pending.
struct Active: public kj::Refcounted {
// Reference to keep the worker interface instance alive.
kj::Maybe<rpc::TailStreamTarget::Client> capability;
bool pumping = false;
bool onsetSeen = false;
workerd::util::Queue<tracing::TailEvent> queue;

Active(rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {}
};

struct Closed {};

// The closing flag will be set when the Outcome event has been reported.
// Once closing is true, no further events will be accepted and the state
// will transition to closed once the currently active pump completes.
bool closing = false;
kj::OneOf<Pending, kj::Array<kj::Own<Active>>, Closed> inner;
kj::TaskSet& waitUntilTasks;

TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks)
: inner(kj::mv(pending)),
waitUntilTasks(waitUntilTasks) {}
KJ_DISALLOW_COPY_AND_MOVE(TailStreamWriterState);

void reportImpl(tracing::TailEvent&& event);

// Delivers the queued tail events to a streaming tail worker.
kj::Promise<void> pump(kj::Own<Active> current);
};

kj::Maybe<kj::Own<tracing::TailStreamWriter>> initializeTailStreamWriter(
kj::Array<kj::Own<WorkerInterface>> streamingTailWorkers, kj::TaskSet& waitUntilTasks);

Expand Down
Loading
Loading