Skip to content

Commit 7ed21f0

Browse files
committed
[o11y] Consolidate adding trace events for legacy and streaming tail workers
Working towards abstracting away differences between legacy/streaming models so that they only differ at the JS backend.
1 parent 6cde05d commit 7ed21f0

File tree

14 files changed

+144
-146
lines changed

14 files changed

+144
-146
lines changed

src/workerd/api/hibernatable-web-socket.c++

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,12 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve
9494
KJ_UNREACHABLE;
9595
};
9696

97+
// TODO(streaming-tail-workers): Support Hibernate and Resume events properly.
9798
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
98-
t.setEventInfo(context.now(), tracing::HibernatableWebSocketEventInfo(getType()));
99+
t.setEventInfo(incomingRequest->getContext().getInvocationSpanContext(), context.now(),
100+
tracing::HibernatableWebSocketEventInfo(getType()));
99101
}
100102

101-
// TODO(streaming-tail-workers): Support Hibernate and Resume events properly.
102-
context.getMetrics().reportTailEvent(context.getInvocationSpanContext(), [&] {
103-
return tracing::Onset(
104-
tracing::HibernatableWebSocketEventInfo(getType()), tracing::Onset::WorkerInfo{}, kj::none);
105-
});
106-
107103
auto outcomeObserver = kj::rc<OutcomeObserver>(
108104
kj::addRef(incomingRequest->getMetrics()), context.getInvocationSpanContext());
109105

src/workerd/api/node/diagnostics-channel.c++

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ void Channel::publish(jsg::Lock& js, jsg::Value message) {
3737
Error,
3838
"Diagnostic events cannot be published with SharedArrayBuffer or "
3939
"transferred ArrayBuffer instances");
40-
tracer.addDiagnosticChannelEvent(context.now(), name.toString(js), kj::mv(tmp.data));
40+
tracer.addDiagnosticChannelEvent(
41+
context.getInvocationSpanContext(), context.now(), name.toString(js), kj::mv(tmp.data));
4142
}
4243
}
4344

src/workerd/api/node/util.c++

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,8 +274,9 @@ namespace {
274274
// JSG_KJ_EXCEPTION would not give us that, and we only want to incur the cost
275275
// of creating and capturing the stack when we actually need it.
276276
auto ex = KJ_ASSERT_NONNULL(js.error(message).tryCast<jsg::JsObject>());
277-
tracer.addException(ioContext.now(), ex.get(js, "name"_kj).toString(js),
278-
ex.get(js, "message"_kj).toString(js), ex.get(js, "stack"_kj).toString(js));
277+
tracer.addException(ioContext.getInvocationSpanContext(), ioContext.now(),
278+
ex.get(js, "name"_kj).toString(js), ex.get(js, "message"_kj).toString(js),
279+
ex.get(js, "stack"_kj).toString(js));
279280
ioContext.abort(js.exceptionToKj(ex));
280281
} else {
281282
ioContext.abort(JSG_KJ_EXCEPTION(FAILED, Error, kj::mv(message)));

src/workerd/api/queue.c++

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -556,14 +556,10 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
556556
}
557557

558558
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
559-
t.setEventInfo(context.now(), tracing::QueueEventInfo(kj::str(queueName), batchSize));
559+
t.setEventInfo(context.getInvocationSpanContext(), context.now(),
560+
tracing::QueueEventInfo(kj::str(queueName), batchSize));
560561
}
561562

562-
context.getMetrics().reportTailEvent(context.getInvocationSpanContext(), [&] {
563-
return tracing::Onset(tracing::QueueEventInfo(kj::mv(queueName), batchSize),
564-
tracing::Onset::WorkerInfo{}, kj::none);
565-
});
566-
567563
auto outcomeObserver = kj::rc<OutcomeObserver>(
568564
kj::addRef(incomingRequest->getMetrics()), context.getInvocationSpanContext());
569565

