Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/layer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::stack::IdValueStack;
use crate::{OtelData, OtelDataState};
pub use filtered::FilteredOpenTelemetryLayer;
use opentelemetry::ContextGuard;
use opentelemetry::{
trace::{self as otel, noop, Span, SpanBuilder, SpanKind, Status, TraceContextExt},
Expand All @@ -17,15 +18,19 @@ use tracing_core::{field, Event, Subscriber};
#[cfg(feature = "tracing-log")]
use tracing_log::NormalizeEvent;
use tracing_subscriber::layer::Context;
use tracing_subscriber::layer::Filter;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
use web_time::Instant;

mod filtered;

const SPAN_NAME_FIELD: &str = "otel.name";
const SPAN_KIND_FIELD: &str = "otel.kind";
const SPAN_STATUS_CODE_FIELD: &str = "otel.status_code";
const SPAN_STATUS_DESCRIPTION_FIELD: &str = "otel.status_description";
const SPAN_EVENT_COUNT_FIELD: &str = "otel.tracing_event_count";

const EVENT_EXCEPTION_NAME: &str = "exception";
const FIELD_EXCEPTION_MESSAGE: &str = "exception.message";
Expand Down Expand Up @@ -846,6 +851,18 @@ where
}
}

/// Adds a filter for events that counts all events that came to the layer. See
/// [`FilteredOpenTelemetryLayer`] for more details.
///
/// If you just want to filter the events out and you don't need to count how many happened, you
/// can use [`Layer::with_filter`] instead.
pub fn with_counting_event_filter<F: Filter<S>>(
self,
filter: F,
) -> FilteredOpenTelemetryLayer<S, T, F> {
FilteredOpenTelemetryLayer::new(self, filter)
}

/// Retrieve the parent OpenTelemetry [`Context`] from the current tracing
/// [`span`] through the [`Registry`]. This [`Context`] links spans to their
/// parent for proper hierarchical visualization.
Expand Down Expand Up @@ -1430,6 +1447,7 @@ mod tests {
use opentelemetry_sdk::trace::SpanExporter;
use std::{collections::HashMap, error::Error, fmt::Display, time::SystemTime};
use tracing::trace_span;
use tracing_core::LevelFilter;
use tracing_subscriber::prelude::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -1750,6 +1768,54 @@ mod tests {
assert_eq!(iter.next().unwrap().name, "event name 5"); // name attribute should not conflict with event name.
}

#[test]
fn event_filter_count() {
let mut tracer = TestTracer::default();
let subscriber = tracing_subscriber::registry().with(
layer()
.with_tracer(tracer.clone())
.with_counting_event_filter(LevelFilter::INFO)
.with_filter(LevelFilter::DEBUG),
);

tracing::subscriber::with_default(subscriber, || {
tracing::debug_span!("test span").in_scope(|| {
tracing::event!(tracing::Level::TRACE, "1");
tracing::event!(tracing::Level::DEBUG, "2");
tracing::event!(tracing::Level::INFO, "3");
tracing::event!(tracing::Level::WARN, "4");
tracing::event!(tracing::Level::ERROR, "5");
});
});

let events = tracer.with_data(|data| data.events.clone());

let mut iter = events.iter();

assert_eq!(iter.next().unwrap().name, "3");
assert_eq!(iter.next().unwrap().name, "4");
assert_eq!(iter.next().unwrap().name, "5");
assert!(iter.next().is_none());

let spans = tracer.spans();
assert_eq!(spans.len(), 1);

let Value::I64(event_count) = spans
.first()
.unwrap()
.attributes
.iter()
.find(|key_value| key_value.key.as_str() == SPAN_EVENT_COUNT_FIELD)
.unwrap()
.value
else {
panic!("Unexpected type of `{SPAN_EVENT_COUNT_FIELD}`.");
};
// We've sent 5 events out of which 1 was filtered out by an actual filter and another by
// our event filter.
assert_eq!(event_count, 4);
}

#[test]
fn records_no_error_fields_with_context_activation() {
records_no_error_fields_impl(true);
Expand Down
178 changes: 178 additions & 0 deletions src/layer/filtered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use std::any::TypeId;

use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};
use tracing::{span, Event, Subscriber};
use tracing_subscriber::{
layer::{Context, Filter},
registry::LookupSpan,
Layer,
};

use crate::{OtelData, OtelDataState};

use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};

