Skip to content

Commit f63223f

Browse files
committed
Reland "EW-9366 Report fetch response earlier"
This reverts commit 8612789.
1 parent b83dc6c commit f63223f

File tree

5 files changed

+47
-19
lines changed

5 files changed

+47
-19
lines changed

src/workerd/api/tail-worker-test.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/workerd/io/tracer.c++

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44

5+
#include <workerd/io/io-context.h>
56
#include <workerd/io/tracer.h>
67
#include <workerd/util/sentry.h>
78
#include <workerd/util/thread-scopes.h>
@@ -141,10 +142,6 @@ WorkerTracer::~WorkerTracer() noexcept(false) {
141142
KJ_IF_SOME(writer, maybeTailStreamWriter) {
142143
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
143144

144-
KJ_IF_SOME(fetchResponseInfo, trace->fetchResponseInfo) {
145-
writer->report(spanContext, tracing::Return({fetchResponseInfo.clone()}), completeTime);
146-
}
147-
148145
if (isPredictableModeForTest()) {
149146
writer->report(spanContext,
150147
tracing::Outcome(trace->outcome, 0 * kj::MILLISECONDS, 0 * kj::MILLISECONDS),
@@ -471,18 +468,29 @@ void BaseTracer::adjustSpanTime(CompleteSpan& span) {
471468
}
472469
}
473470

474-
void WorkerTracer::setFetchResponseInfo(tracing::FetchResponseInfo&& info) {
471+
void WorkerTracer::setReturn(
472+
kj::Date timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
475473
// Match the behavior of setEventInfo(). Any resolution of the TODO comments
476474
// in setEventInfo() that are related to this check while probably also affect
477475
// this function.
478476
if (pipelineLogLevel == PipelineLogLevel::NONE) {
479477
return;
480478
}
481479

482-
// Note: In the streaming model, fetchResponseInfo is dispatched when the tail worker returns.
483-
KJ_REQUIRE(KJ_REQUIRE_NONNULL(trace->eventInfo).is<tracing::FetchEventInfo>());
484-
KJ_ASSERT(trace->fetchResponseInfo == kj::none, "setFetchResponseInfo can only be called once");
485-
trace->fetchResponseInfo = kj::mv(info);
480+
KJ_IF_SOME(writer, maybeTailStreamWriter) {
481+
auto& spanContext = KJ_UNWRAP_OR_RETURN(topLevelInvocationSpanContext);
482+
483+
writer->report(spanContext,
484+
tracing::Return({fetchResponseInfo.map([](auto& info) { return info.clone(); })}),
485+
timestamp);
486+
}
487+
488+
// Add fetch response info for legacy tail worker
489+
KJ_IF_SOME(info, fetchResponseInfo) {
490+
KJ_REQUIRE(KJ_REQUIRE_NONNULL(trace->eventInfo).is<tracing::FetchEventInfo>());
491+
KJ_ASSERT(trace->fetchResponseInfo == kj::none, "setFetchResponseInfo can only be called once");
492+
trace->fetchResponseInfo = kj::mv(info);
493+
}
486494
}
487495

488496
void BaseTracer::setUserRequestSpan(SpanParent&& span) {

src/workerd/io/tracer.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,12 @@ class BaseTracer: public kj::Refcounted {
120120
virtual void setEventInfo(
121121
IoContext::IncomingRequest& incomingRequest, tracing::EventInfo&& info) = 0;
122122

123-
// Adds info about the response. Must not be called more than once, and only
124-
// after passing a FetchEventInfo to setEventInfo().
125-
virtual void setFetchResponseInfo(tracing::FetchResponseInfo&& info) = 0;
123+
// Sets the return event for Streaming Tail Worker, including fetchResponseInfo (HTTP status code)
124+
// if available. Must not be called more than once, and fetchResponseInfo should only be set for
125+
// fetch events. For legacy tail worker, there is no distinct return event so we only add
126+
// fetchResponseInfo to the trace if present.
127+
virtual void setReturn(
128+
kj::Date time, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) = 0;
126129

127130
// Reports the outcome event of the worker invocation. For Streaming Tail Worker, this will be the
128131
// final event, causing the stream to terminate.
@@ -194,13 +197,15 @@ class WorkerTracer final: public BaseTracer {
194197
void setEventInfoInternal(
195198
const tracing::InvocationSpanContext& context, kj::Date timestamp, tracing::EventInfo&& info);
196199

197-
void setFetchResponseInfo(tracing::FetchResponseInfo&& info) override;
198200
void setOutcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) override;
199201
virtual void recordTimestamp(kj::Date timestamp) override;
200202

201203
// Set a worker-level tag/attribute to be provided in the onset event.
202204
void setWorkerAttribute(kj::ConstString key, Span::TagValue value);
203205

206+
void setReturn(
207+
kj::Date time, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo = kj::none) override;
208+
204209
void setJsRpcInfo(const tracing::InvocationSpanContext& context,
205210
kj::Date timestamp,
206211
const kj::ConstString& methodName) override;

src/workerd/io/worker-entrypoint.c++

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
134134
bool isSent() const {
135135
return sent;
136136
}
137+
uint getHttpResponseStatus() const {
138+
return httpResponseStatus;
139+
}
137140

138141
kj::Own<kj::AsyncOutputStream> send(uint statusCode,
139142
kj::StringPtr statusText,
@@ -142,6 +145,7 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
142145
TRACE_EVENT(
143146
"workerd", "WorkerEntrypoint::ResponseSentTracker::send()", "statusCode", statusCode);
144147
sent = true;
148+
httpResponseStatus = statusCode;
145149
return inner.send(statusCode, statusText, headers, expectedBodySize);
146150
}
147151

@@ -152,6 +156,7 @@ class WorkerEntrypoint::ResponseSentTracker final: public kj::HttpService::Respo
152156
}
153157

154158
private:
159+
uint httpResponseStatus = 0;
155160
kj::HttpService::Response& inner;
156161
bool sent = false;
157162
};
@@ -255,6 +260,9 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
255260
auto wrappedResponse = kj::heap<ResponseSentTracker>(response);
256261

257262
bool isActor = context.getActor() != kj::none;
263+
// HACK: Capture workerTracer directly, it's unclear how to acquire the right tracer from context
264+
// when we need it.
265+
kj::Maybe<BaseTracer&> workerTracer;
258266

259267
KJ_IF_SOME(t, incomingRequest->getWorkerTracer()) {
260268
kj::String cfJson;
@@ -281,6 +289,7 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
281289

282290
t.setEventInfo(*incomingRequest,
283291
tracing::FetchEventInfo(method, kj::str(url), kj::mv(cfJson), kj::mv(traceHeadersArray)));
292+
workerTracer = t;
284293
}
285294

286295
auto metricsForCatch = kj::addRef(incomingRequest->getMetrics());
@@ -313,10 +322,19 @@ kj::Promise<void> WorkerEntrypoint::request(kj::HttpMethod method,
313322
cfBlobJson, lock,
314323
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), kj::mv(signal));
315324
})
316-
.then([this](api::DeferredProxy<void> deferredProxy) {
325+
.then([this, &context, &wrappedResponse = *wrappedResponse, workerTracer](
326+
api::DeferredProxy<void> deferredProxy) {
317327
TRACE_EVENT("workerd", "WorkerEntrypoint::request() deferred proxy step",
318328
PERFETTO_FLOW_FROM_POINTER(this));
319329
proxyTask = kj::mv(deferredProxy.proxyTask);
330+
KJ_IF_SOME(t, workerTracer) {
331+
auto httpResponseStatus = wrappedResponse.getHttpResponseStatus();
332+
if (httpResponseStatus != 0) {
333+
t.setReturn(context.now(), tracing::FetchResponseInfo(httpResponseStatus));
334+
} else {
335+
t.setReturn(context.now());
336+
}
337+
}
320338
})
321339
.catch_([this, &context](kj::Exception&& exception) mutable -> kj::Promise<void> {
322340
TRACE_EVENT("workerd", "WorkerEntrypoint::request() catch", PERFETTO_FLOW_FROM_POINTER(this));

src/workerd/server/server.c++

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1587,9 +1587,6 @@ class RequestObserverWithTracer final: public RequestObserver, public WorkerInte
15871587
auto time = IoContext::current().now();
15881588
t->recordTimestamp(time);
15891589
}
1590-
if (fetchStatus != 0) {
1591-
t->setFetchResponseInfo(tracing::FetchResponseInfo(fetchStatus));
1592-
}
15931590
t->setOutcome(
15941591
outcome, 0 * kj::MILLISECONDS /* cpu time */, 0 * kj::MILLISECONDS /* wall time */);
15951592
}

0 commit comments

Comments
 (0)