src/workerd/api/tail-worker-test.js

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,23 @@ export const test = {
3030

3131
let expected = [
3232
// http-test.js: fetch and scheduled events get reported correctly.
33-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
34-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
33+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
34+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"http://placeholder/body-length","cfJson":"","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
3535
'{"type":"onset","info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":""}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
3636
'{"type":"onset","info":{"type":"scheduled","scheduledTime":"1970-01-01T00:00:00.000Z","cron":"* * * * 30"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
37-
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/not-found","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
38-
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/web-socket","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
37+
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/not-found","cfJson":"","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
38+
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://placeholder/web-socket","cfJson":"","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"exception","name":"Error","message":"The script will never generate a response."}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
3939

4040
// queue-test.js: queue events
41-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
42-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
43-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
44-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
45-
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/batch","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
41+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-delay-secs","value":"2"},{"name":"x-msg-fmt","value":"text"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
42+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-fmt","value":"bytes"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
43+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-fmt","value":"json"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
44+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/message","cfJson":"","headers":[{"name":"content-type","value":"application/octet-stream"},{"name":"x-msg-fmt","value":"v8"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
45+
'{"type":"onset","info":{"type":"fetch","method":"POST","url":"https://fake-host/batch","cfJson":"","headers":[{"name":"cf-queue-batch-bytes","value":"31"},{"name":"cf-queue-batch-count","value":"4"},{"name":"cf-queue-largest-msg","value":"13"},{"name":"content-type","value":"application/json"},{"name":"x-msg-delay-secs","value":"2"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
4646
'{"type":"onset","info":{"type":"queue","queueName":"test-queue","batchSize":5}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
4747

4848
// actor-alarms-test.js: alarm events
49-
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://foo/test","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
49+
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://foo/test","cfJson":"","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
5050
'{"type":"onset","info":{"type":"alarm","scheduledTime":"1970-01-01T00:00:00.000Z"}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
5151

5252
// legacy tail worker, triggered via alarm test. It would appear that these being recorded
@@ -58,8 +58,8 @@ export const test = {
5858
'{"type":"onset","info":{"type":"trace","traces":[""]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
5959

6060
// tests/websocket-hibernation.js: hibernatableWebSocket events
61-
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
62-
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/hibernation","cfJson":"{}","headers":[]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
61+
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/","cfJson":"","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
62+
'{"type":"onset","info":{"type":"fetch","method":"GET","url":"http://example.com/hibernation","cfJson":"","headers":[{"name":"upgrade","value":"websocket"}]}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
6363
'{"type":"onset","info":{"type":"hibernatableWebSocket","info":{"type":"message"}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
6464
'{"type":"onset","info":{"type":"hibernatableWebSocket","info":{"type":"close","code":1000,"wasClean":true}}}{"type":"outcome","outcome":"ok","cpuTime":0,"wallTime":0}',
6565
];

src/workerd/api/trace.c++

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -653,12 +653,10 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
653653
auto& metrics = incomingRequest->getMetrics();
654654

655655
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
656-
t.setEventInfo(context.now(), tracing::TraceEventInfo(traces));
656+
t.setEventInfo(
657+
context.getInvocationSpanContext(), context.now(), tracing::TraceEventInfo(traces));
657658
}
658659

659-
metrics.reportTailEvent(context.getInvocationSpanContext(), [&] {
660-
return tracing::Onset(tracing::TraceEventInfo(traces), tracing::Onset::WorkerInfo{}, kj::none);
661-
});
662660
auto outcomeObserver = kj::rc<OutcomeObserver>(
663661
kj::addRef(incomingRequest->getMetrics()), context.getInvocationSpanContext());
664662

src/workerd/api/worker-rpc.c++

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,12 +1833,9 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
18331833

18341834
void addTrace(jsg::Lock& js, IoContext& ioctx, kj::StringPtr methodName) override {
18351835
KJ_IF_SOME(t, tracer) {
1836-
t->setEventInfo(ioctx.now(), tracing::JsRpcEventInfo(kj::str(methodName)));
1836+
t->setEventInfo(ioctx.getInvocationSpanContext(), ioctx.now(),
1837+
tracing::JsRpcEventInfo(kj::str(methodName)));
18371838
}
1838-
ioctx.getMetrics().reportTailEvent(ioctx.getInvocationSpanContext(), [&] {
1839-
return tracing::Onset(
1840-
tracing::JsRpcEventInfo(kj::str(methodName)), tracing::Onset::WorkerInfo{}, kj::none);
1841-
});
18421839
}
18431840
};
18441841

src/workerd/io/observer.h

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,21 +128,6 @@ class RequestObserver: public kj::Refcounted {
128128
return nullptr;
129129
}
130130

131-
// If the worker is configured to support streaming tail workers, reportTailEvent
132-
// will forward the given event on to the collection of streaming tail workers
133-
// that are configured with this observer. Otherwise, this is a non-op.
134-
virtual void reportTailEvent(
135-
const tracing::InvocationSpanContext& context, tracing::TailEvent::Event&& event) {
136-
reportTailEvent(context, [event = kj::mv(event)]() mutable { return kj::mv(event); });
137-
}
138-
139-
// If the worker is configured to support streaming tail workers, reportTailEvent
140-
// will forward the event returned by the callback on to the collection of streaming
141-
// fail workers that are configured with this observer. The callback will only be
142-
// invoked if there are tail workers.
143-
virtual void reportTailEvent(const tracing::InvocationSpanContext& context,
144-
kj::FunctionParam<tracing::TailEvent::Event()> fn) {}
145-
146131
// Reports the outcome event to any configured streaming tail workers, signalizing that the
147132
// request has completed and will not produce any more events.
148133
virtual void reportOutcome(const tracing::InvocationSpanContext& context) {}

