From d8427598cc76810e18eb11565bfe9dfaab4bc676 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?David=20Ml=C3=A1dek?=
Date: Wed, 24 Sep 2025 15:00:04 +0200
Subject: [PATCH] feat: add event-counting filtering layer for spans

---
 src/layer.rs          |  66 ++++++++++++++++
 src/layer/filtered.rs | 178 ++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs            |   2 +-
 3 files changed, 245 insertions(+), 1 deletion(-)
 create mode 100644 src/layer/filtered.rs

diff --git a/src/layer.rs b/src/layer.rs
index c1f8add..ca68d5f 100644
--- a/src/layer.rs
+++ b/src/layer.rs
@@ -1,5 +1,6 @@
 use crate::stack::IdValueStack;
 use crate::{OtelData, OtelDataState};
+pub use filtered::FilteredOpenTelemetryLayer;
 use opentelemetry::ContextGuard;
 use opentelemetry::{
     trace::{self as otel, noop, Span, SpanBuilder, SpanKind, Status, TraceContextExt},
@@ -17,15 +18,19 @@ use tracing_core::{field, Event, Subscriber};
 #[cfg(feature = "tracing-log")]
 use tracing_log::NormalizeEvent;
 use tracing_subscriber::layer::Context;
+use tracing_subscriber::layer::Filter;
 use tracing_subscriber::registry::LookupSpan;
 use tracing_subscriber::Layer;
 #[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
 use web_time::Instant;
 
+mod filtered;
+
 const SPAN_NAME_FIELD: &str = "otel.name";
 const SPAN_KIND_FIELD: &str = "otel.kind";
 const SPAN_STATUS_CODE_FIELD: &str = "otel.status_code";
 const SPAN_STATUS_DESCRIPTION_FIELD: &str = "otel.status_description";
+const SPAN_EVENT_COUNT_FIELD: &str = "otel.tracing_event_count";
 
 const EVENT_EXCEPTION_NAME: &str = "exception";
 const FIELD_EXCEPTION_MESSAGE: &str = "exception.message";
@@ -846,6 +851,18 @@ where
         }
     }
 
+    /// Adds a filter for events that counts all events that came to the layer. See
+    /// [`FilteredOpenTelemetryLayer`] for more details.
+    ///
+    /// If you just want to filter the events out and you don't need to count how many happened, you
+    /// can use [`Layer::with_filter`] instead.
+    pub fn with_counting_event_filter<F: Filter<S>>(
+        self,
+        filter: F,
+    ) -> FilteredOpenTelemetryLayer<S, T, F> {
+        FilteredOpenTelemetryLayer::new(self, filter)
+    }
+
     /// Retrieve the parent OpenTelemetry [`Context`] from the current tracing
     /// [`span`] through the [`Registry`]. This [`Context`] links spans to their
     /// parent for proper hierarchical visualization.
