Skip to content

Commit fb713e1

Browse files
committed
[o11y] Fix tail worker return reporting for fetch event exceptions
We still need to report the fetch return code following an exception.
1 parent f63223f commit fb713e1

File tree

3 files changed

+47
-12
lines changed

3 files changed

+47
-12
lines changed

src/workerd/io/tracer.c++

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,27 @@ void WorkerTracer::recordTimestamp(kj::Date timestamp) {
437437
}
438438
}
439439

440+
kj::Date BaseTracer::getTime() {
441+
auto& weakIoCtx = KJ_ASSERT_NONNULL(weakIoContext);
442+
kj::Date timestamp = kj::UNIX_EPOCH;
443+
weakIoCtx->runIfAlive([&timestamp](IoContext& context) { timestamp = context.now(); });
444+
if (!weakIoCtx->isValid()) {
445+
// This can happen if we the IoContext gets destroyed following an exception, but we still need
446+
// to report a time for the return event.
447+
if (completeTime != kj::UNIX_EPOCH) {
448+
timestamp = completeTime;
449+
} else {
450+
// Otherwise, we can't actually get an end timestamp that makes sense.
451+
if (isPredictableModeForTest()) {
452+
KJ_FAIL_ASSERT("reported return event without valid IoContext or completeTime");
453+
} else {
454+
LOG_WARNING_PERIODICALLY("reported return event without valid IoContext or completeTime");
455+
}
456+
}
457+
}
458+
return timestamp;
459+
}
460+
440461
void BaseTracer::adjustSpanTime(CompleteSpan& span) {
441462
// To report I/O time, we need the IOContext to still be alive.
442463
// weakIoContext is only none if we are tracing via RPC (in this case span times have already been
@@ -460,16 +481,15 @@ void BaseTracer::adjustSpanTime(CompleteSpan& span) {
460481
if (isPredictableModeForTest()) {
461482
KJ_FAIL_ASSERT("reported span after IoContext was deallocated", span.operationName);
462483
} else {
463-
LOG_WARNING_PERIODICALLY(
464-
"reported span after IoContext was deallocated", span.operationName);
484+
KJ_LOG(WARNING, "reported span after IoContext was deallocated", span.operationName);
465485
}
466486
}
467487
}
468488
}
469489
}
470490

471491
void WorkerTracer::setReturn(
472-
kj::Date timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
492+
kj::Maybe<kj::Date> timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
473493
// Match the behavior of setEventInfo(). Any resolution of the TODO comments
474494
// in setEventInfo() that are related to this check while probably also affect
475495
// this function.
@@ -480,9 +500,10 @@ void WorkerTracer::setReturn(
480500
KJ_IF_SOME(writer, maybeTailStreamWriter) {
481501
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
482502

503+
// Fall back to weak IoContext if no timestamp is available
483504
writer->report(spanContext,
484505
tracing::Return({fetchResponseInfo.map([](auto& info) { return info.clone(); })}),
485-
timestamp);
506+
timestamp.orDefault([&]() { return getTime(); }));
486507
}
487508

488509
// Add fetch response info for legacy tail worker

src/workerd/io/tracer.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ class BaseTracer: public kj::Refcounted {
124124
// if available. Must not be called more than once, and fetchResponseInfo should only be set for
125125
// fetch events. For legacy tail worker, there is no distinct return event so we only add
126126
// fetchResponseInfo to the trace if present.
127-
virtual void setReturn(
128-
kj::Date time, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) = 0;
127+
virtual void setReturn(kj::Maybe<kj::Date> time = kj::none,
128+
kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) = 0;
129129

130130
// Reports the outcome event of the worker invocation. For Streaming Tail Worker, this will be the
131131
// final event, causing the stream to terminate.
@@ -146,6 +146,11 @@ class BaseTracer: public kj::Refcounted {
146146
const kj::ConstString& methodName) = 0;
147147

148148
protected:
149+
// Retrieves the current timestamp. If the IoContext is no longer available, we assume that the
150+
// worker must have wrapped up and reported its outcome event, we report completeTime in that case
151+
// acordingly.
152+
kj::Date getTime();
153+
149154
// helper method for addSpan() implementations
150155
void adjustSpanTime(CompleteSpan& span);
151156

@@ -203,8 +208,8 @@ class WorkerTracer final: public BaseTracer {
203208
// Set a worker-level tag/attribute to be provided in the onset event.
204209
void setWorkerAttribute(kj::ConstString key, Span::TagValue value);
205210

206-
void setReturn(
207-
kj::Date time, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) override;
211+
void setReturn(kj::Maybe<kj::Date> time = kj::none,
212+
kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) override;
208213

209214
void setJsRpcInfo(const tracing::InvocationSpanContext& context,
210215
kj::Date timestamp,

src/workerd/io/worker-entrypoint.c++

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,9 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
261261

262262
bool isActor = context.getActor() != kj::none;
263263
// HACK: Capture workerTracer directly, it's unclear how to acquire the right tracer from context
264-
// when we need it.
264+
// when we need it (for DOs, IoContext may point to a different WorkerTracer by the time we use
265+
// it). The tracer lives as long or longer than the IoContext (based on being co-owned
266+
// by IncomingRequest and PipelineTracer) so long enough.
265267
kj::Maybe<BaseTracer&> workerTracer;
266268

267269
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
@@ -401,8 +403,8 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
401403
proxyTask = kj::none;
402404
}))
403405
.catch_([this, wrappedResponse = kj::mv(wrappedResponse), isActor, method, url, &headers,
404-
&requestBody, metrics = kj::mv(metricsForCatch)](
405-
kj::Exception&& exception) mutable -> kj::Promise<void> {
406+
&requestBody, metrics = kj::mv(metricsForCatch),
407+
workerTracer](kj::Exception&& exception) mutable -> kj::Promise<void> {
406408
// Don't return errors to end user.
407409
TRACE_EVENT("workerd", "WorkerEntrypoint::request() exception",
408410
PERFETTO_TERMINATING_FLOW_FROM_POINTER(this));
@@ -467,7 +469,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
467469
metrics->setFailedOpen(true);
468470
return promise.attach(kj::mv(service));
469471
});
470-
return promise.catch_([this, wrappedResponse = kj::mv(wrappedResponse),
472+
return promise.catch_([this, wrappedResponse = kj::mv(wrappedResponse), workerTracer,
471473
metrics = kj::mv(metrics)](kj::Exception&& e) mutable {
472474
metrics->setFailedOpen(false);
473475
if (e.getType() != kj::Exception::Type::DISCONNECTED &&
@@ -480,6 +482,9 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
480482
if (!wrappedResponse->isSent()) {
481483
kj::HttpHeaders headers(threadContext.getHeaderTable());
482484
wrappedResponse->send(500, "Internal Server Error", headers, uint64_t(0));
485+
KJ_IF_SOME(t, workerTracer) {
486+
t.setReturn(kj::none, tracing::FetchResponseInfo(500));
487+
}
483488
}
484489
});
485490
} else if (tunnelExceptions) {
@@ -503,6 +508,10 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
503508
} else {
504509
wrappedResponse->send(500, "Internal Server Error", headers, uint64_t(0));
505510
}
511+
KJ_IF_SOME(t, workerTracer) {
512+
t.setReturn(
513+
kj::none, tracing::FetchResponseInfo(wrappedResponse->getHttpResponseStatus()));
514+
}
506515
}
507516

508517
return kj::READY_NOW;

0 commit comments

Comments
 (0)