Skip to content

Commit d1f6c15

Browse files
authored
Merge pull request #5185 from cloudflare/felix/092525-return-event-reporting
Reland "EW-9366 Report fetch response earlier"
2 parents b83dc6c + fb713e1 commit d1f6c15

File tree

5 files changed

+87
-24
lines changed

5 files changed

+87
-24
lines changed

src/workerd/api/tail-worker-test.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/workerd/io/tracer.c++

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44

5+
#include <workerd/io/io-context.h>
56
#include <workerd/io/tracer.h>
67
#include <workerd/util/sentry.h>
78
#include <workerd/util/thread-scopes.h>
@@ -141,10 +142,6 @@ WorkerTracer::~WorkerTracer() noexcept(false) {
141142
KJ_IF_SOME(writer, maybeTailStreamWriter) {
142143
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
143144

144-
KJ_IF_SOME(fetchResponseInfo, trace->fetchResponseInfo) {
145-
writer->report(spanContext, tracing::Return({fetchResponseInfo.clone()}), completeTime);
146-
}
147-
148145
if (isPredictableModeForTest()) {
149146
writer->report(spanContext,
150147
tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS),
@@ -440,6 +437,27 @@ void WorkerTracer::recordTimestamp(kj::Date timestamp) {
440437
}
441438
}
442439

440+
kj::Date BaseTracer::getTime() {
441+
auto& weakIoCtx = KJ_ASSERT_NONNULL(weakIoContext);
442+
kj::Date timestamp = kj::UNIX_EPOCH;
443+
weakIoCtx->runIfAlive([&timestamp](IoContext& context) { timestamp = context.now(); });
444+
if (!weakIoCtx->isValid()) {
445+
// This can happen if the IoContext gets destroyed following an exception, but we still need
446+
// to report a time for the return event.
447+
if (completeTime != kj::UNIX_EPOCH) {
448+
timestamp = completeTime;
449+
} else {
450+
// Otherwise, we can't actually get an end timestamp that makes sense.
451+
if (isPredictableModeForTest()) {
452+
KJ_FAIL_ASSERT("reported return event without valid IoContext or completeTime");
453+
} else {
454+
LOG_WARNING_PERIODICALLY("reported return event without valid IoContext or completeTime");
455+
}
456+
}
457+
}
458+
return timestamp;
459+
}
460+
443461
void BaseTracer::adjustSpanTime(CompleteSpan& span) {
444462
// To report I/O time, we need the IOContext to still be alive.
445463
// weakIoContext is only none if we are tracing via RPC (in this case span times have already been
@@ -463,26 +481,37 @@ void BaseTracer::adjustSpanTime(CompleteSpan& span) {
463481
if (isPredictableModeForTest()) {
464482
KJ_FAIL_ASSERT("reported span after IoContext was deallocated", span.operationName);
465483
} else {
466-
LOG_WARNING_PERIODICALLY(
467-
"reported span after IoContext was deallocated", span.operationName);
484+
KJ_LOG(WARNING, "reported span after IoContext was deallocated", span.operationName);
468485
}
469486
}
470487
}
471488
}
472489
}
473490

474-
void WorkerTracer::setFetchResponseInfo(tracing::FetchResponseInfo&& info) {
491+
void WorkerTracer::setReturn(
492+
kj::Maybe<kj::Date> timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
475493
// Match the behavior of setEventInfo(). Any resolution of the TODO comments
476494
// in setEventInfo() that are related to this check will probably also affect
477495
// this function.
478496
if (pipelineLogLevel == PipelineLogLevel::NONE) {
479497
return;
480498
}
481499

482-
// Note: In the streaming model, fetchResponseInfo is dispatched when the tail worker returns.
483-
KJ_REQUIRE(KJ_REQUIRE_NONNULL(trace->eventInfo).is<tracing::FetchEventInfo>());
484-
KJ_ASSERT(trace->fetchResponseInfo == kj::none, "setFetchResponseInfo can only be called once");
485-
trace->fetchResponseInfo = kj::mv(info);
500+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
501+
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
502+
503+
// Fall back to weak IoContext if no timestamp is available
504+
writer->report(spanContext,
505+
tracing::Return({fetchResponseInfo.map([](auto& info) { return info.clone(); })}),
506+
timestamp.orDefault([&]() { return getTime(); }));
507+
}
508+
509+
// Add fetch response info for legacy tail worker
510+
KJ_IF_SOME(info, fetchResponseInfo) {
511+
KJ_REQUIRE(KJ_REQUIRE_NONNULL(trace->eventInfo).is<tracing::FetchEventInfo>());
512+
KJ_ASSERT(trace->fetchResponseInfo == kj::none, "setFetchResponseInfo can only be called once");
513+
trace->fetchResponseInfo = kj::mv(info);
514+
}
486515
}
487516