/// A layer wrapping a [`OpenTelemetryLayer`], discarding all events filtered out by a given
/// [`Filter`]. This can be built by calling [`OpenTelemetryLayer::with_counting_event_filter`].
///
/// Only events that are not filtered out will be saved as events on the span. All events, including
/// those filtered out, will be counted and the total will be provided in the
/// `otel.tracing_event_count` field of the exported span.
///
/// This is useful when there is large volume of logs outputted by the application and it would be
/// too expensive to export all of them as span events, but it is still desirable to have
/// information whether there is more information in logs for the given span.
pub struct FilteredOpenTelemetryLayer<S, T, F> {
inner: OpenTelemetryLayer<S, T>,
filter: F,
}

impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {
pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>
where
Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,
F: Filter<S>,
{
FilteredOpenTelemetryLayer {
inner: mapper(self.inner),
filter: self.filter,
}
}

pub fn with_counting_event_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>
where
F2: Filter<S>,
{
FilteredOpenTelemetryLayer {
inner: self.inner,
filter,
}
}

pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self
where
S: Subscriber + for<'span> LookupSpan<'span>,
F: Filter<S>,
{
Self { inner, filter }
}
}

struct EventCount(u32);

impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>
where
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
OpenTelemetryLayer<S, T>: Layer<S>,
F: Filter<S> + 'static,
{
fn on_layer(&mut self, subscriber: &mut S) {
self.inner.on_layer(subscriber);
}

fn register_callsite(
&self,
metadata: &'static tracing::Metadata<'static>,
) -> tracing_core::Interest {
self.inner.register_callsite(metadata)
}

fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.inner.enabled(metadata, ctx)
}

fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
self.inner.on_new_span(attrs, id, ctx);
}

fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
self.inner.on_record(span, values, ctx);
}

fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
self.inner.on_follows_from(span, follows, ctx);
}

fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {
event
.is_contextual()
.then(|| ctx.lookup_current())
.flatten()
}) else {
return;
};

{
let mut extensions = span.extensions_mut();

if let Some(count) = extensions.get_mut::<EventCount>() {
count.0 += 1;
} else {
extensions.insert(EventCount(1));
}
}

drop(span);

println!("evaluating event with level {}", event.metadata().level());
if self.filter.enabled(event.metadata(), &ctx) {
println!("processing event with level {}", event.metadata().level());
self.inner.on_event(event, ctx);
}
}

fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
self.inner.on_enter(id, ctx);
}

fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
self.inner.on_exit(id, ctx);
}

fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
let span = ctx.span(&id).expect("Span not found, this is a bug");
let mut extensions = span.extensions_mut();

let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);
if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {
let key_value = KeyValue::new(
Key::from_static_str(SPAN_EVENT_COUNT_FIELD),
Value::I64(i64::from(count)),
);
match state {
OtelDataState::Builder {
builder,
parent_cx: _,
} => {
builder.attributes.get_or_insert(Vec::new()).push(key_value);
}
OtelDataState::Context { current_cx } => {
let span = current_cx.span();
span.set_attribute(key_value);
}
}
}

drop(extensions);
drop(span);

self.inner.on_close(id, ctx);
}

fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
self.inner.on_id_change(old, new, ctx);
}

/// SAFETY: this is sound as long as the inner implementation is sound.
unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
if id == TypeId::of::<Self>() {
Some(self as *const _ as *const ())
} else {
unsafe { self.inner.downcast_raw(id) }
}
}

// `and_then`, `with_subscriber`, and `with_filter` are not implemented on purpose. Other
// methods should probably be implemented manually if there are new provided methods.
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ mod stack;

use std::time::SystemTime;

pub use layer::{layer, OpenTelemetryLayer};
pub use layer::{layer, FilteredOpenTelemetryLayer, OpenTelemetryLayer};

#[cfg(feature = "metrics")]
pub use metrics::MetricsLayer;
Expand Down