diff --git a/src/workerd/io/trace-stream.c++ b/src/workerd/io/trace-stream.c++ index e47f512afd2..10b4c31d69d 100644 --- a/src/workerd/io/trace-stream.c++ +++ b/src/workerd/io/trace-stream.c++ @@ -125,7 +125,7 @@ class StringCache final { // and define a set of serializers to these types. // Serialize attribute value -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Attribute::Value& value) { +jsg::JsValue ToJs(jsg::Lock& js, const Attribute::Value& value) { KJ_SWITCH_ONEOF(value) { KJ_CASE_ONEOF(str, kj::String) { return js.str(str); @@ -144,7 +144,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Attribute::Value& value) { } // Serialize attribute key:value(s) pair object -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Attribute& attribute, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Attribute& attribute, StringCache& cache) { auto obj = js.obj(); obj.set(js, NAME_STR, cache.get(js, attribute.name)); @@ -160,8 +160,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Attribute& attribute, StringCach } // Serialize "attributes" event -jsg::JsValue ToJs( - jsg::Lock& js, kj::ArrayPtr attributes, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, kj::ArrayPtr attributes, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, ATTRIBUTES_STR)); obj.set(js, INFO_STR, js.arr(attributes, [&cache](jsg::Lock& js, const auto& attr) { @@ -170,13 +169,13 @@ jsg::JsValue ToJs( return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::FetchResponseInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const FetchResponseInfo& info, StringCache& cache) { static kj::StringPtr keys[] = {TYPE_STR, STATUSCODE_STR}; jsg::JsValue values[] = {cache.get(js, FETCH_STR), js.num(info.statusCode)}; return js.obj(kj::ArrayPtr(keys), kj::ArrayPtr(values)); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::FetchEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const FetchEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, FETCH_STR)); obj.set(js, METHOD_STR, cache.get(js, kj::str(info.method))); @@ -185,7 +184,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::FetchEventInfo& info, StringCach obj.set(js, CFJSON_STR, jsg::JsValue(js.parseJson(info.cfJson).getHandle(js))); } - auto ToJs = [](jsg::Lock& js, const tracing::FetchEventInfo::Header& header, StringCache& cache) { + auto ToJs = [](jsg::Lock& js, const FetchEventInfo::Header& header, StringCache& cache) { auto obj = js.obj(); obj.set(js, NAME_STR, cache.get(js, header.name)); obj.set(js, VALUE_STR, js.str(header.value)); @@ -199,13 +198,13 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::FetchEventInfo& info, StringCach return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::JsRpcEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const JsRpcEventInfo& info, StringCache& cache) { static kj::StringPtr keys[] = {TYPE_STR}; jsg::JsValue values[] = {cache.get(js, JSRPC_STR)}; return js.obj(kj::ArrayPtr(keys), kj::ArrayPtr(values)); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::ScheduledEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const ScheduledEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, SCHEDULED_STR)); if (isPredictableModeForTest()) { @@ -217,7 +216,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::ScheduledEventInfo& info, String return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::AlarmEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const AlarmEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, ALARM_STR)); if (isPredictableModeForTest()) { @@ -228,21 +227,21 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::AlarmEventInfo& info, StringCach return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::QueueEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const QueueEventInfo& info, StringCache& cache) { static kj::StringPtr keys[] = {TYPE_STR, QUEUENAME_STR, BATCHSIZE_STR}; jsg::JsValue values[] = { cache.get(js, QUEUE_STR), js.str(info.queueName), js.num(info.batchSize)}; return js.obj(kj::ArrayPtr(keys), kj::ArrayPtr(values)); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::EmailEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const EmailEventInfo& info, StringCache& cache) { static kj::StringPtr keys[] = {TYPE_STR, MAILFROM_STR, RCPTTO_STR, RAWSIZE_STR}; jsg::JsValue values[] = { cache.get(js, EMAIL_STR), js.str(info.mailFrom), js.str(info.rcptTo), js.num(info.rawSize)}; return js.obj(kj::ArrayPtr(keys), kj::ArrayPtr(values)); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::TraceEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const TraceEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, TRACE_STR)); obj.set(js, TRACES_STR, @@ -255,23 +254,22 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::TraceEventInfo& info, StringCach return obj; } -jsg::JsValue ToJs( - jsg::Lock& js, const tracing::HibernatableWebSocketEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const HibernatableWebSocketEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, HIBERNATABLEWEBSOCKET_STR)); KJ_SWITCH_ONEOF(info.type) { - KJ_CASE_ONEOF(message, tracing::HibernatableWebSocketEventInfo::Message) { + KJ_CASE_ONEOF(message, HibernatableWebSocketEventInfo::Message) { auto mobj = js.obj(); mobj.set(js, TYPE_STR, cache.get(js, MESSAGE_STR)); obj.set(js, INFO_STR, mobj); } - KJ_CASE_ONEOF(error, tracing::HibernatableWebSocketEventInfo::Error) { + KJ_CASE_ONEOF(error, HibernatableWebSocketEventInfo::Error) { auto mobj = js.obj(); mobj.set(js, TYPE_STR, cache.get(js, ERROR_STR)); obj.set(js, INFO_STR, mobj); } - KJ_CASE_ONEOF(close, tracing::HibernatableWebSocketEventInfo::Close) { + KJ_CASE_ONEOF(close, HibernatableWebSocketEventInfo::Close) { auto mobj = js.obj(); mobj.set(js, TYPE_STR, cache.get(js, CLOSE_STR)); mobj.set(js, CODE_STR, js.num(close.code)); @@ -283,7 +281,7 @@ jsg::JsValue ToJs( return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::CustomEventInfo& info, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const CustomEventInfo& info, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, CUSTOM_STR)); return obj; @@ -326,7 +324,7 @@ kj::String enumToStr(const Enum& var) { return kj::str(enums[i].getProto().getName()); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Onset& onset, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Onset& onset, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, ONSET_STR)); obj.set(js, EXECUTIONMODEL_STR, cache.get(js, enumToStr(onset.workerInfo.executionModel))); @@ -361,31 +359,31 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Onset& onset, StringCache& cache } KJ_SWITCH_ONEOF(onset.info) { - KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { obj.set(js, INFO_STR, ToJs(js, fetch, cache)); } - KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { obj.set(js, INFO_STR, ToJs(js, jsrpc, cache)); } - KJ_CASE_ONEOF(scheduled, tracing::ScheduledEventInfo) { + KJ_CASE_ONEOF(scheduled, ScheduledEventInfo) { obj.set(js, INFO_STR, ToJs(js, scheduled, cache)); } - KJ_CASE_ONEOF(alarm, tracing::AlarmEventInfo) { + KJ_CASE_ONEOF(alarm, AlarmEventInfo) { obj.set(js, INFO_STR, ToJs(js, alarm, cache)); } - KJ_CASE_ONEOF(queue, tracing::QueueEventInfo) { + KJ_CASE_ONEOF(queue, QueueEventInfo) { obj.set(js, INFO_STR, ToJs(js, queue, cache)); } - KJ_CASE_ONEOF(email, tracing::EmailEventInfo) { + KJ_CASE_ONEOF(email, EmailEventInfo) { obj.set(js, INFO_STR, ToJs(js, email, cache)); } - KJ_CASE_ONEOF(trace, tracing::TraceEventInfo) { + KJ_CASE_ONEOF(trace, TraceEventInfo) { obj.set(js, INFO_STR, ToJs(js, trace, cache)); } - KJ_CASE_ONEOF(hws, tracing::HibernatableWebSocketEventInfo) { + KJ_CASE_ONEOF(hws, HibernatableWebSocketEventInfo) { obj.set(js, INFO_STR, ToJs(js, hws, cache)); } - KJ_CASE_ONEOF(custom, tracing::CustomEventInfo) { + KJ_CASE_ONEOF(custom, CustomEventInfo) { obj.set(js, INFO_STR, ToJs(js, custom, cache)); } } @@ -399,7 +397,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Onset& onset, StringCache& cache return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Outcome& outcome, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Outcome& outcome, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, OUTCOME_STR)); obj.set(js, OUTCOME_STR, ToJs(js, outcome.outcome, cache)); @@ -413,7 +411,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Outcome& outcome, StringCache& c return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::SpanOpen& spanOpen, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const SpanOpen& spanOpen, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, SPANOPEN_STR)); obj.set(js, NAME_STR, js.str(spanOpen.operationName)); @@ -422,13 +420,13 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::SpanOpen& spanOpen, StringCache& KJ_IF_SOME(info, spanOpen.info) { KJ_SWITCH_ONEOF(info) { - KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { obj.set(js, INFO_STR, ToJs(js, fetch, cache)); } - KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { obj.set(js, INFO_STR, ToJs(js, jsrpc, cache)); } - KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + KJ_CASE_ONEOF(custom, CustomInfo) { obj.set(js, INFO_STR, ToJs(js, custom.asPtr(), cache)); } } @@ -436,14 +434,14 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::SpanOpen& spanOpen, StringCache& return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::SpanClose& spanClose, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const SpanClose& spanClose, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, SPANCLOSE_STR)); obj.set(js, OUTCOME_STR, ToJs(js, spanClose.outcome, cache)); return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::DiagnosticChannelEvent& dce, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const DiagnosticChannelEvent& dce, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, DIAGNOSTICCHANNEL_STR)); obj.set(js, CHANNEL_STR, cache.get(js, dce.channel)); @@ -455,7 +453,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::DiagnosticChannelEvent& dce, Str return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Exception& ex, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Exception& ex, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, EXCEPTION_STR)); obj.set(js, NAME_STR, cache.get(js, ex.name)); @@ -470,7 +468,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const LogLevel& level, StringCache& cache) { return cache.get(js, toLower(enumToStr(level))); } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Log& log, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Log& log, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, LOG_STR)); obj.set(js, LEVEL_STR, ToJs(js, log.logLevel, cache)); @@ -479,7 +477,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Log& log, StringCache& cache) { return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::Return& ret, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const Return& ret, StringCache& cache) { auto obj = js.obj(); obj.set(js, TYPE_STR, cache.get(js, RETURN_STR)); @@ -490,7 +488,7 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::Return& ret, StringCache& cache) return obj; } -jsg::JsValue ToJs(jsg::Lock& js, const tracing::TailEvent& event, StringCache& cache) { +jsg::JsValue ToJs(jsg::Lock& js, const TailEvent& event, StringCache& cache) { auto obj = js.obj(); // Set SpanContext @@ -506,28 +504,28 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::TailEvent& event, StringCache& c obj.set(js, SEQUENCE_STR, js.num(event.sequence)); KJ_SWITCH_ONEOF(event.event) { - KJ_CASE_ONEOF(onset, tracing::Onset) { + KJ_CASE_ONEOF(onset, Onset) { obj.set(js, EVENT_STR, ToJs(js, onset, cache)); } - KJ_CASE_ONEOF(outcome, tracing::Outcome) { + KJ_CASE_ONEOF(outcome, Outcome) { obj.set(js, EVENT_STR, ToJs(js, outcome, cache)); } - KJ_CASE_ONEOF(spanOpen, tracing::SpanOpen) { + KJ_CASE_ONEOF(spanOpen, SpanOpen) { obj.set(js, EVENT_STR, ToJs(js, spanOpen, cache)); } - KJ_CASE_ONEOF(spanClose, tracing::SpanClose) { + KJ_CASE_ONEOF(spanClose, SpanClose) { obj.set(js, EVENT_STR, ToJs(js, spanClose, cache)); } - KJ_CASE_ONEOF(de, tracing::DiagnosticChannelEvent) { + KJ_CASE_ONEOF(de, DiagnosticChannelEvent) { obj.set(js, EVENT_STR, ToJs(js, de, cache)); } - KJ_CASE_ONEOF(ex, tracing::Exception) { + KJ_CASE_ONEOF(ex, Exception) { obj.set(js, EVENT_STR, ToJs(js, ex, cache)); } - KJ_CASE_ONEOF(log, tracing::Log) { + KJ_CASE_ONEOF(log, Log) { obj.set(js, EVENT_STR, ToJs(js, log, cache)); } - KJ_CASE_ONEOF(ret, tracing::Return) { + KJ_CASE_ONEOF(ret, Return) { obj.set(js, EVENT_STR, ToJs(js, ret, cache)); } KJ_CASE_ONEOF(attrs, CustomInfo) { @@ -539,34 +537,34 @@ jsg::JsValue ToJs(jsg::Lock& js, const tracing::TailEvent& event, StringCache& c } // Returns the name of the handler function for this type of event. -kj::Maybe getHandlerName(const tracing::TailEvent& event) { +kj::Maybe getHandlerName(const TailEvent& event) { KJ_SWITCH_ONEOF(event.event) { - KJ_CASE_ONEOF(_, tracing::Onset) { + KJ_CASE_ONEOF(_, Onset) { KJ_FAIL_ASSERT("Onset event should only be provided to tailStream(), not returned handler"); // return ONSET_STR; } - KJ_CASE_ONEOF(_, tracing::Outcome) { + KJ_CASE_ONEOF(_, Outcome) { return OUTCOME_STR; } - KJ_CASE_ONEOF(_, tracing::SpanOpen) { + KJ_CASE_ONEOF(_, SpanOpen) { return SPANOPEN_STR; } - KJ_CASE_ONEOF(_, tracing::SpanClose) { + KJ_CASE_ONEOF(_, SpanClose) { return SPANCLOSE_STR; } - KJ_CASE_ONEOF(_, tracing::DiagnosticChannelEvent) { + KJ_CASE_ONEOF(_, DiagnosticChannelEvent) { return DIAGNOSTICCHANNEL_STR; } - KJ_CASE_ONEOF(_, tracing::Exception) { + KJ_CASE_ONEOF(_, Exception) { return EXCEPTION_STR; } - KJ_CASE_ONEOF(_, tracing::Log) { + KJ_CASE_ONEOF(_, Log) { return LOG_STR; } - KJ_CASE_ONEOF(_, tracing::Return) { + KJ_CASE_ONEOF(_, Return) { return RETURN_STR; } - KJ_CASE_ONEOF(_, tracing::CustomInfo) { + KJ_CASE_ONEOF(_, CustomInfo) { return ATTRIBUTES_STR; } } @@ -608,9 +606,9 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { auto params = reportContext.getParams(); KJ_ASSERT(params.hasEvents(), "Events are required."); auto eventReaders = params.getEvents(); - kj::Vector events(eventReaders.size()); + kj::Vector events(eventReaders.size()); for (auto reader: eventReaders) { - events.add(tracing::TailEvent(reader)); + events.add(TailEvent(reader)); } // If we have not yet received the onset event, the first event in the @@ -682,11 +680,11 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { // shutdown. kj::Promise handleOnset(Worker::Lock& lock, IoContext& ioContext, - kj::Array events, + kj::Array events, kj::Rc results) { // There should be only a single onset event in this batch. - KJ_ASSERT(events.size() == 1 && events[0].event.is(), - "Expected only a single onset event"); + KJ_ASSERT( + events.size() == 1 && events[0].event.is(), "Expected only a single onset event"); auto& event = events[0]; auto handler = KJ_REQUIRE_NONNULL( @@ -786,7 +784,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { kj::Promise handleEvents(Worker::Lock& lock, const jsg::JsValue& handler, IoContext& ioContext, - kj::Array events, + kj::Array events, kj::Rc results) { jsg::Lock& js = lock; @@ -812,7 +810,7 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { // If we already received an outcome event, we will stop processing any // further events. if (finishing) break; - if (event.event.is()) { + if (event.event.is()) { finishing = true; results->setStop(true); doneReceiving = true; @@ -896,9 +894,8 @@ class TailStreamTarget final: public rpc::TailStreamTarget::Server { }; } // namespace -kj::Maybe TailStreamCustomEventImpl::getEventInfo() const { - return tracing::EventInfo( - tracing::TraceEventInfo(kj::Array(nullptr))); +kj::Maybe TailStreamCustomEventImpl::getEventInfo() const { + return EventInfo(TraceEventInfo(kj::Array(nullptr))); } kj::Promise TailStreamCustomEventImpl::run( @@ -990,109 +987,151 @@ kj::Promise TailStreamCustomEventImpl::sen } } -void TailStreamWriterState::reportImpl(tracing::TailEvent&& event) { - // In reportImpl, our inner state must be active. - auto& actives = KJ_ASSERT_NONNULL(inner.tryGet>>()); +namespace { +// The TailStreamWriterState holds the current client-side state for a collection +// of streaming tail workers that a worker is reporting events to. +struct TailStreamWriterState { + // The initial state of our tail worker writer is that it is pending the first + // onset event. During this time we will only have a collection of WorkerInterface + // instances. When our first event is reported (the onset) we will arrange to acquire + // tailStream capabilities from each then use those to report the initial onset. + using Pending = kj::Array>; + + // Instances of Active are refcounted. The TailStreamWriterState itself + // holds the initial ref. Whenever events are being dispatched, an additional + // ref will be held by the outstanding pump promise in order to keep the + // client stub alive long enough for the rpc calls to complete. It is possible + // that the TailStreamWriterState will be dropped while pump promises are still + // pending. + struct Active: public kj::Refcounted { + // Reference to keep the worker interface instance alive. + kj::Maybe capability; + bool pumping = false; + bool onsetSeen = false; + workerd::util::Queue queue; + + Active(rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {} + }; - // We only care about sessions that are currently active. - kj::Vector> alive(actives.size()); - for (auto& active: actives) { - if (active->capability != kj::none) { - alive.add(kj::mv(active)); + struct Closed {}; + + // The closing flag will be set when the Outcome event has been reported. + // Once closing is true, no further events will be accepted and the state + // will transition to closed once the currently active pump completes. + bool closing = false; + kj::OneOf>, Closed> inner; + kj::TaskSet& waitUntilTasks; + + TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks) + : inner(kj::mv(pending)), + waitUntilTasks(waitUntilTasks) {} + KJ_DISALLOW_COPY_AND_MOVE(TailStreamWriterState); + + void reportImpl(TailEvent&& event) { + // In reportImpl, our inner state must be active. + auto& actives = KJ_ASSERT_NONNULL(inner.tryGet>>()); + + // We only care about sessions that are currently active. + kj::Vector> alive(actives.size()); + for (auto& active: actives) { + if (active->capability != kj::none) { + alive.add(kj::mv(active)); + } } - } - if (alive.size() == 0) { - // Oh! We have no active sessions. Well, never mind then, let's - // transition to a closed state and drop everything on the floor. - inner = Closed{}; + if (alive.size() == 0) { + // Oh! We have no active sessions. Well, never mind then, let's + // transition to a closed state and drop everything on the floor. + inner = Closed{}; - // Since we have no more living sessions (e.g. because all tail workers failed to return a valid - // handler), mark the state as closing as we can't handle future events anyway. - closing = true; - return; - } + // Since we have no more living sessions (e.g. because all tail workers failed to return a valid + // handler), mark the state as closing as we can't handle future events anyway. + closing = true; + return; + } - // If we're already closing, no further events should be reported. - if (closing) return; - if (event.event.is()) { - closing = true; - } + // If we're already closing, no further events should be reported. + if (closing) return; + if (event.event.is()) { + closing = true; + } - // Deliver the event to the queue and make sure we are processing. - for (auto& active: alive) { - active->queue.push(event.clone()); - if (!active->pumping) { - waitUntilTasks.add(pump(kj::addRef(*active))); + // Deliver the event to the queue and make sure we are processing. + for (auto& active: alive) { + active->queue.push(event.clone()); + if (!active->pumping) { + waitUntilTasks.add(pump(kj::addRef(*active))); + } } - } - inner = alive.releaseAsArray(); -} + inner = alive.releaseAsArray(); + } -// Delivers the queued tail events to a streaming tail worker. -kj::Promise TailStreamWriterState::pump(kj::Own current) { - current->pumping = true; - KJ_DEFER(current->pumping = false); - - if (!current->onsetSeen) { - // Our first event... yay! Our first job here will be to dispatch - // the onset event to the tail worker. If the tail worker wishes - // to handle the remaining events in the stream, then it will return - // a new capability to which those would be reported. This is done - // via the "result.getPipeline()" API below. If hasPipeline() - // returns false then that means the tail worker did not return - // a handler for this stream and no further attempts to deliver - // events should be made for this stream. - current->onsetSeen = true; - auto onsetEvent = KJ_ASSERT_NONNULL(current->queue.pop()); - auto builder = KJ_ASSERT_NONNULL(current->capability).reportRequest(); - auto eventsBuilder = builder.initEvents(1); - // When sending the onset event to the tail worker, the receiving end - // requires that the onset event be delivered separately, without any - // other events in the bundle. So here we'll separate it out and deliver - // just the one event... - onsetEvent.copyTo(eventsBuilder[0]); - auto result = co_await builder.send(); - if (result.getStop()) { - // If our call to send returns a stop signal, then we'll clear - // the capability and be done. - current->queue.clear(); - current->capability = kj::none; - co_return; + // Delivers the queued tail events to a streaming tail worker. + kj::Promise pump(kj::Own current) { + current->pumping = true; + KJ_DEFER(current->pumping = false); + + if (!current->onsetSeen) { + // Our first event... yay! Our first job here will be to dispatch + // the onset event to the tail worker. If the tail worker wishes + // to handle the remaining events in the stream, then it will return + // a new capability to which those would be reported. This is done + // via the "result.getPipeline()" API below. If hasPipeline() + // returns false then that means the tail worker did not return + // a handler for this stream and no further attempts to deliver + // events should be made for this stream. + current->onsetSeen = true; + auto onsetEvent = KJ_ASSERT_NONNULL(current->queue.pop()); + auto builder = KJ_ASSERT_NONNULL(current->capability).reportRequest(); + auto eventsBuilder = builder.initEvents(1); + // When sending the onset event to the tail worker, the receiving end + // requires that the onset event be delivered separately, without any + // other events in the bundle. So here we'll separate it out and deliver + // just the one event... + onsetEvent.copyTo(eventsBuilder[0]); + auto result = co_await builder.send(); + if (result.getStop()) { + // If our call to send returns a stop signal, then we'll clear + // the capability and be done. + current->queue.clear(); + current->capability = kj::none; + co_return; + } } - } - // If we got this far then we have a handler for all of our events. - // Deliver remaining streaming tail events in batches if possible. - while (!current->queue.empty()) { - auto builder = KJ_ASSERT_NONNULL(current->capability).reportRequest(); - auto eventsBuilder = builder.initEvents(current->queue.size()); - size_t n = 0; - current->queue.drainTo([&](tracing::TailEvent&& event) { event.copyTo(eventsBuilder[n++]); }); - - auto result = co_await builder.send(); - - // Note that although we cleared the current.queue above, it is - // possible/likely that additional events were added to the queue - // while the above builder.send() was being awaited. If the result - // comes back indicating that we should stop, then we'll stop here - // without any further processing. We'll defensively clear the - // queue again and drop the client stub. Otherwise, if result.getStop() - // is false, we'll loop back around to send any items that have since - // been added to the queue or exit this loop if there are no additional - // events waiting to be sent. - if (result.getStop()) { - current->queue.clear(); - current->capability = kj::none; - co_return; + // If we got this far then we have a handler for all of our events. + // Deliver remaining streaming tail events in batches if possible. + while (!current->queue.empty()) { + auto builder = KJ_ASSERT_NONNULL(current->capability).reportRequest(); + auto eventsBuilder = builder.initEvents(current->queue.size()); + size_t n = 0; + current->queue.drainTo([&](TailEvent&& event) { event.copyTo(eventsBuilder[n++]); }); + + auto result = co_await builder.send(); + + // Note that although we cleared the current.queue above, it is + // possible/likely that additional events were added to the queue + // while the above builder.send() was being awaited. If the result + // comes back indicating that we should stop, then we'll stop here + // without any further processing. We'll defensively clear the + // queue again and drop the client stub. Otherwise, if result.getStop() + // is false, we'll loop back around to send any items that have since + // been added to the queue or exit this loop if there are no additional + // events waiting to be sent. + if (result.getStop()) { + current->queue.clear(); + current->capability = kj::none; + co_return; + } } } -} +}; +} // namespace // If we are using streaming tail workers, initialize the mechanism that will deliver events // to that collection of tail workers. -kj::Maybe> initializeTailStreamWriter( +kj::Maybe> initializeTailStreamWriter( kj::Array> streamingTailWorkers, kj::TaskSet& waitUntilTasks) { if (streamingTailWorkers.size() == 0) { return kj::none; @@ -1100,12 +1139,12 @@ kj::Maybe> initializeTailStreamWriter( auto state = kj::heap(kj::mv(streamingTailWorkers), waitUntilTasks); - return kj::refcounted( + return kj::refcounted( // This lambda is called for every streaming tail event that is reported. We use // the TailStreamWriterState for this stream to actually handle the event. // Pay attention to the ownership of state here. The lambda holds a bare // reference while the instance is attached to the kj::Own below. - [&state = *state, &waitUntilTasks](tracing::TailEvent&& event) mutable { + [&state = *state, &waitUntilTasks](TailEvent&& event) mutable { KJ_SWITCH_ONEOF(state.inner) { KJ_CASE_ONEOF(closed, TailStreamWriterState::Closed) { // The tail stream has already been closed because we have received an outcome event. The @@ -1116,11 +1155,11 @@ kj::Maybe> initializeTailStreamWriter( // This is our first event! It has to be an onset event, which the writer // should have validated for us. Assert if it is not an onset then proceed // to start each of our tail working sessions. - KJ_ASSERT(event.event.is(), "First event must be an onset."); + KJ_ASSERT(event.event.is(), "First event must be an onset."); // Transitions into the active state by grabbing the pending client capability. state.inner = KJ_MAP(wi, pending) { - auto customEvent = kj::heap(); + auto customEvent = kj::heap(); auto result = customEvent->getCap(); auto active = kj::refcounted(kj::mv(result)); @@ -1139,7 +1178,7 @@ kj::Maybe> initializeTailStreamWriter( } KJ_CASE_ONEOF(active, kj::Array>) { // Event cannot be a onset, which should have been validated by the writer. - KJ_ASSERT(!event.event.is(), "Only the first event can be an onset"); + KJ_ASSERT(!event.event.is(), "Only the first event can be an onset"); } } state.reportImpl(kj::mv(event)); diff --git a/src/workerd/io/trace-stream.h b/src/workerd/io/trace-stream.h index 3790841e13b..1ba62a209f0 100644 --- a/src/workerd/io/trace-stream.h +++ b/src/workerd/io/trace-stream.h @@ -53,51 +53,6 @@ class TailStreamCustomEventImpl final: public WorkerInterface::CustomEvent { uint16_t typeId; }; -// The TailStreamWriterState holds the current client-side state for a collection -// of streaming tail workers that a worker is reporting events to. -struct TailStreamWriterState { - // The initial state of our tail worker writer is that it is pending the first - // onset event. During this time we will only have a collection of WorkerInterface - // instances. When our first event is reported (the onset) we will arrange to acquire - // tailStream capabilities from each then use those to report the initial onset. - using Pending = kj::Array>; - - // Instances of Active are refcounted. The TailStreamWriterState itself - // holds the initial ref. Whenever events are being dispatched, an additional - // ref will be held by the outstanding pump promise in order to keep the - // client stub alive long enough for the rpc calls to complete. It is possible - // that the TailStreamWriterState will be dropped while pump promises are still - // pending. - struct Active: public kj::Refcounted { - // Reference to keep the worker interface instance alive. - kj::Maybe capability; - bool pumping = false; - bool onsetSeen = false; - workerd::util::Queue queue; - - Active(rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {} - }; - - struct Closed {}; - - // The closing flag will be set when the Outcome event has been reported. - // Once closing is true, no further events will be accepted and the state - // will transition to closed once the currently active pump completes. - bool closing = false; - kj::OneOf>, Closed> inner; - kj::TaskSet& waitUntilTasks; - - TailStreamWriterState(Pending pending, kj::TaskSet& waitUntilTasks) - : inner(kj::mv(pending)), - waitUntilTasks(waitUntilTasks) {} - KJ_DISALLOW_COPY_AND_MOVE(TailStreamWriterState); - - void reportImpl(tracing::TailEvent&& event); - - // Delivers the queued tail events to a streaming tail worker. - kj::Promise pump(kj::Own current); -}; - kj::Maybe> initializeTailStreamWriter( kj::Array> streamingTailWorkers, kj::TaskSet& waitUntilTasks); diff --git a/src/workerd/io/trace-test.c++ b/src/workerd/io/trace-test.c++ index cbea11a1dc1..295712ab143 100644 --- a/src/workerd/io/trace-test.c++ +++ b/src/workerd/io/trace-test.c++ @@ -162,14 +162,14 @@ KJ_TEST("Read/Write FetchEventInfo works") { kj::Vector headers; headers.add(FetchEventInfo::Header(kj::str("foo"), kj::str("bar"))); - tracing::FetchEventInfo info( + FetchEventInfo info( kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), headers.releaseAsArray()); info.copyTo(fetchInfoBuilder); auto reader = fetchInfoBuilder.asReader(); - tracing::FetchEventInfo info2(reader); + FetchEventInfo info2(reader); KJ_ASSERT(info2.method == kj::HttpMethod::GET); KJ_ASSERT(info2.url == "https://example.com"_kj); KJ_ASSERT(info2.cfJson == "{}"_kj); @@ -177,7 +177,7 @@ KJ_TEST("Read/Write FetchEventInfo works") { KJ_ASSERT(info2.headers[0].name == "foo"_kj); KJ_ASSERT(info2.headers[0].value == "bar"_kj); - tracing::FetchEventInfo info3 = info.clone(); + FetchEventInfo info3 = info.clone(); KJ_ASSERT(info3.method == kj::HttpMethod::GET); KJ_ASSERT(info3.url == "https://example.com"_kj); KJ_ASSERT(info3.cfJson == "{}"_kj); @@ -190,16 +190,16 @@ KJ_TEST("Read/Write JsRpcEventInfo works") { capnp::MallocMessageBuilder builder; auto jsRpcInfoBuilder = builder.initRoot(); - tracing::JsRpcEventInfo info(kj::str("foo")); + JsRpcEventInfo info(kj::str("foo")); info.copyTo(jsRpcInfoBuilder); auto reader = jsRpcInfoBuilder.asReader(); - tracing::JsRpcEventInfo info2(reader); + JsRpcEventInfo info2(reader); KJ_ASSERT(info2.methodName == "foo"_kj); - tracing::JsRpcEventInfo info3 = info.clone(); + JsRpcEventInfo info3 = info.clone(); KJ_ASSERT(info3.methodName == "foo"_kj); } @@ -207,17 +207,17 @@ KJ_TEST("Read/Write ScheduledEventInfo workers") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::ScheduledEventInfo info(1.2, kj::str("foo")); + ScheduledEventInfo info(1.2, kj::str("foo")); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::ScheduledEventInfo info2(reader); + ScheduledEventInfo info2(reader); KJ_ASSERT(info2.scheduledTime == 1.2); KJ_ASSERT(info2.cron == "foo"_kj); - tracing::ScheduledEventInfo info3 = info.clone(); + ScheduledEventInfo info3 = info.clone(); KJ_ASSERT(info3.scheduledTime == 1.2); KJ_ASSERT(info3.cron == "foo"_kj); } @@ -226,16 +226,16 @@ KJ_TEST("Read/Write AlarmEventInfo works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::AlarmEventInfo info(kj::UNIX_EPOCH); + AlarmEventInfo info(kj::UNIX_EPOCH); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::AlarmEventInfo info2(reader); + AlarmEventInfo info2(reader); KJ_ASSERT(info.scheduledTime == info2.scheduledTime); - tracing::AlarmEventInfo info3 = info.clone(); + AlarmEventInfo info3 = info.clone(); KJ_ASSERT(info.scheduledTime == info3.scheduledTime); } @@ -243,17 +243,17 @@ KJ_TEST("Read/Write QueueEventInfo works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::QueueEventInfo info(kj::str("foo"), 1); + QueueEventInfo info(kj::str("foo"), 1); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::QueueEventInfo info2(reader); + QueueEventInfo info2(reader); KJ_ASSERT(info2.queueName == "foo"_kj); KJ_ASSERT(info2.batchSize == 1); - tracing::QueueEventInfo info3 = info.clone(); + QueueEventInfo info3 = info.clone(); KJ_ASSERT(info2.queueName == "foo"_kj); KJ_ASSERT(info2.batchSize == 1); } @@ -262,17 +262,17 @@ KJ_TEST("Read/Write EmailEventInfo works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::EmailEventInfo info(kj::str("foo"), kj::str("bar"), 1); + EmailEventInfo info(kj::str("foo"), kj::str("bar"), 1); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::EmailEventInfo info2(reader); + EmailEventInfo info2(reader); KJ_ASSERT(info2.mailFrom == "foo"_kj); KJ_ASSERT(info2.rcptTo == "bar"_kj); KJ_ASSERT(info2.rawSize == 1); - tracing::EmailEventInfo info3 = info.clone(); + EmailEventInfo info3 = info.clone(); KJ_ASSERT(info3.mailFrom == "foo"_kj); KJ_ASSERT(info3.rcptTo == "bar"_kj); KJ_ASSERT(info3.rawSize == 1); @@ -286,16 +286,16 @@ KJ_TEST("Read/Write TraceEventInfo works") { items.add(kj::heap(kj::none, kj::str("foo"), kj::none, kj::none, kj::none, kj::Array(), kj::none, ExecutionModel::STATELESS)); - tracing::TraceEventInfo info(items.asPtr()); + TraceEventInfo info(items.asPtr()); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::TraceEventInfo info2(reader); + TraceEventInfo info2(reader); KJ_ASSERT(info2.traces.size() == 1); KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); - tracing::TraceEventInfo info3 = info.clone(); + TraceEventInfo info3 = info.clone(); KJ_ASSERT(info2.traces.size() == 1); KJ_ASSERT(KJ_ASSERT_NONNULL(info2.traces[0].scriptName) == "foo"_kj); } @@ -304,30 +304,30 @@ KJ_TEST("Read/Write HibernatableWebSocketEventInfo works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::HibernatableWebSocketEventInfo info(tracing::HibernatableWebSocketEventInfo::Message{}); + HibernatableWebSocketEventInfo info(HibernatableWebSocketEventInfo::Message{}); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::HibernatableWebSocketEventInfo info2(reader); - KJ_ASSERT(info2.type.is()); + HibernatableWebSocketEventInfo info2(reader); + KJ_ASSERT(info2.type.is()); - tracing::HibernatableWebSocketEventInfo info3 = info.clone(); - KJ_ASSERT(info3.type.is()); + HibernatableWebSocketEventInfo info3 = info.clone(); + KJ_ASSERT(info3.type.is()); } KJ_TEST("Read/Write FetchResponseInfo works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::FetchResponseInfo info(123); + FetchResponseInfo info(123); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::FetchResponseInfo info2(reader); + FetchResponseInfo info2(reader); KJ_ASSERT(info2.statusCode == 123); - tracing::FetchResponseInfo info3 = info.clone(); + FetchResponseInfo info3 = info.clone(); KJ_ASSERT(info3.statusCode == 123); } @@ -335,17 +335,17 @@ KJ_TEST("Read/Write DiagnosticChannelEvent works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::DiagnosticChannelEvent info(kj::UNIX_EPOCH, kj::str("foo"), kj::Array()); + DiagnosticChannelEvent info(kj::UNIX_EPOCH, kj::str("foo"), kj::Array()); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::DiagnosticChannelEvent info2(reader); + DiagnosticChannelEvent info2(reader); KJ_ASSERT(info2.timestamp == info.timestamp); KJ_ASSERT(info2.channel == "foo"_kj); KJ_ASSERT(info2.message.size() == 0); - tracing::DiagnosticChannelEvent info3 = info.clone(); + DiagnosticChannelEvent info3 = info.clone(); KJ_ASSERT(info3.timestamp == info.timestamp); KJ_ASSERT(info3.channel == "foo"_kj); KJ_ASSERT(info3.message.size() == 0); @@ -355,16 +355,16 @@ KJ_TEST("Read/Write Log works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::Log info(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + Log info(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Log info2(reader); + Log info2(reader); KJ_ASSERT(info.timestamp == info2.timestamp); KJ_ASSERT(info2.logLevel == LogLevel::INFO); KJ_ASSERT(info2.message == "foo"_kj); - tracing::Log info3 = info.clone(); + Log info3 = info.clone(); KJ_ASSERT(info.timestamp == info3.timestamp); KJ_ASSERT(info3.logLevel == LogLevel::INFO); KJ_ASSERT(info3.message == "foo"_kj); @@ -374,17 +374,17 @@ KJ_TEST("Read/Write Exception works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::Exception info(kj::UNIX_EPOCH, kj::str("foo"), kj::str("bar"), kj::none); + Exception info(kj::UNIX_EPOCH, kj::str("foo"), kj::str("bar"), kj::none); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Exception info2(reader); + Exception info2(reader); KJ_ASSERT(info.timestamp == info2.timestamp); KJ_ASSERT(info2.name == "foo"_kj); KJ_ASSERT(info2.message == "bar"_kj); KJ_ASSERT(info2.stack == kj::none); - tracing::Exception info3 = info.clone(); + Exception info3 = info.clone(); KJ_ASSERT(info.timestamp == info3.timestamp); KJ_ASSERT(info3.name == "foo"_kj); KJ_ASSERT(info3.message == "bar"_kj); @@ -395,11 +395,11 @@ KJ_TEST("Read/Write Attribute works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::Attribute attr("foo"_kjc, {123.0, 321.2}); + Attribute attr("foo"_kjc, {123.0, 321.2}); attr.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Attribute info2(reader); + Attribute info2(reader); KJ_ASSERT(info2.name == "foo"_kj); KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[0].tryGet()) == 123.0); KJ_ASSERT(KJ_ASSERT_NONNULL(info2.value[1].tryGet()) == 321.2); @@ -409,16 +409,16 @@ KJ_TEST("Read/Write Return works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::FetchResponseInfo fetchInfo(123); - tracing::Return info(kj::mv(fetchInfo)); + FetchResponseInfo fetchInfo(123); + Return info(kj::mv(fetchInfo)); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Return info2(reader); + Return info2(reader); auto& fetchInfo2 = KJ_ASSERT_NONNULL(info2.info); KJ_ASSERT(fetchInfo2.statusCode == 123); - tracing::Return info3 = info.clone(); + Return info3 = info.clone(); auto& fetchInfo3 = KJ_ASSERT_NONNULL(info3.info); KJ_ASSERT(fetchInfo3.statusCode == 123); } @@ -427,15 +427,15 @@ KJ_TEST("Read/Write SpanOpen works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::SpanOpen info(0x2a2a2a2a2a2a2a2a, kj::str("foo"), kj::none); + SpanOpen info(0x2a2a2a2a2a2a2a2a, kj::str("foo"), kj::none); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::SpanOpen info2(reader); + SpanOpen info2(reader); KJ_ASSERT(info2.operationName == "foo"_kj); KJ_ASSERT(info2.info == kj::none); - tracing::SpanOpen info3 = info.clone(); + SpanOpen info3 = info.clone(); KJ_ASSERT(info3.operationName == "foo"_kj); KJ_ASSERT(info3.info == kj::none); } @@ -444,15 +444,15 @@ KJ_TEST("Read/Write SpanClose works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::SpanClose info(EventOutcome::EXCEPTION); + SpanClose info(EventOutcome::EXCEPTION); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::SpanClose info2(reader); + SpanClose info2(reader); KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); - tracing::SpanClose info3 = info.clone(); + SpanClose info3 = info.clone(); KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); } @@ -460,10 +460,10 @@ KJ_TEST("Read/Write Onset works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::FetchEventInfo fetchInfo( + FetchEventInfo fetchInfo( kj::HttpMethod::GET, kj::str("https://example.com"), kj::str("{}"), nullptr); - tracing::Onset info(tracing::staticSpanId, tracing::Onset::Info(kj::mv(fetchInfo)), + Onset info(staticSpanId, Onset::Info(kj::mv(fetchInfo)), { .scriptName = kj::str("foo"), }, @@ -471,16 +471,14 @@ KJ_TEST("Read/Write Onset works") { info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Onset info2(reader); - tracing::FetchEventInfo& fetchInfo2 = - KJ_ASSERT_NONNULL(info2.info.tryGet()); + Onset info2(reader); + FetchEventInfo& fetchInfo2 = KJ_ASSERT_NONNULL(info2.info.tryGet()); KJ_ASSERT(fetchInfo2.method == kj::HttpMethod::GET); KJ_ASSERT(fetchInfo2.url == "https://example.com"_kj); KJ_ASSERT(info2.workerInfo.executionModel == ExecutionModel::STATELESS); - tracing::Onset info3 = info.clone(); - tracing::FetchEventInfo& fetchInfo3 = - KJ_ASSERT_NONNULL(info3.info.tryGet()); + Onset info3 = info.clone(); + FetchEventInfo& fetchInfo3 = KJ_ASSERT_NONNULL(info3.info.tryGet()); KJ_ASSERT(fetchInfo3.method == kj::HttpMethod::GET); KJ_ASSERT(fetchInfo3.url == "https://example.com"_kj); KJ_ASSERT(info3.workerInfo.executionModel == ExecutionModel::STATELESS); @@ -490,16 +488,16 @@ KJ_TEST("Read/Write Outcome works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - tracing::Outcome info(EventOutcome::EXCEPTION, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); + Outcome info(EventOutcome::EXCEPTION, 1 * kj::MILLISECONDS, 2 * kj::MILLISECONDS); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::Outcome info2(reader); + Outcome info2(reader); KJ_ASSERT(info2.outcome == EventOutcome::EXCEPTION); KJ_ASSERT(info2.wallTime == 2 * kj::MILLISECONDS); KJ_ASSERT(info2.cpuTime == 1 * kj::MILLISECONDS); - tracing::Outcome info3 = info.clone(); + Outcome info3 = info.clone(); KJ_ASSERT(info3.outcome == EventOutcome::EXCEPTION); KJ_ASSERT(info3.wallTime == 2 * kj::MILLISECONDS); KJ_ASSERT(info3.cpuTime == 1 * kj::MILLISECONDS); @@ -509,33 +507,33 @@ KJ_TEST("Read/Write TailEvent works") { capnp::MallocMessageBuilder builder; auto infoBuilder = builder.initRoot(); - auto context = tracing::SpanContext(TraceId(0, 0), {tracing::staticSpanId}); - tracing::Log log(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); + auto context = SpanContext(TraceId(0, 0), {staticSpanId}); + Log log(kj::UNIX_EPOCH, LogLevel::INFO, kj::str("foo")); auto invocationId = TraceId(0, 0); - tracing::TailEvent info( + TailEvent info( context.getTraceId(), invocationId, context.getSpanId(), kj::UNIX_EPOCH, 0, kj::mv(log)); info.copyTo(infoBuilder); auto reader = infoBuilder.asReader(); - tracing::TailEvent info2(reader); + TailEvent info2(reader); KJ_ASSERT(info2.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(info2.sequence == 0); KJ_ASSERT(info2.invocationId == invocationId); KJ_ASSERT(info2.spanContext == context); - auto& log2 = KJ_ASSERT_NONNULL(info2.event.tryGet()); + auto& log2 = KJ_ASSERT_NONNULL(info2.event.tryGet()); KJ_ASSERT(log2.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(log2.logLevel == LogLevel::INFO); KJ_ASSERT(log2.message == "foo"_kj); - tracing::TailEvent info3 = info.clone(); + TailEvent info3 = info.clone(); KJ_ASSERT(info3.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(info3.sequence == 0); KJ_ASSERT(info3.invocationId == invocationId); KJ_ASSERT(info3.spanContext == context); - auto& log3 = KJ_ASSERT_NONNULL(info3.event.tryGet()); + auto& log3 = KJ_ASSERT_NONNULL(info3.event.tryGet()); KJ_ASSERT(log3.timestamp == kj::UNIX_EPOCH); KJ_ASSERT(log3.logLevel == LogLevel::INFO); KJ_ASSERT(log3.message == "foo"_kj); @@ -546,18 +544,18 @@ KJ_TEST("Read/Write TailEvent with Multiple Attributes") { auto infoBuilder = builder.initRoot(); TraceId traceId(0, 0); - auto context = tracing::SpanContext(traceId, {tracing::staticSpanId}); + auto context = SpanContext(traceId, {staticSpanId}); // An attribute event can have one or more Attributes specified. - kj::Vector attrs(2); - attrs.add(tracing::Attribute("foo"_kjc, true)); - attrs.add(tracing::Attribute("bar"_kjc, (int64_t)123)); + kj::Vector attrs(2); + attrs.add(Attribute("foo"_kjc, true)); + attrs.add(Attribute("bar"_kjc, (int64_t)123)); - tracing::TailEvent info(kj::mv(context), traceId, kj::UNIX_EPOCH, 0, attrs.releaseAsArray()); + TailEvent info(kj::mv(context), traceId, kj::UNIX_EPOCH, 0, attrs.releaseAsArray()); info.copyTo(infoBuilder); - tracing::TailEvent info2(infoBuilder.asReader()); - auto& attrs2 = KJ_ASSERT_NONNULL(info2.event.tryGet>()); + TailEvent info2(infoBuilder.asReader()); + auto& attrs2 = KJ_ASSERT_NONNULL(info2.event.tryGet>()); KJ_ASSERT(attrs2.size() == 2); KJ_ASSERT(attrs2[0].name == "foo"_kj); diff --git a/src/workerd/io/trace.c++ b/src/workerd/io/trace.c++ index e12467aae64..f8001719a00 100644 --- a/src/workerd/io/trace.c++ +++ b/src/workerd/io/trace.c++ @@ -200,7 +200,7 @@ InvocationSpanContext InvocationSpanContext::newForInvocation( kj::Maybe entropySource) { kj::Maybe parent; auto traceId = triggerContext - .map([&](auto& ctx) mutable -> TraceId { + .map([&](auto& ctx) mutable { parent = ctx; return ctx.traceId; }).orDefault([&] { return TraceId::fromEntropy(entropySource); }); @@ -251,33 +251,33 @@ kj::String KJ_STRINGIFY(const InvocationSpanContext& context) { return kj::str(context.getTraceId(), "-", context.getInvocationId(), "-", context.getSpanId()); } -kj::String KJ_STRINGIFY(const tracing::TailEvent::Event& event) { +kj::String KJ_STRINGIFY(const TailEvent::Event& event) { KJ_SWITCH_ONEOF(event) { - KJ_CASE_ONEOF(onset, tracing::Onset) { + KJ_CASE_ONEOF(onset, Onset) { return kj::str("Onset"); } - KJ_CASE_ONEOF(outcome, tracing::Outcome) { + KJ_CASE_ONEOF(outcome, Outcome) { return kj::str("Outcome"); } - KJ_CASE_ONEOF(spanOpen, tracing::SpanOpen) { + KJ_CASE_ONEOF(spanOpen, SpanOpen) { return spanOpen.toString(); } - KJ_CASE_ONEOF(spanClose, tracing::SpanClose) { + KJ_CASE_ONEOF(spanClose, SpanClose) { return spanClose.toString(); } - KJ_CASE_ONEOF(diagnosticChannelEvent, tracing::DiagnosticChannelEvent) { + KJ_CASE_ONEOF(diagnosticChannelEvent, DiagnosticChannelEvent) { return kj::str("diagnosticChannelEvent"); } - KJ_CASE_ONEOF(exception, tracing::Exception) { + KJ_CASE_ONEOF(exception, Exception) { return kj::str("Exception"); } - KJ_CASE_ONEOF(log, tracing::Log) { + KJ_CASE_ONEOF(log, Log) { return kj::str("Log"); } - KJ_CASE_ONEOF(ret, tracing::Return) { + KJ_CASE_ONEOF(ret, Return) { return kj::str("Return"); } - KJ_CASE_ONEOF(customInfo, tracing::CustomInfo) { + KJ_CASE_ONEOF(customInfo, CustomInfo) { return kj::str(customInfo); } } @@ -315,8 +315,6 @@ kj::String KJ_STRINGIFY(const SpanContext& context) { return kj::str(context.getTraceId(), "-", context.getSpanId()); } -} // namespace tracing - namespace { static kj::HttpMethod validateMethod(capnp::HttpMethod method) { @@ -326,14 +324,14 @@ static kj::HttpMethod validateMethod(capnp::HttpMethod method) { } // namespace -tracing::FetchEventInfo::FetchEventInfo( +FetchEventInfo::FetchEventInfo( kj::HttpMethod method, kj::String url, kj::String cfJson, kj::Array
headers) : method(method), url(kj::mv(url)), cfJson(kj::mv(cfJson)), headers(kj::mv(headers)) {} -tracing::FetchEventInfo::FetchEventInfo(rpc::Trace::FetchEventInfo::Reader reader) +FetchEventInfo::FetchEventInfo(rpc::Trace::FetchEventInfo::Reader reader) : method(validateMethod(reader.getMethod())), url(kj::str(reader.getUrl())), cfJson(kj::str(reader.getCfJson())) { @@ -342,7 +340,7 @@ tracing::FetchEventInfo::FetchEventInfo(rpc::Trace::FetchEventInfo::Reader reade headers = v.releaseAsArray(); } -void tracing::FetchEventInfo::copyTo(rpc::Trace::FetchEventInfo::Builder builder) const { +void FetchEventInfo::copyTo(rpc::Trace::FetchEventInfo::Builder builder) const { builder.setMethod(static_cast(method)); builder.setUrl(url); builder.setCfJson(cfJson); @@ -353,203 +351,196 @@ void tracing::FetchEventInfo::copyTo(rpc::Trace::FetchEventInfo::Builder builder } } -tracing::FetchEventInfo tracing::FetchEventInfo::clone() const { +FetchEventInfo FetchEventInfo::clone() const { return FetchEventInfo( method, kj::str(url), kj::str(cfJson), KJ_MAP(h, headers) { return h.clone(); }); } -kj::String tracing::FetchEventInfo::toString() const { +kj::String FetchEventInfo::toString() const { return kj::str("FetchEventInfo: ", kj::delimited( kj::arr(kj::str(method), kj::str(url), kj::str(cfJson), kj::str(headers)), ", "_kjc)); } -tracing::FetchEventInfo::Header::Header(kj::String name, kj::String value) +FetchEventInfo::Header::Header(kj::String name, kj::String value) : name(kj::mv(name)), value(kj::mv(value)) {} -tracing::FetchEventInfo::Header::Header(rpc::Trace::FetchEventInfo::Header::Reader reader) +FetchEventInfo::Header::Header(rpc::Trace::FetchEventInfo::Header::Reader reader) : name(kj::str(reader.getName())), value(kj::str(reader.getValue())) {} -void tracing::FetchEventInfo::Header::copyTo( - rpc::Trace::FetchEventInfo::Header::Builder builder) const { +void FetchEventInfo::Header::copyTo(rpc::Trace::FetchEventInfo::Header::Builder builder) const { builder.setName(name); builder.setValue(value); } -tracing::FetchEventInfo::Header tracing::FetchEventInfo::Header::clone() const { +FetchEventInfo::Header FetchEventInfo::Header::clone() const { return Header(kj::str(name), kj::str(value)); } -kj::String tracing::FetchEventInfo::Header::toString() const { +kj::String FetchEventInfo::Header::toString() const { return kj::str("FetchEventInfo::Header: ", name, ", ", value); } -tracing::JsRpcEventInfo::JsRpcEventInfo(kj::String methodName): methodName(kj::mv(methodName)) {} +JsRpcEventInfo::JsRpcEventInfo(kj::String methodName): methodName(kj::mv(methodName)) {} -tracing::JsRpcEventInfo::JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader) +JsRpcEventInfo::JsRpcEventInfo(rpc::Trace::JsRpcEventInfo::Reader reader) : methodName(kj::str(reader.getMethodName())) {} -void tracing::JsRpcEventInfo::copyTo(rpc::Trace::JsRpcEventInfo::Builder builder) const { +void JsRpcEventInfo::copyTo(rpc::Trace::JsRpcEventInfo::Builder builder) const { builder.setMethodName(methodName); } -tracing::JsRpcEventInfo tracing::JsRpcEventInfo::clone() const { +JsRpcEventInfo JsRpcEventInfo::clone() const { return JsRpcEventInfo(kj::str(methodName)); } -kj::String tracing::JsRpcEventInfo::toString() const { +kj::String JsRpcEventInfo::toString() const { return kj::str("JsRpcEventInfo: ", methodName); } -tracing::ScheduledEventInfo::ScheduledEventInfo(double scheduledTime, kj::String cron) +ScheduledEventInfo::ScheduledEventInfo(double scheduledTime, kj::String cron) : scheduledTime(scheduledTime), cron(kj::mv(cron)) {} -tracing::ScheduledEventInfo::ScheduledEventInfo(rpc::Trace::ScheduledEventInfo::Reader reader) +ScheduledEventInfo::ScheduledEventInfo(rpc::Trace::ScheduledEventInfo::Reader reader) : scheduledTime(reader.getScheduledTime()), cron(kj::str(reader.getCron())) {} -void tracing::ScheduledEventInfo::copyTo(rpc::Trace::ScheduledEventInfo::Builder builder) const { +void ScheduledEventInfo::copyTo(rpc::Trace::ScheduledEventInfo::Builder builder) const { builder.setScheduledTime(scheduledTime); builder.setCron(cron); } -tracing::ScheduledEventInfo tracing::ScheduledEventInfo::clone() const { +ScheduledEventInfo ScheduledEventInfo::clone() const { return ScheduledEventInfo(scheduledTime, kj::str(cron)); } -tracing::AlarmEventInfo::AlarmEventInfo(kj::Date scheduledTime): scheduledTime(scheduledTime) {} +AlarmEventInfo::AlarmEventInfo(kj::Date scheduledTime): scheduledTime(scheduledTime) {} -tracing::AlarmEventInfo::AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader) +AlarmEventInfo::AlarmEventInfo(rpc::Trace::AlarmEventInfo::Reader reader) : scheduledTime(reader.getScheduledTimeMs() * kj::MILLISECONDS + kj::UNIX_EPOCH) {} -void tracing::AlarmEventInfo::copyTo(rpc::Trace::AlarmEventInfo::Builder builder) const { +void AlarmEventInfo::copyTo(rpc::Trace::AlarmEventInfo::Builder builder) const { builder.setScheduledTimeMs((scheduledTime - kj::UNIX_EPOCH) / kj::MILLISECONDS); } -tracing::AlarmEventInfo tracing::AlarmEventInfo::clone() const { +AlarmEventInfo AlarmEventInfo::clone() const { return AlarmEventInfo(scheduledTime); } -tracing::QueueEventInfo::QueueEventInfo(kj::String queueName, uint32_t batchSize) +QueueEventInfo::QueueEventInfo(kj::String queueName, uint32_t batchSize) : queueName(kj::mv(queueName)), batchSize(batchSize) {} -tracing::QueueEventInfo::QueueEventInfo(rpc::Trace::QueueEventInfo::Reader reader) +QueueEventInfo::QueueEventInfo(rpc::Trace::QueueEventInfo::Reader reader) : queueName(kj::heapString(reader.getQueueName())), batchSize(reader.getBatchSize()) {} -void tracing::QueueEventInfo::copyTo(rpc::Trace::QueueEventInfo::Builder builder) const { +void QueueEventInfo::copyTo(rpc::Trace::QueueEventInfo::Builder builder) const { builder.setQueueName(queueName); builder.setBatchSize(batchSize); } -tracing::QueueEventInfo tracing::QueueEventInfo::clone() const { +QueueEventInfo QueueEventInfo::clone() const { return QueueEventInfo(kj::str(queueName), batchSize); } -tracing::EmailEventInfo::EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize) +EmailEventInfo::EmailEventInfo(kj::String mailFrom, kj::String rcptTo, uint32_t rawSize) : mailFrom(kj::mv(mailFrom)), rcptTo(kj::mv(rcptTo)), rawSize(rawSize) {} -tracing::EmailEventInfo::EmailEventInfo(rpc::Trace::EmailEventInfo::Reader reader) +EmailEventInfo::EmailEventInfo(rpc::Trace::EmailEventInfo::Reader reader) : mailFrom(kj::heapString(reader.getMailFrom())), rcptTo(kj::heapString(reader.getRcptTo())), rawSize(reader.getRawSize()) {} -void tracing::EmailEventInfo::copyTo(rpc::Trace::EmailEventInfo::Builder builder) const { +void EmailEventInfo::copyTo(rpc::Trace::EmailEventInfo::Builder builder) const { builder.setMailFrom(mailFrom); builder.setRcptTo(rcptTo); builder.setRawSize(rawSize); } -tracing::EmailEventInfo tracing::EmailEventInfo::clone() const { +EmailEventInfo EmailEventInfo::clone() const { return EmailEventInfo(kj::str(mailFrom), kj::str(rcptTo), rawSize); } namespace { -kj::Vector getTraceItemsFromTraces( +kj::Vector getTraceItemsFromTraces( kj::ArrayPtr> traces) { - return KJ_MAP(t, traces) -> tracing::TraceEventInfo::TraceItem { - return tracing::TraceEventInfo::TraceItem(mapCopyString(t->scriptName)); - }; + return KJ_MAP(t, traces) { return TraceEventInfo::TraceItem(mapCopyString(t->scriptName)); }; } -kj::Vector getTraceItemsFromReader( +kj::Vector getTraceItemsFromReader( rpc::Trace::TraceEventInfo::Reader reader) { - return KJ_MAP(r, reader.getTraces()) -> tracing::TraceEventInfo::TraceItem { - return tracing::TraceEventInfo::TraceItem(r); - }; + return KJ_MAP(r, reader.getTraces()) { return TraceEventInfo::TraceItem(r); }; } } // namespace -tracing::TraceEventInfo::TraceEventInfo(kj::ArrayPtr> traces) +TraceEventInfo::TraceEventInfo(kj::ArrayPtr> traces) : traces(getTraceItemsFromTraces(traces)) {} -tracing::TraceEventInfo::TraceEventInfo(rpc::Trace::TraceEventInfo::Reader reader) +TraceEventInfo::TraceEventInfo(rpc::Trace::TraceEventInfo::Reader reader) : traces(getTraceItemsFromReader(reader)) {} -void tracing::TraceEventInfo::copyTo(rpc::Trace::TraceEventInfo::Builder builder) const { +void TraceEventInfo::copyTo(rpc::Trace::TraceEventInfo::Builder builder) const { auto list = builder.initTraces(traces.size()); for (auto i: kj::indices(traces)) { traces[i].copyTo(list[i]); } } -tracing::TraceEventInfo tracing::TraceEventInfo::clone() const { +TraceEventInfo TraceEventInfo::clone() const { return TraceEventInfo(KJ_MAP(item, traces) { return item.clone(); }); } -tracing::TraceEventInfo::TraceItem::TraceItem(kj::Maybe scriptName) +TraceEventInfo::TraceItem::TraceItem(kj::Maybe scriptName) : scriptName(kj::mv(scriptName)) {} -tracing::TraceEventInfo::TraceItem::TraceItem(rpc::Trace::TraceEventInfo::TraceItem::Reader reader) +TraceEventInfo::TraceItem::TraceItem(rpc::Trace::TraceEventInfo::TraceItem::Reader reader) : scriptName(kj::str(reader.getScriptName())) {} -void tracing::TraceEventInfo::TraceItem::copyTo( +void TraceEventInfo::TraceItem::copyTo( rpc::Trace::TraceEventInfo::TraceItem::Builder builder) const { KJ_IF_SOME(name, scriptName) { builder.setScriptName(name); } } -tracing::TraceEventInfo::TraceItem tracing::TraceEventInfo::TraceItem::clone() const { +TraceEventInfo::TraceItem TraceEventInfo::TraceItem::clone() const { return TraceItem(mapCopyString(scriptName)); } -tracing::DiagnosticChannelEvent::DiagnosticChannelEvent( +DiagnosticChannelEvent::DiagnosticChannelEvent( kj::Date timestamp, kj::String channel, kj::Array message) : timestamp(timestamp), channel(kj::mv(channel)), message(kj::mv(message)) {} -tracing::DiagnosticChannelEvent::DiagnosticChannelEvent( - rpc::Trace::DiagnosticChannelEvent::Reader reader) +DiagnosticChannelEvent::DiagnosticChannelEvent(rpc::Trace::DiagnosticChannelEvent::Reader reader) : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), channel(kj::heapString(reader.getChannel())), message(kj::heapArray(reader.getMessage())) {} -void tracing::DiagnosticChannelEvent::copyTo( - rpc::Trace::DiagnosticChannelEvent::Builder builder) const { +void DiagnosticChannelEvent::copyTo(rpc::Trace::DiagnosticChannelEvent::Builder builder) const { builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); builder.setChannel(channel); builder.setMessage(message); } -tracing::DiagnosticChannelEvent tracing::DiagnosticChannelEvent::clone() const { +DiagnosticChannelEvent DiagnosticChannelEvent::clone() const { return DiagnosticChannelEvent(timestamp, kj::str(channel), kj::heapArray(message)); } -tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(Type type): type(type) {} +HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(Type type): type(type) {} -tracing::HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo( +HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo( rpc::Trace::HibernatableWebSocketEventInfo::Reader reader) : type(readFrom(reader)) {} -void tracing::HibernatableWebSocketEventInfo::copyTo( +void HibernatableWebSocketEventInfo::copyTo( rpc::Trace::HibernatableWebSocketEventInfo::Builder builder) const { auto typeBuilder = builder.initType(); KJ_SWITCH_ONEOF(type) { @@ -567,7 +558,7 @@ void tracing::HibernatableWebSocketEventInfo::copyTo( } } -tracing::HibernatableWebSocketEventInfo tracing::HibernatableWebSocketEventInfo::clone() const { +HibernatableWebSocketEventInfo HibernatableWebSocketEventInfo::clone() const { KJ_SWITCH_ONEOF(type) { KJ_CASE_ONEOF(_, Message) { return HibernatableWebSocketEventInfo(Message{}); @@ -585,7 +576,7 @@ tracing::HibernatableWebSocketEventInfo tracing::HibernatableWebSocketEventInfo: KJ_UNREACHABLE; } -tracing::HibernatableWebSocketEventInfo::Type tracing::HibernatableWebSocketEventInfo::readFrom( +HibernatableWebSocketEventInfo::Type HibernatableWebSocketEventInfo::readFrom( rpc::Trace::HibernatableWebSocketEventInfo::Reader reader) { auto type = reader.getType(); switch (type.which()) { @@ -605,31 +596,69 @@ tracing::HibernatableWebSocketEventInfo::Type tracing::HibernatableWebSocketEven } } -tracing::FetchResponseInfo::FetchResponseInfo(uint16_t statusCode): statusCode(statusCode) {} +FetchResponseInfo::FetchResponseInfo(uint16_t statusCode): statusCode(statusCode) {} -tracing::FetchResponseInfo::FetchResponseInfo(rpc::Trace::FetchResponseInfo::Reader reader) +FetchResponseInfo::FetchResponseInfo(rpc::Trace::FetchResponseInfo::Reader reader) : statusCode(reader.getStatusCode()) {} -void tracing::FetchResponseInfo::copyTo(rpc::Trace::FetchResponseInfo::Builder builder) const { +void FetchResponseInfo::copyTo(rpc::Trace::FetchResponseInfo::Builder builder) const { builder.setStatusCode(statusCode); } -tracing::FetchResponseInfo tracing::FetchResponseInfo::clone() const { +FetchResponseInfo FetchResponseInfo::clone() const { return FetchResponseInfo(statusCode); } -tracing::Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) +Log::Log(kj::Date timestamp, LogLevel logLevel, kj::String message) : timestamp(timestamp), logLevel(logLevel), message(kj::mv(message)) {} -tracing::Exception::Exception( +void Log::copyTo(rpc::Trace::Log::Builder builder) const { + builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setLogLevel(logLevel); + builder.setMessage(message); +} + +Log Log::clone() const { + return Log(timestamp, logLevel, kj::str(message)); +} + +Exception::Exception( kj::Date timestamp, kj::String name, kj::String message, kj::Maybe stack) : timestamp(timestamp), name(kj::mv(name)), message(kj::mv(message)), stack(kj::mv(stack)) {} +Log::Log(rpc::Trace::Log::Reader reader) + : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), + logLevel(reader.getLogLevel()), + message(kj::str(reader.getMessage())) {} + +Exception::Exception(rpc::Trace::Exception::Reader reader) + : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), + name(kj::str(reader.getName())), + message(kj::str(reader.getMessage())) { + if (reader.hasStack()) { + stack = kj::str(reader.getStack()); + } +} + +void Exception::copyTo(rpc::Trace::Exception::Builder builder) const { + builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setName(name); + builder.setMessage(message); + KJ_IF_SOME(s, stack) { + builder.setStack(s); + } +} + +Exception Exception::clone() const { + return Exception(timestamp, kj::str(name), kj::str(message), mapCopyString(stack)); +} +} // namespace tracing + Trace::Trace(kj::Maybe stableId, kj::Maybe scriptName, kj::Maybe> scriptVersion, @@ -760,29 +789,6 @@ void Trace::copyTo(rpc::Trace::Builder builder) const { } } -void tracing::Log::copyTo(rpc::Trace::Log::Builder builder) const { - builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); - builder.setLogLevel(logLevel); - builder.setMessage(message); -} - -tracing::Log tracing::Log::clone() const { - return Log(timestamp, logLevel, kj::str(message)); -} - -void tracing::Exception::copyTo(rpc::Trace::Exception::Builder builder) const { - builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); - builder.setName(name); - builder.setMessage(message); - KJ_IF_SOME(s, stack) { - builder.setStack(s); - } -} - -tracing::Exception tracing::Exception::clone() const { - return Exception(timestamp, kj::str(name), kj::str(message), mapCopyString(stack)); -} - void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLevel) { // Sandboxed workers currently record their traces as if the pipeline log level were set to // "full", so we may need to filter out the extra data after receiving the traces back. @@ -876,41 +882,42 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev } } -tracing::Log::Log(rpc::Trace::Log::Reader reader) - : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), - logLevel(reader.getLogLevel()), - message(kj::str(reader.getMessage())) {} -tracing::Exception::Exception(rpc::Trace::Exception::Reader reader) - : timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), - name(kj::str(reader.getName())), - message(kj::str(reader.getMessage())) { - if (reader.hasStack()) { - stack = kj::str(reader.getStack()); - } -} +namespace tracing { -tracing::Attribute::Attribute(kj::ConstString name, Value&& value) +Attribute::Attribute(kj::ConstString name, Value&& value) : name(kj::mv(name)), value(kj::arr(kj::mv(value))) {} -tracing::Attribute::Attribute(kj::ConstString name, Values&& value) +Attribute::Attribute(kj::ConstString name, Values&& value) : name(kj::mv(name)), value(kj::mv(value)) {} namespace { -kj::Array readValues(const rpc::Trace::Attribute::Reader& reader) { +kj::Array readValues(const rpc::Trace::Attribute::Reader& reader) { // There should always be a value and it always have at least one entry in the list. KJ_ASSERT(reader.hasValue()); auto value = reader.getValue(); return KJ_MAP(v, value) { return deserializeTagValue(v); }; } + +kj::Maybe readReturnInfo(const rpc::Trace::Return::Reader& reader) { + auto info = reader.getInfo(); + switch (info.which()) { + case rpc::Trace::Return::Info::EMPTY: + return kj::none; + case rpc::Trace::Return::Info::FETCH: { + return kj::Maybe(FetchResponseInfo(info.getFetch())); + } + } + KJ_UNREACHABLE; +} } // namespace -tracing::Attribute::Attribute(rpc::Trace::Attribute::Reader reader) +Attribute::Attribute(rpc::Trace::Attribute::Reader reader) : name(kj::str(reader.getName())), value(readValues(reader)) {} -void tracing::Attribute::copyTo(rpc::Trace::Attribute::Builder builder) const { +void Attribute::copyTo(rpc::Trace::Attribute::Builder builder) const { builder.setName(name.asPtr()); auto vec = builder.initValue(value.size()); for (size_t n = 0; n < value.size(); n++) { @@ -918,90 +925,75 @@ void tracing::Attribute::copyTo(rpc::Trace::Attribute::Builder builder) const { } } -tracing::Attribute tracing::Attribute::clone() const { +Attribute Attribute::clone() const { return Attribute(kj::ConstString(kj::str(name)), KJ_MAP(v, value) { return spanTagClone(v); }); } -kj::String tracing::Attribute::toString() const { +kj::String Attribute::toString() const { return kj::str("Attribute: ", name, ", ", kj::str(value)); } -tracing::Return::Return(kj::Maybe info): info(kj::mv(info)) {} - -namespace { -kj::Maybe readReturnInfo(const rpc::Trace::Return::Reader& reader) { - auto info = reader.getInfo(); - switch (info.which()) { - case rpc::Trace::Return::Info::EMPTY: - return kj::none; - case rpc::Trace::Return::Info::FETCH: { - return kj::Maybe(tracing::FetchResponseInfo(info.getFetch())); - } - } - KJ_UNREACHABLE; -} -} // namespace - -tracing::Return::Return(rpc::Trace::Return::Reader reader): info(readReturnInfo(reader)) {} +Return::Return(kj::Maybe info): info(kj::mv(info)) {} +Return::Return(rpc::Trace::Return::Reader reader): info(readReturnInfo(reader)) {} -void tracing::Return::copyTo(rpc::Trace::Return::Builder builder) const { +void Return::copyTo(rpc::Trace::Return::Builder builder) const { KJ_IF_SOME(fetchInfo, info) { auto infoBuilder = builder.initInfo(); fetchInfo.copyTo(infoBuilder.initFetch()); } } -tracing::Return tracing::Return::clone() const { +Return Return::clone() const { KJ_IF_SOME(fetchInfo, info) { return Return(kj::Maybe(fetchInfo.clone())); } return Return(); } -tracing::SpanOpen::SpanOpen(SpanId spanId, kj::String operationName, kj::Maybe info) +SpanOpen::SpanOpen(SpanId spanId, kj::String operationName, kj::Maybe info) : operationName(kj::mv(operationName)), info(kj::mv(info)), spanId(spanId) {} namespace { -kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) { +kj::Maybe readSpanOpenInfo(rpc::Trace::SpanOpen::Reader& reader) { auto info = reader.getInfo(); switch (info.which()) { case rpc::Trace::SpanOpen::Info::EMPTY: return kj::none; case rpc::Trace::SpanOpen::Info::FETCH: { - return kj::Maybe(tracing::FetchEventInfo(info.getFetch())); + return kj::Maybe(FetchEventInfo(info.getFetch())); } case rpc::Trace::SpanOpen::Info::JS_RPC: { - return kj::Maybe(tracing::JsRpcEventInfo(info.getJsRpc())); + return kj::Maybe(JsRpcEventInfo(info.getJsRpc())); } case rpc::Trace::SpanOpen::Info::CUSTOM: { auto custom = info.getCustom(); - return kj::Maybe(KJ_MAP(a, custom) { return tracing::Attribute(a); }); + return kj::Maybe(KJ_MAP(a, custom) { return Attribute(a); }); } } KJ_UNREACHABLE; } } // namespace -tracing::SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader) +SpanOpen::SpanOpen(rpc::Trace::SpanOpen::Reader reader) : operationName(kj::str(reader.getOperationName())), info(readSpanOpenInfo(reader)), spanId(reader.getSpanId()) {} -void tracing::SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) const { +void SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) const { builder.setOperationName(operationName.asPtr()); builder.setSpanId(spanId); KJ_IF_SOME(i, info) { auto infoBuilder = builder.initInfo(); KJ_SWITCH_ONEOF(i) { - KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); } - KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { jsrpc.copyTo(infoBuilder.initJsRpc()); } - KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + KJ_CASE_ONEOF(custom, CustomInfo) { auto customBuilder = infoBuilder.initCustom(custom.size()); for (size_t n = 0; n < custom.size(); n++) { custom[n].copyTo(customBuilder[n]); @@ -1011,17 +1003,17 @@ void tracing::SpanOpen::copyTo(rpc::Trace::SpanOpen::Builder builder) const { } } -tracing::SpanOpen tracing::SpanOpen::clone() const { - constexpr auto cloneInfo = [](const kj::Maybe& info) -> kj::Maybe { - return info.map([](const Info& info) -> tracing::SpanOpen::Info { +SpanOpen SpanOpen::clone() const { + constexpr auto cloneInfo = [](const kj::Maybe& info) -> kj::Maybe { + return info.map([](const Info& info) -> SpanOpen::Info { KJ_SWITCH_ONEOF(info) { - KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); } - KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.clone(); } - KJ_CASE_ONEOF(custom, tracing::CustomInfo) { + KJ_CASE_ONEOF(custom, CustomInfo) { return KJ_MAP(attr, custom) { return attr.clone(); }; } } @@ -1031,76 +1023,75 @@ tracing::SpanOpen tracing::SpanOpen::clone() const { return SpanOpen(spanId, kj::str(operationName), cloneInfo(info)); } -kj::String KJ_STRINGIFY(const tracing::SpanOpen::Info& info) { +kj::String KJ_STRINGIFY(const SpanOpen::Info& info) { KJ_SWITCH_ONEOF(info) { - KJ_CASE_ONEOF(fetch, tracing::FetchEventInfo) { + KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.toString(); } - KJ_CASE_ONEOF(jsrpc, tracing::JsRpcEventInfo) { + KJ_CASE_ONEOF(jsrpc, JsRpcEventInfo) { return jsrpc.toString(); } - KJ_CASE_ONEOF(customInfo, tracing::CustomInfo) { + KJ_CASE_ONEOF(customInfo, CustomInfo) { return kj::str(customInfo); } } KJ_UNREACHABLE } -kj::String tracing::SpanOpen::toString() const { +kj::String SpanOpen::toString() const { return kj::str("SpanOpen:", operationName, ", ", info); } -tracing::SpanClose::SpanClose(EventOutcome outcome): outcome(outcome) {} +SpanClose::SpanClose(EventOutcome outcome): outcome(outcome) {} -tracing::SpanClose::SpanClose(rpc::Trace::SpanClose::Reader reader): outcome(reader.getOutcome()) {} +SpanClose::SpanClose(rpc::Trace::SpanClose::Reader reader): outcome(reader.getOutcome()) {} -void tracing::SpanClose::copyTo(rpc::Trace::SpanClose::Builder builder) const { +void SpanClose::copyTo(rpc::Trace::SpanClose::Builder builder) const { builder.setOutcome(outcome); } -tracing::SpanClose tracing::SpanClose::clone() const { +SpanClose SpanClose::clone() const { return SpanClose(outcome); } -kj::String tracing::SpanClose::toString() const { +kj::String SpanClose::toString() const { return kj::str("SpanClose: ", outcome); } -tracing::Onset::Info tracing::readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { +Onset::Info readOnsetInfo(const rpc::Trace::Onset::Info::Reader& info) { switch (info.which()) { case rpc::Trace::Onset::Info::FETCH: { - return tracing::FetchEventInfo(info.getFetch()); + return FetchEventInfo(info.getFetch()); } case rpc::Trace::Onset::Info::JS_RPC: { - return tracing::JsRpcEventInfo(info.getJsRpc()); + return JsRpcEventInfo(info.getJsRpc()); } case rpc::Trace::Onset::Info::SCHEDULED: { - return tracing::ScheduledEventInfo(info.getScheduled()); + return ScheduledEventInfo(info.getScheduled()); } case rpc::Trace::Onset::Info::ALARM: { - return tracing::AlarmEventInfo(info.getAlarm()); + return AlarmEventInfo(info.getAlarm()); } case rpc::Trace::Onset::Info::QUEUE: { - return tracing::QueueEventInfo(info.getQueue()); + return QueueEventInfo(info.getQueue()); } case rpc::Trace::Onset::Info::EMAIL: { - return tracing::EmailEventInfo(info.getEmail()); + return EmailEventInfo(info.getEmail()); } case rpc::Trace::Onset::Info::TRACE: { - return tracing::TraceEventInfo(info.getTrace()); + return TraceEventInfo(info.getTrace()); } case rpc::Trace::Onset::Info::HIBERNATABLE_WEB_SOCKET: { - return tracing::HibernatableWebSocketEventInfo(info.getHibernatableWebSocket()); + return HibernatableWebSocketEventInfo(info.getHibernatableWebSocket()); } case rpc::Trace::Onset::Info::CUSTOM: { - return tracing::CustomEventInfo(); + return CustomEventInfo(); } } KJ_UNREACHABLE; } -void tracing::writeOnsetInfo( - const tracing::Onset::Info& info, rpc::Trace::Onset::Info::Builder& infoBuilder) { +void writeOnsetInfo(const Onset::Info& info, rpc::Trace::Onset::Info::Builder& infoBuilder) { KJ_SWITCH_ONEOF(info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { fetch.copyTo(infoBuilder.initFetch()); @@ -1180,8 +1171,8 @@ kj::Maybe getEntrypointFromReader(const rpc::Trace::Onset::Reader& r } return kj::none; } -tracing::Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reader) { - return tracing::Onset::WorkerInfo{ +Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Reader& reader) { + return Onset::WorkerInfo{ .executionModel = reader.getExecutionModel(), .scriptName = getScriptNameFromReader(reader), .scriptVersion = getScriptVersionFromReader(reader), @@ -1193,22 +1184,20 @@ tracing::Onset::WorkerInfo getWorkerInfoFromReader(const rpc::Trace::Onset::Read } } // namespace -tracing::Onset::Onset(tracing::SpanId spanId, - tracing::Onset::Info&& info, - tracing::Onset::WorkerInfo&& workerInfo, - CustomInfo attributes) +Onset::Onset( + SpanId spanId, Onset::Info&& info, Onset::WorkerInfo&& workerInfo, CustomInfo attributes) : spanId(spanId), info(kj::mv(info)), workerInfo(kj::mv(workerInfo)), attributes(kj::mv(attributes)) {} -tracing::Onset::Onset(rpc::Trace::Onset::Reader reader) +Onset::Onset(rpc::Trace::Onset::Reader reader) : spanId(reader.getSpanId()), info(readOnsetInfo(reader.getInfo())), workerInfo(getWorkerInfoFromReader(reader)), - attributes(KJ_MAP(attr, reader.getAttributes()) { return tracing::Attribute(attr); }) {} + attributes(KJ_MAP(attr, reader.getAttributes()) { return Attribute(attr); }) {} -void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) const { +void Onset::copyTo(rpc::Trace::Onset::Builder builder) const { builder.setExecutionModel(workerInfo.executionModel); builder.setSpanId(spanId); KJ_IF_SOME(name, workerInfo.scriptName) { @@ -1241,7 +1230,7 @@ void tracing::Onset::copyTo(rpc::Trace::Onset::Builder builder) const { } } -tracing::Onset::WorkerInfo tracing::Onset::WorkerInfo::clone() const { +Onset::WorkerInfo Onset::WorkerInfo::clone() const { return WorkerInfo{ .executionModel = executionModel, .scriptName = mapCopyString(scriptName), @@ -1254,7 +1243,7 @@ tracing::Onset::WorkerInfo tracing::Onset::WorkerInfo::clone() const { }; } -tracing::EventInfo tracing::cloneEventInfo(const tracing::EventInfo& info) { +EventInfo cloneEventInfo(const EventInfo& info) { KJ_SWITCH_ONEOF(info) { KJ_CASE_ONEOF(fetch, FetchEventInfo) { return fetch.clone(); @@ -1287,43 +1276,40 @@ tracing::EventInfo tracing::cloneEventInfo(const tracing::EventInfo& info) { KJ_UNREACHABLE; } -tracing::Onset tracing::Onset::clone() const { +Onset Onset::clone() const { return Onset(spanId, cloneEventInfo(info), workerInfo.clone(), KJ_MAP(attr, attributes) { return attr.clone(); }); } -tracing::Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) +Outcome::Outcome(EventOutcome outcome, kj::Duration cpuTime, kj::Duration wallTime) : outcome(outcome), cpuTime(cpuTime), wallTime(wallTime) {} -tracing::Outcome::Outcome(rpc::Trace::Outcome::Reader reader) +Outcome::Outcome(rpc::Trace::Outcome::Reader reader) : outcome(reader.getOutcome()), cpuTime(reader.getCpuTime() * kj::MILLISECONDS), wallTime(reader.getWallTime() * kj::MILLISECONDS) {} -void tracing::Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const { +void Outcome::copyTo(rpc::Trace::Outcome::Builder builder) const { builder.setOutcome(outcome); builder.setCpuTime(cpuTime / kj::MILLISECONDS); builder.setWallTime(wallTime / kj::MILLISECONDS); } -tracing::Outcome tracing::Outcome::clone() const { +Outcome Outcome::clone() const { return Outcome(outcome, cpuTime, wallTime); } -tracing::TailEvent::TailEvent(tracing::SpanContext context, - TraceId invocationId, - kj::Date timestamp, - kj::uint sequence, - Event&& event) +TailEvent::TailEvent( + SpanContext context, TraceId invocationId, kj::Date timestamp, kj::uint sequence, Event&& event) : spanContext(kj::mv(context)), invocationId(invocationId), timestamp(timestamp), sequence(sequence), event(kj::mv(event)) {} -tracing::TailEvent::TailEvent(TraceId traceId, +TailEvent::TailEvent(TraceId traceId, TraceId invocationId, kj::Maybe spanId, kj::Date timestamp, @@ -1336,54 +1322,54 @@ tracing::TailEvent::TailEvent(TraceId traceId, event(kj::mv(event)) {} namespace { -tracing::TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Reader& reader) { +TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Reader& reader) { const auto event = reader.getEvent(); switch (event.which()) { case rpc::Trace::TailEvent::Event::ONSET: { - return tracing::Onset(event.getOnset()); + return Onset(event.getOnset()); } case rpc::Trace::TailEvent::Event::OUTCOME: { - return tracing::Outcome(event.getOutcome()); + return Outcome(event.getOutcome()); } case rpc::Trace::TailEvent::Event::SPAN_OPEN: { - return tracing::SpanOpen(event.getSpanOpen()); + return SpanOpen(event.getSpanOpen()); } case rpc::Trace::TailEvent::Event::SPAN_CLOSE: { - return tracing::SpanClose(event.getSpanClose()); + return SpanClose(event.getSpanClose()); } case rpc::Trace::TailEvent::Event::ATTRIBUTE: { auto listReader = event.getAttribute(); - kj::Vector attrs(listReader.size()); + kj::Vector attrs(listReader.size()); for (size_t n = 0; n < listReader.size(); n++) { - attrs.add(tracing::Attribute(listReader[n])); + attrs.add(Attribute(listReader[n])); } return attrs.releaseAsArray(); } case rpc::Trace::TailEvent::Event::RETURN: { - return tracing::Return(event.getReturn()); + return Return(event.getReturn()); } case rpc::Trace::TailEvent::Event::DIAGNOSTIC_CHANNEL_EVENT: { - return tracing::DiagnosticChannelEvent(event.getDiagnosticChannelEvent()); + return DiagnosticChannelEvent(event.getDiagnosticChannelEvent()); } case rpc::Trace::TailEvent::Event::EXCEPTION: { - return tracing::Exception(event.getException()); + return Exception(event.getException()); } case rpc::Trace::TailEvent::Event::LOG: { - return tracing::Log(event.getLog()); + return Log(event.getLog()); } } KJ_UNREACHABLE; } } // namespace -tracing::TailEvent::TailEvent(rpc::Trace::TailEvent::Reader reader) +TailEvent::TailEvent(rpc::Trace::TailEvent::Reader reader) : spanContext(SpanContext::fromCapnp(reader.getSpanContext())), invocationId(TraceId::fromCapnp(reader.getInvocationId())), timestamp(kj::UNIX_EPOCH + reader.getTimestampNs() * kj::NANOSECONDS), sequence(reader.getSequence()), event(readEventFromTailEvent(reader)) {} -void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) const { +void TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) const { spanContext.toCapnp(builder.initSpanContext()); invocationId.toCapnp(builder.initInvocationId()); builder.setTimestampNs((timestamp - kj::UNIX_EPOCH) / kj::NANOSECONDS); @@ -1424,7 +1410,7 @@ void tracing::TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) const { } } -tracing::TailEvent tracing::TailEvent::clone() const { +TailEvent TailEvent::clone() const { constexpr auto cloneEvent = [](const Event& event) -> Event { KJ_SWITCH_ONEOF(event) { KJ_CASE_ONEOF(onset, Onset) { @@ -1451,7 +1437,7 @@ tracing::TailEvent tracing::TailEvent::clone() const { KJ_CASE_ONEOF(ret, Return) { return ret.clone(); } - KJ_CASE_ONEOF(attrs, tracing::CustomInfo) { + KJ_CASE_ONEOF(attrs, CustomInfo) { return KJ_MAP(attr, attrs) { return attr.clone(); }; } } @@ -1461,6 +1447,53 @@ tracing::TailEvent tracing::TailEvent::clone() const { sequence, cloneEvent(event)); } +void CompleteSpan::copyTo(rpc::UserSpanData::Builder builder) const { + builder.setOperationName(operationName.asPtr()); + builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); + builder.setSpanId(spanId); + builder.setParentSpanId(parentSpanId); + + auto tagsParam = builder.initTags(tags.size()); + auto i = 0; + for (auto& tag: tags) { + auto tagParam = tagsParam[i++]; + tagParam.setKey(tag.key.asPtr()); + serializeTagValue(tagParam.initValue(), tag.value); + } +} + +CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader) + : spanId(reader.getSpanId()), + parentSpanId(reader.getParentSpanId()), + operationName(kj::str(reader.getOperationName())), + startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS), + endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) { + auto tagsParam = reader.getTags(); + tags.reserve(tagsParam.size()); + for (auto tagParam: tagsParam) { + tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())), + deserializeTagValue(tagParam.getValue())); + } +} + +CompleteSpan CompleteSpan::clone() const { + CompleteSpan copy( + spanId, parentSpanId, kj::ConstString(kj::str(operationName)), startTime, endTime); + copy.tags.reserve(tags.size()); + for (auto& tag: tags) { + copy.tags.insert(kj::ConstString(kj::str(tag.key)), spanTagClone(tag.value)); + } + return copy; +} + +kj::String CompleteSpan::toString() const { + return kj::str("CompleteSpan: ", operationName, + kj::strArray( + KJ_MAP(tag, tags) { return kj::str("(", tag.key, ", ", tag.value, ")"); }, ", ")); +} +} // namespace tracing + // ====================================================================================== SpanBuilder::SpanBuilder(kj::Maybe> observer, @@ -1607,52 +1640,6 @@ Span::TagValue deserializeTagValue(RpcValue::Reader value) { } } -void CompleteSpan::copyTo(rpc::UserSpanData::Builder builder) const { - builder.setOperationName(operationName.asPtr()); - builder.setStartTimeNs((startTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); - builder.setEndTimeNs((endTime - kj::UNIX_EPOCH) / kj::NANOSECONDS); - builder.setSpanId(spanId); - builder.setParentSpanId(parentSpanId); - - auto tagsParam = builder.initTags(tags.size()); - auto i = 0; - for (auto& tag: tags) { - auto tagParam = tagsParam[i++]; - tagParam.setKey(tag.key.asPtr()); - serializeTagValue(tagParam.initValue(), tag.value); - } -} - -CompleteSpan::CompleteSpan(rpc::UserSpanData::Reader reader) - : spanId(reader.getSpanId()), - parentSpanId(reader.getParentSpanId()), - operationName(kj::str(reader.getOperationName())), - startTime(kj::UNIX_EPOCH + reader.getStartTimeNs() * kj::NANOSECONDS), - endTime(kj::UNIX_EPOCH + reader.getEndTimeNs() * kj::NANOSECONDS) { - auto tagsParam = reader.getTags(); - tags.reserve(tagsParam.size()); - for (auto tagParam: tagsParam) { - tags.insert(kj::ConstString(kj::heapString(tagParam.getKey())), - deserializeTagValue(tagParam.getValue())); - } -} - -CompleteSpan CompleteSpan::clone() const { - CompleteSpan copy( - spanId, parentSpanId, kj::ConstString(kj::str(operationName)), startTime, endTime); - copy.tags.reserve(tags.size()); - for (auto& tag: tags) { - copy.tags.insert(kj::ConstString(kj::str(tag.key)), spanTagClone(tag.value)); - } - return copy; -} - -kj::String CompleteSpan::toString() const { - return kj::str("CompleteSpan: ", operationName, - kj::strArray( - KJ_MAP(tag, tags) { return kj::str("(", tag.key, ", ", tag.value, ")"); }, ", ")); -} - ScopedDurationTagger::ScopedDurationTagger( SpanBuilder& span, kj::ConstString key, const kj::MonotonicClock& timer) : span(span), diff --git a/src/workerd/io/trace.h b/src/workerd/io/trace.h index cc9873fa82a..f47059e7291 100644 --- a/src/workerd/io/trace.h +++ b/src/workerd/io/trace.h @@ -606,7 +606,6 @@ struct Attribute final { }; using CustomInfo = kj::Array; kj::String KJ_STRINGIFY(const CustomInfo& customInfo); -} // namespace tracing struct CompleteSpan { // Represents a completed span within user tracing. @@ -638,7 +637,6 @@ struct CompleteSpan { kj::String toString() const; }; -namespace tracing { // A Return mark is used to mark the point at which a span operation returned // a value. For instance, when a fetch subrequest response is received, or when // the fetch handler returns a Response. Importantly, it does not signal that the diff --git a/src/workerd/io/tracer.c++ b/src/workerd/io/tracer.c++ index d04df7473eb..6b062193375 100644 --- a/src/workerd/io/tracer.c++ +++ b/src/workerd/io/tracer.c++ @@ -39,12 +39,12 @@ void TailStreamWriter::report( auto& s = KJ_UNWRAP_OR_RETURN(state); // The onset event must be first and must only happen once. - if (event.is()) { + if (event.is()) { KJ_ASSERT(!onsetSeen, "Tail stream onset already provided"); onsetSeen = true; } else { KJ_ASSERT(onsetSeen, "Tail stream onset was not reported"); - if (event.is()) { + if (event.is()) { outcomeSeen = true; } } @@ -52,9 +52,9 @@ void TailStreamWriter::report( // A zero spanId at the TailEvent level signifies that no spanId should be provided to the tail // worker (for Onset events). We go to great lengths to rule out getting an all-zero spanId by // chance (see SpanId::fromEntropy()), so this should be safe. - tracing::TailEvent tailEvent(context.getTraceId(), context.getInvocationId(), - context.getSpanId() == tracing::SpanId::nullId ? kj::none : kj::Maybe(context.getSpanId()), - timestamp, s.sequence++, kj::mv(event)); + TailEvent tailEvent(context.getTraceId(), context.getInvocationId(), + context.getSpanId() == SpanId::nullId ? kj::none : kj::Maybe(context.getSpanId()), timestamp, + s.sequence++, kj::mv(event)); // If the reporter returns false, then we will treat it as a close signal. if (!s.reporter(kj::mv(tailEvent))) state = kj::none; @@ -191,7 +191,7 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context, } } -void WorkerTracer::addSpan(CompleteSpan&& span) { +void WorkerTracer::addSpan(tracing::CompleteSpan&& span) { // This is where we'll actually encode the span. if (pipelineLogLevel == PipelineLogLevel::NONE) { return; @@ -458,7 +458,7 @@ kj::Date BaseTracer::getTime() { return timestamp; } -void BaseTracer::adjustSpanTime(CompleteSpan& span) { +void BaseTracer::adjustSpanTime(tracing::CompleteSpan& span) { // To report I/O time, we need the IOContext to still be alive. // weakIoContext is only none if we are tracing via RPC (in this case span times have already been // adjusted) or if we failed to transmit an Onset event (in that case we'll get an error based on diff --git a/src/workerd/io/tracer.h b/src/workerd/io/tracer.h index 1b038cdc7c2..7ed23908cb6 100644 --- a/src/workerd/io/tracer.h +++ b/src/workerd/io/tracer.h @@ -103,7 +103,7 @@ class BaseTracer: public kj::Refcounted { LogLevel logLevel, kj::String message) = 0; // Add a span. - virtual void addSpan(CompleteSpan&& span) = 0; + virtual void addSpan(tracing::CompleteSpan&& span) = 0; virtual void addException(const tracing::InvocationSpanContext& context, kj::Date timestamp, @@ -152,7 +152,7 @@ class BaseTracer: public kj::Refcounted { kj::Date getTime(); // helper method for addSpan() implementations - void adjustSpanTime(CompleteSpan& span); + void adjustSpanTime(tracing::CompleteSpan& span); // The root span for the new tracing format. SpanParent userRequestSpan = SpanParent(nullptr); @@ -181,7 +181,7 @@ class WorkerTracer final: public BaseTracer { kj::Date timestamp, LogLevel logLevel, kj::String message) override; - void addSpan(CompleteSpan&& span) override; + void addSpan(tracing::CompleteSpan&& span) override; void addException(const tracing::InvocationSpanContext& context, kj::Date timestamp, kj::String name, diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index ac222f197f0..f6725299b54 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1705,7 +1705,7 @@ class SpanSubmitter final: public kj::Refcounted { void submitSpan(tracing::SpanId spanId, tracing::SpanId parentSpanId, const Span& span) { // We largely recreate the span here which feels inefficient, but is hard to avoid given the // mismatch between the Span type and the full span information required for OTel. - CompleteSpan span2(spanId, parentSpanId, kj::ConstString(kj::str(span.operationName)), + tracing::CompleteSpan span2(spanId, parentSpanId, kj::ConstString(kj::str(span.operationName)), span.startTime, span.endTime); span2.tags.reserve(span.tags.size()); for (auto& tag: span.tags) {