488517
void BaseTracer::setUserRequestSpan(SpanParent&& span) {

src/workerd/io/tracer.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,12 @@ class BaseTracer: public kj::Refcounted {
120120
virtual void setEventInfo(
121121
IoContext::IncomingRequest& incomingRequest, tracing::EventInfo&& info) = 0;
122122

123-
// Adds info about the response. Must not be called more than once, and only
124-
// after passing a FetchEventInfo to setEventInfo().
125-
virtual void setFetchResponseInfo(tracing::FetchResponseInfo&& info) = 0;
123+
// Sets the return event for Streaming Tail Worker, including fetchResponseInfo (HTTP status code)
124+
// if available. Must not be called more than once, and fetchResponseInfo should only be set for
125+
// fetch events. For legacy tail worker, there is no distinct return event so we only add
126+
// fetchResponseInfo to the trace if present.
127+
virtual void setReturn(kj::Maybe<kj::Date> time = kj::none,
128+
kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) = 0;
126129

127130
// Reports the outcome event of the worker invocation. For Streaming Tail Worker, this will be the
128131
// final event, causing the stream to terminate.
@@ -143,6 +146,11 @@ class BaseTracer: public kj::Refcounted {
143146
const kj::ConstString& methodName) = 0;
144147

145148
protected:
149+
// Retrieves the current timestamp. If the IoContext is no longer available, we assume that the
150+
// worker must have wrapped up and reported its outcome event, we report completeTime in that case
151+
// accordingly.
152+
kj::Date getTime();
153+
146154
// helper method for addSpan() implementations
147155
void adjustSpanTime(CompleteSpan& span);
148156

@@ -194,13 +202,15 @@ class WorkerTracer final: public BaseTracer {
194202
void setEventInfoInternal(
195203
const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info);
196204

197-
void setFetchResponseInfo(tracing::FetchResponseInfo&& info) override;
198205
void setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) override;
199206
virtual void recordTimestamp(kj::Date timestamp) override;
200207

201208
// Set a worker-level tag/attribute to be provided in the onset event.
202209
void setWorkerAttribute(kj::ConstString key, Span::TagValue value);
203210

211+
void setReturn(kj::Maybe<kj::Date> time = kj::none,
212+
kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) override;
213+
204214
void setJsRpcInfo(const tracing::InvocationSpanContext& context,
205215
kj::Date timestamp,
206216
const kj::ConstString& methodName) override;

src/workerd/io/worker-entrypoint.c++

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
134134
bool isSent() const {
135135
return sent;
136136
}
137+
uint getHttpResponseStatus() const {
138+
return httpResponseStatus;
139+
}
137140

138141
kj::Own<kj::AsyncOutputStream> send(uint statusCode,
139142
kj::StringPtr statusText,
@@ -142,6 +145,7 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
142145
TRACE_EVENT(
143146
"workerd", "WorkerEntrypoint::ResponseSentTracker::send()", "statusCode", statusCode);
144147
sent = true;
148+
httpResponseStatus = statusCode;
145149
return inner.send(statusCode, statusText, headers, expectedBodySize);
146150
}
147151

@@ -152,6 +156,7 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
152156
}
153157

154158
private:
159+
uint httpResponseStatus = 0;
155160
kj::HttpService::Response& inner;
156161
bool sent = false;
157162
};
@@ -255,6 +260,11 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
255260
auto wrappedResponse = kj::heap<ResponseSentTracker>(response);
256261

257262
bool isActor = context.getActor() != kj::none;
263+
// HACK: Capture workerTracer directly, it's unclear how to acquire the right tracer from context
264+
// when we need it (for DOs, IoContext may point to a different WorkerTracer by the time we use
265+
// it). The tracer lives as long or longer than the IoContext (based on being co-owned
266+
// by IncomingRequest and PipelineTracer), so it lives long enough.
267+
kj::Maybe<BaseTracer&> workerTracer;
258268