src/workerd/io/trace-stream.c++

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,8 @@ kj::Promise<WorkerInterface::CustomEvent::Result> TailStreamCustomEventImpl::run
926926
incomingRequest->delivered();
927927

928928
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
929-
t.setEventInfo(ioContext.now(), TraceEventInfo(kj::Array<TraceEventInfo::TraceItem>(nullptr)));
929+
t.setEventInfo(ioContext.getInvocationSpanContext(), ioContext.now(),
930+
TraceEventInfo(kj::Array<TraceEventInfo::TraceItem>(nullptr)));
930931
}
931932

932933
// TODO(streaming-tail): Support instrementation for streaming tail workers themselves – need to
@@ -1114,10 +1115,10 @@ kj::Maybe<kj::Own<tracing::TailStreamWriter>> initializeTailStreamWriter(
11141115

11151116
auto state = kj::heap<TailStreamWriterState>(kj::mv(streamingTailWorkers), waitUntilTasks);
11161117

1117-
return kj::heap<tracing::TailStreamWriter>(
1118+
return kj::refcounted<tracing::TailStreamWriter>(
11181119
// This lambda is called for every streaming tail event that is reported. We use
11191120
// the TailStreamWriterState for this stream to actually handle the event.
1120-
// Pay attention to the ownership of state here. The lamba holds a bare
1121+
// Pay attention to the ownership of state here. The lambda holds a bare
11211122
// reference while the instance is attached to the kj::Own below.
11221123
[&state = *state, &waitUntilTasks](tracing::TailEvent&& event) mutable {
11231124
KJ_SWITCH_ONEOF(state.inner) {

src/workerd/io/tracer.c++

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,23 +74,28 @@ kj::Own<WorkerTracer> PipelineTracer::makeWorkerTracer(PipelineLogLevel pipeline
7474
kj::Maybe<kj::Own<ScriptVersion::Reader>> scriptVersion,
7575
kj::Maybe<kj::String> dispatchNamespace,
7676
kj::Array<kj::String> scriptTags,
77-
kj::Maybe<kj::String> entrypoint) {
77+
kj::Maybe<kj::String> entrypoint,
78+
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter) {
7879
auto trace = kj::refcounted<Trace>(kj::mv(stableId), kj::mv(scriptName), kj::mv(scriptVersion),
7980
kj::mv(dispatchNamespace), kj::mv(scriptId), kj::mv(scriptTags), kj::mv(entrypoint),
8081
executionModel);
8182
traces.add(kj::addRef(*trace));
82-
return kj::refcounted<WorkerTracer>(addRefToThis(), kj::mv(trace), pipelineLogLevel);
83+
return kj::refcounted<WorkerTracer>(
84+
addRefToThis(), kj::mv(trace), pipelineLogLevel, kj::mv(maybeTailStreamWriter));
8385
}
8486

8587
void PipelineTracer::addTrace(rpc::Trace::Reader reader) {
8688
traces.add(kj::refcounted<Trace>(reader));
8789
}
8890

89-
WorkerTracer::WorkerTracer(
90-
kj::Rc<PipelineTracer> parentPipeline, kj::Own<Trace> trace, PipelineLogLevel pipelineLogLevel)
91+
WorkerTracer::WorkerTracer(kj::Rc<PipelineTracer> parentPipeline,
92+
kj::Own<Trace> trace,
93+
PipelineLogLevel pipelineLogLevel,
94+
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter)
9195
: pipelineLogLevel(pipelineLogLevel),
9296
trace(kj::mv(trace)),
9397
parentPipeline(kj::mv(parentPipeline)),
98+
maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)),
9499
self(kj::refcounted<WeakRef<WorkerTracer>>(kj::Badge<WorkerTracer>{}, *this)) {}
95100
WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel executionModel)
96101
: pipelineLogLevel(pipelineLogLevel),
@@ -101,7 +106,10 @@ WorkerTracer::WorkerTracer(PipelineLogLevel pipelineLogLevel, ExecutionModel exe
101106
constexpr kj::LiteralStringConst logSizeExceeded =
102107
"[\"Log size limit exceeded: More than 128KB of data (across console.log statements, exception, request metadata and headers) was logged during a single request. Subsequent data for this request will not be recorded in logs, appear when tailing this Worker's logs, or in Tail Workers.\"]"_kjc;
103108

104-
void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String message) {
109+
void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
110+
kj::Date timestamp,
111+
LogLevel logLevel,
112+
kj::String message) {
105113
if (trace->exceededLogLimit) {
106114
return;
107115
}
@@ -117,6 +125,12 @@ void WorkerTracer::addLog(kj::Date timestamp, LogLevel logLevel, kj::String mess
117125
return;
118126
}
119127
trace->bytesUsed = newSize;
128+
// TODO(streaming-tail): Here we add the log to the trace object and the tail stream writer, if
129+
// available. If the given worker stage is only tailed by a streaming tail worker, adding the log
130+
// to the legacy trace object is not needed; this will be addressed in a future refactor.
131+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
132+
writer->report(context, tracing::Mark(tracing::Log(timestamp, logLevel, kj::str(message))));
133+
}
120134
trace->logs.add(timestamp, logLevel, kj::mv(message));
121135
}
122136

@@ -165,8 +179,11 @@ void WorkerTracer::addSpan(CompleteSpan&& span) {
165179
trace->numSpans++;
166180
}
167181

168-
void WorkerTracer::addException(
169-
kj::Date timestamp, kj::String name, kj::String message, kj::Maybe<kj::String> stack) {
182+
void WorkerTracer::addException(const tracing::InvocationSpanContext& context,
183+
kj::Date timestamp,
184+
kj::String name,
185+
kj::String message,
186+
kj::Maybe<kj::String> stack) {
170187
if (trace->exceededExceptionLimit) {
171188
return;
172189
}
@@ -188,11 +205,18 @@ void WorkerTracer::addException(
188205
return;
189206
}
190207
trace->bytesUsed = newSize;
208+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
209+
writer->report(context,
210+
tracing::Mark(tracing::Exception(timestamp, kj::str(name), kj::str(message),
211+
stack.map([](kj::String& stack) -> kj::String { return kj::str(stack); }))));
212+
}
191213
trace->exceptions.add(timestamp, kj::mv(name), kj::mv(message), kj::mv(stack));
192214
}
193215

194-
void WorkerTracer::addDiagnosticChannelEvent(
195-
kj::Date timestamp, kj::String channel, kj::Array<kj::byte> message) {
216+
void WorkerTracer::addDiagnosticChannelEvent(const tracing::InvocationSpanContext& context,
217+
kj::Date timestamp,
218+
kj::String channel,
219+
kj::Array<kj::byte> message) {
196220
if (trace->exceededDiagnosticChannelEventLimit) {
197221
return;
198222
}
@@ -209,10 +233,17 @@ void WorkerTracer::addDiagnosticChannelEvent(
209233
return;
210234
}
211235
trace->bytesUsed = newSize;
236+
237+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
238+
writer->report(context,
239+
tracing::Mark(tracing::DiagnosticChannelEvent(
240+
timestamp, kj::str(channel), kj::heapArray<kj::byte>(message))));
241+
}
212242
trace->diagnosticChannelEvents.add(timestamp, kj::mv(channel), kj::mv(message));
213243
}
214244

215-
void WorkerTracer::setEventInfo(kj::Date timestamp, tracing::EventInfo&& info) {
245+
void WorkerTracer::setEventInfo(
246+
const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info) {
216247
KJ_ASSERT(trace->eventInfo == kj::none, "tracer can only be used for a single event");
217248

218249
// TODO(someday): For now, we're using logLevel == none as a hint to avoid doing anything
@@ -245,6 +276,11 @@ void WorkerTracer::setEventInfo(kj::Date timestamp, tracing::EventInfo&& info) {
245276
KJ_CASE_ONEOF_DEFAULT {}
246277
}
247278
trace->bytesUsed = newSize;
279+
280+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
281+
writer->report(
282+
context, tracing::Onset(cloneEventInfo(info), tracing::Onset::WorkerInfo{}, kj::none));
283+
}
248284
trace->eventInfo = kj::mv(info);
249285
}
250286

@@ -267,6 +303,10 @@ void WorkerTracer::setFetchResponseInfo(tracing::FetchResponseInfo&& info) {
267303
trace->fetchResponseInfo = kj::mv(info);
268304
}
269305

306+
kj::Maybe<kj::Own<tracing::TailStreamWriter>>& WorkerTracer::getTailStreamWriter() {
307+
return maybeTailStreamWriter;
308+
}
309+
270310
void WorkerTracer::extractTrace(rpc::Trace::Builder builder) {
271311
trace->copyTo(builder);
272312
}

0 commit comments

Comments
 (0)