@@ -1430,6 +1447,7 @@ mod tests {
     use opentelemetry_sdk::trace::SpanExporter;
     use std::{collections::HashMap, error::Error, fmt::Display, time::SystemTime};
     use tracing::trace_span;
+    use tracing_core::LevelFilter;
     use tracing_subscriber::prelude::*;
 
     #[derive(Debug, Clone)]
@@ -1750,6 +1768,54 @@ mod tests {
         assert_eq!(iter.next().unwrap().name, "event name 5"); // name attribute should not conflict with event name.
     }
 
+    #[test]
+    fn event_filter_count() {
+        let mut tracer = TestTracer::default();
+        let subscriber = tracing_subscriber::registry().with(
+            layer()
+                .with_tracer(tracer.clone())
+                .with_counting_event_filter(LevelFilter::INFO)
+                .with_filter(LevelFilter::DEBUG),
+        );
+
+        tracing::subscriber::with_default(subscriber, || {
+            tracing::debug_span!("test span").in_scope(|| {
+                tracing::event!(tracing::Level::TRACE, "1");
+                tracing::event!(tracing::Level::DEBUG, "2");
+                tracing::event!(tracing::Level::INFO, "3");
+                tracing::event!(tracing::Level::WARN, "4");
+                tracing::event!(tracing::Level::ERROR, "5");
+            });
+        });
+
+        let events = tracer.with_data(|data| data.events.clone());
+
+        let mut iter = events.iter();
+
+        assert_eq!(iter.next().unwrap().name, "3");
+        assert_eq!(iter.next().unwrap().name, "4");
+        assert_eq!(iter.next().unwrap().name, "5");
+        assert!(iter.next().is_none());
+
+        let spans = tracer.spans();
+        assert_eq!(spans.len(), 1);
+
+        let Value::I64(event_count) = spans
+            .first()
+            .unwrap()
+            .attributes
+            .iter()
+            .find(|key_value| key_value.key.as_str() == SPAN_EVENT_COUNT_FIELD)
+            .unwrap()
+            .value
+        else {
+            panic!("Unexpected type of `{SPAN_EVENT_COUNT_FIELD}`.");
+        };
+        // 5 events were sent: the outer filter dropped 1 before it reached the layer, and our
+        // event filter rejected another 1, which is still counted. Hence 4 counted events.
+        assert_eq!(event_count, 4);
+    }
+
     #[test]
     fn records_no_error_fields_with_context_activation() {
         records_no_error_fields_impl(true);
diff --git a/src/layer/filtered.rs b/src/layer/filtered.rs
new file mode 100644
index 0000000..131cfb4
--- /dev/null
+++ b/src/layer/filtered.rs
@@ -0,0 +1,178 @@
+use std::any::TypeId;
+
+use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};
+use tracing::{span, Event, Subscriber};
+use tracing_subscriber::{
+    layer::{Context, Filter},
+    registry::LookupSpan,
+    Layer,
+};
+
+use crate::{OtelData, OtelDataState};
+
+use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};
+
+/// A layer wrapping an [`OpenTelemetryLayer`], discarding all events filtered out by a given
+/// [`Filter`]. This can be built by calling [`OpenTelemetryLayer::with_counting_event_filter`].
+///
+/// Only events that are not filtered out will be saved as events on the span. All events, including
+/// those filtered out, will be counted and the total will be provided in the
+/// `otel.tracing_event_count` field of the exported span.
+///
+/// This is useful when there is a large volume of logs emitted by the application and it would be
+/// too expensive to export all of them as span events, but it is still desirable to have
+/// an indication of whether the logs hold more information for the given span.
+pub struct FilteredOpenTelemetryLayer<S, T, F> {
+    inner: OpenTelemetryLayer<S, T>,
+    filter: F,
+}
+
+impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {
+    /// Transforms the wrapped [`OpenTelemetryLayer`] with `mapper`, keeping the event filter.
+    pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>
+    where
+        Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,
+        F: Filter<S2>,
+    {
+        FilteredOpenTelemetryLayer {
+            inner: mapper(self.inner),
+            filter: self.filter,
+        }
+    }
+
+    /// Replaces the event filter with `filter`, keeping the wrapped layer unchanged.
+    pub fn with_counting_event_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>
+    where
+        F2: Filter<S>,
+    {
+        FilteredOpenTelemetryLayer {
+            inner: self.inner,
+            filter,
+        }
+    }
+
+    pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self
+    where
+        S: Subscriber + for<'span> LookupSpan<'span>,
+        F: Filter<S>,
+    {
+        Self { inner, filter }
+    }
+}
+
+struct EventCount(u32);
+
+impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>
+where
+    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
+    OpenTelemetryLayer<S, T>: Layer<S>,
+    F: Filter<S> + 'static,
+{
+    fn on_layer(&mut self, subscriber: &mut S) {
+        self.inner.on_layer(subscriber);
+    }
+
+    fn register_callsite(
+        &self,
+        metadata: &'static tracing::Metadata<'static>,
+    ) -> tracing_core::Interest {
+        self.inner.register_callsite(metadata)
+    }
+
+    fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
+        self.inner.enabled(metadata, ctx)
+    }
+
+    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
+        self.inner.on_new_span(attrs, id, ctx);
+    }
+
+    fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
+        self.inner.on_record(span, values, ctx);
+    }
+
+    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
+        self.inner.on_follows_from(span, follows, ctx);
+    }
+
+    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
+        let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {
+            event
+                .is_contextual()
+                .then(|| ctx.lookup_current())
+                .flatten()
+        }) else {
+            return;
+        };
+
+        {
+            let mut extensions = span.extensions_mut();
+
+            if let Some(count) = extensions.get_mut::<EventCount>() {
+                count.0 = count.0.saturating_add(1);
+            } else {
+                extensions.insert(EventCount(1));
+            }
+        }
+
+        drop(span);
+
+        if self.filter.enabled(event.metadata(), &ctx) {
+            self.inner.on_event(event, ctx);
+        }
+    }
+
+    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
+        self.inner.on_enter(id, ctx);
+    }
+
+    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
+        self.inner.on_exit(id, ctx);
+    }
+
+    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
+        let span = ctx.span(&id).expect("Span not found, this is a bug");
+        let mut extensions = span.extensions_mut();
+
+        let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);
+        if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {
+            let key_value = KeyValue::new(
+                Key::from_static_str(SPAN_EVENT_COUNT_FIELD),
+                Value::I64(i64::from(count)),
+            );
+            match state {
+                OtelDataState::Builder {
+                    builder,
+                    parent_cx: _,
+                } => {
+                    builder.attributes.get_or_insert(Vec::new()).push(key_value);
+                }
+                OtelDataState::Context { current_cx } => {
+                    let span = current_cx.span();
+                    span.set_attribute(key_value);
+                }
+            }
+        }
+
+        drop(extensions);
+        drop(span);
+
+        self.inner.on_close(id, ctx);
+    }
+
+    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
+        self.inner.on_id_change(old, new, ctx);
+    }
+
+    /// SAFETY: this is sound as long as the inner implementation is sound.
+    unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
+        if id == TypeId::of::<Self>() {
+            Some(self as *const _ as *const ())
+        } else {
+            unsafe { self.inner.downcast_raw(id) }
+        }
+    }
+
+    // `and_then`, `with_subscriber`, and `with_filter` are not implemented on purpose. Other
+    // methods should probably be implemented manually if there are new provided methods.
+}
diff --git a/src/lib.rs b/src/lib.rs
index a5c2fcc..09bf537 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -122,7 +122,7 @@ mod stack;
 
 use std::time::SystemTime;
 
-pub use layer::{layer, OpenTelemetryLayer};
+pub use layer::{layer, FilteredOpenTelemetryLayer, OpenTelemetryLayer};
 
 #[cfg(feature = "metrics")]
 pub use metrics::MetricsLayer;