259269
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
260270
kj::String cfJson;
@@ -281,6 +291,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
281291

282292
t.setEventInfo(*incomingRequest,
283293
tracing::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
294+
workerTracer = t;
284295
}
285296

286297
auto metricsForCatch = kj::addRef(incomingRequest->getMetrics());
@@ -313,10 +324,19 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
313324
cfBlobJson, lock,
314325
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), kj::mv(signal));
315326
})
316-
.then([this](api::DeferredProxy<void> deferredProxy) {
327+
.then([this, &context, &wrappedResponse = *wrappedResponse, workerTracer](
328+
api::DeferredProxy<void> deferredProxy) {
317329
TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step",
318330
PERFETTO_FLOW_FROM_POINTER(this));
319331
proxyTask = kj::mv(deferredProxy.proxyTask);
332+
KJ_IF_SOME(t, workerTracer) {
333+
auto httpResponseStatus = wrappedResponse.getHttpResponseStatus();
334+
if (httpResponseStatus != 0) {
335+
t.setReturn(context.now(), tracing::FetchResponseInfo(httpResponseStatus));
336+
} else {
337+
t.setReturn(context.now());
338+
}
339+
}
320340
})
321341
.catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise<void> {
322342
TRACE_EVENT("workerd", "WorkerEntrypoint::request() catch", PERFETTO_FLOW_FROM_POINTER(this));
@@ -383,8 +403,8 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
383403
proxyTask = kj::none;
384404
}))
385405
.catch_([this, wrappedResponse = kj::mv(wrappedResponse), isActor, method, url, &headers,
386-
&requestBody, metrics = kj::mv(metricsForCatch)](
387-
kj::Exception&& exception) mutable -> kj::Promise<void> {
406+
&requestBody, metrics = kj::mv(metricsForCatch),
407+
workerTracer](kj::Exception&& exception) mutable -> kj::Promise<void> {
388408
// Don't return errors to end user.
389409
TRACE_EVENT("workerd", "WorkerEntrypoint::request() exception",
390410
PERFETTO_TERMINATING_FLOW_FROM_POINTER(this));
@@ -449,7 +469,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
449469
metrics->setFailedOpen(true);
450470
return promise.attach(kj::mv(service));
451471
});
452-
return promise.catch_([this, wrappedResponse = kj::mv(wrappedResponse),
472+
return promise.catch_([this, wrappedResponse = kj::mv(wrappedResponse), workerTracer,
453473
metrics = kj::mv(metrics)](kj::Exception&& e) mutable {
454474
metrics->setFailedOpen(false);
455475
if (e.getType() != kj::Exception::Type::DISCONNECTED &&
@@ -462,6 +482,9 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
462482
if (!wrappedResponse->isSent()) {
463483
kj::HttpHeaders headers(threadContext.getHeaderTable());
464484
wrappedResponse->send(500, "Internal Server Error", headers, uint64_t(0));
485+
KJ_IF_SOME(t, workerTracer) {
486+
t.setReturn(kj::none, tracing::FetchResponseInfo(500));
487+
}
465488
}
466489
});
467490
} else if (tunnelExceptions) {
@@ -485,6 +508,10 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
485508
} else {
486509
wrappedResponse->send(500, "Internal Server Error", headers, uint64_t(0));
487510
}
511+
KJ_IF_SOME(t, workerTracer) {
512+
t.setReturn(
513+
kj::none, tracing::FetchResponseInfo(wrappedResponse->getHttpResponseStatus()));
514+
}
488515
}
489516

490517
return kj::READY_NOW;

src/workerd/server/server.c++

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,9 +1587,6 @@ class RequestObserverWithTracer final: public RequestObserver, public WorkerInte
15871587
auto time = IoContext::current().now();
15881588
t->recordTimestamp(time);
15891589
}
1590-
if (fetchStatus != 0) {
1591-
t->setFetchResponseInfo(tracing::FetchResponseInfo(fetchStatus));
1592-
}
15931590
t->setOutcome(
15941591
outcome, 0 * kj::MILLISECONDS /* cpu time */, 0 * kj::MILLISECONDS /* wall time */);
15951592
}

0 commit comments

Comments
 (0)