Skip to content

Commit d842759

Browse files
committed
feat: add event-counting filtering layer for spans
1 parent f663332 commit d842759

File tree

3 files changed

+245
-1
lines changed

3 files changed

+245
-1
lines changed

src/layer.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::stack::IdValueStack;
22
use crate::{OtelData, OtelDataState};
3+
pub use filtered::FilteredOpenTelemetryLayer;
34
use opentelemetry::ContextGuard;
45
use opentelemetry::{
56
trace::{self as otel, noop, Span, SpanBuilder, SpanKind, Status, TraceContextExt},
@@ -17,15 +18,19 @@ use tracing_core::{field, Event, Subscriber};
1718
#[cfg(feature = "tracing-log")]
1819
use tracing_log::NormalizeEvent;
1920
use tracing_subscriber::layer::Context;
21+
use tracing_subscriber::layer::Filter;
2022
use tracing_subscriber::registry::LookupSpan;
2123
use tracing_subscriber::Layer;
2224
#[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))]
2325
use web_time::Instant;
2426

27+
mod filtered;
28+
2529
const SPAN_NAME_FIELD: &str = "otel.name";
2630
const SPAN_KIND_FIELD: &str = "otel.kind";
2731
const SPAN_STATUS_CODE_FIELD: &str = "otel.status_code";
2832
const SPAN_STATUS_DESCRIPTION_FIELD: &str = "otel.status_description";
33+
const SPAN_EVENT_COUNT_FIELD: &str = "otel.tracing_event_count";
2934

3035
const EVENT_EXCEPTION_NAME: &str = "exception";
3136
const FIELD_EXCEPTION_MESSAGE: &str = "exception.message";
@@ -846,6 +851,18 @@ where
846851
}
847852
}
848853

854+
/// Adds a filter for events that counts all events that came to the layer. See
855+
/// [`FilteredOpenTelemetryLayer`] for more details.
856+
///
857+
/// If you just want to filter the events out and you don't need to count how many happened, you
858+
/// can use [`Layer::with_filter`] instead.
859+
pub fn with_counting_event_filter<F: Filter<S>>(
860+
self,
861+
filter: F,
862+
) -> FilteredOpenTelemetryLayer<S, T, F> {
863+
FilteredOpenTelemetryLayer::new(self, filter)
864+
}
865+
849866
/// Retrieve the parent OpenTelemetry [`Context`] from the current tracing
850867
/// [`span`] through the [`Registry`]. This [`Context`] links spans to their
851868
/// parent for proper hierarchical visualization.
@@ -1430,6 +1447,7 @@ mod tests {
14301447
use opentelemetry_sdk::trace::SpanExporter;
14311448
use std::{collections::HashMap, error::Error, fmt::Display, time::SystemTime};
14321449
use tracing::trace_span;
1450+
use tracing_core::LevelFilter;
14331451
use tracing_subscriber::prelude::*;
14341452

14351453
#[derive(Debug, Clone)]
@@ -1750,6 +1768,54 @@ mod tests {
17501768
assert_eq!(iter.next().unwrap().name, "event name 5"); // name attribute should not conflict with event name.
17511769
}
17521770

1771+
#[test]
1772+
fn event_filter_count() {
1773+
let mut tracer = TestTracer::default();
1774+
let subscriber = tracing_subscriber::registry().with(
1775+
layer()
1776+
.with_tracer(tracer.clone())
1777+
.with_counting_event_filter(LevelFilter::INFO)
1778+
.with_filter(LevelFilter::DEBUG),
1779+
);
1780+
1781+
tracing::subscriber::with_default(subscriber, || {
1782+
tracing::debug_span!("test span").in_scope(|| {
1783+
tracing::event!(tracing::Level::TRACE, "1");
1784+
tracing::event!(tracing::Level::DEBUG, "2");
1785+
tracing::event!(tracing::Level::INFO, "3");
1786+
tracing::event!(tracing::Level::WARN, "4");
1787+
tracing::event!(tracing::Level::ERROR, "5");
1788+
});
1789+
});
1790+
1791+
let events = tracer.with_data(|data| data.events.clone());
1792+
1793+
let mut iter = events.iter();
1794+
1795+
assert_eq!(iter.next().unwrap().name, "3");
1796+
assert_eq!(iter.next().unwrap().name, "4");
1797+
assert_eq!(iter.next().unwrap().name, "5");
1798+
assert!(iter.next().is_none());
1799+
1800+
let spans = tracer.spans();
1801+
assert_eq!(spans.len(), 1);
1802+
1803+
let Value::I64(event_count) = spans
1804+
.first()
1805+
.unwrap()
1806+
.attributes
1807+
.iter()
1808+
.find(|key_value| key_value.key.as_str() == SPAN_EVENT_COUNT_FIELD)
1809+
.unwrap()
1810+
.value
1811+
else {
1812+
panic!("Unexpected type of `{SPAN_EVENT_COUNT_FIELD}`.");
1813+
};
1814+
// We've sent 5 events out of which 1 was filtered out by an actual filter and another by
1815+
// our event filter.
1816+
assert_eq!(event_count, 4);
1817+
}
1818+
17531819
#[test]
17541820
fn records_no_error_fields_with_context_activation() {
17551821
records_no_error_fields_impl(true);

src/layer/filtered.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
use std::any::TypeId;
2+
3+
use opentelemetry::{trace::TraceContextExt as _, Key, KeyValue, Value};
4+
use tracing::{span, Event, Subscriber};
5+
use tracing_subscriber::{
6+
layer::{Context, Filter},
7+
registry::LookupSpan,
8+
Layer,
9+
};
10+
11+
use crate::{OtelData, OtelDataState};
12+
13+
use super::{OpenTelemetryLayer, SPAN_EVENT_COUNT_FIELD};
14+
15+
/// A layer wrapping a [`OpenTelemetryLayer`], discarding all events filtered out by a given
16+
/// [`Filter`]. This can be built by calling [`OpenTelemetryLayer::with_counting_event_filter`].
17+
///
18+
/// Only events that are not filtered out will be saved as events on the span. All events, including
19+
/// those filtered out, will be counted and the total will be provided in the
20+
/// `otel.tracing_event_count` field of the exported span.
21+
///
22+
/// This is useful when there is large volume of logs outputted by the application and it would be
23+
/// too expensive to export all of them as span events, but it is still desirable to have
24+
/// information whether there is more information in logs for the given span.
25+
pub struct FilteredOpenTelemetryLayer<S, T, F> {
26+
inner: OpenTelemetryLayer<S, T>,
27+
filter: F,
28+
}
29+
30+
impl<S, T, F> FilteredOpenTelemetryLayer<S, T, F> {
31+
pub fn map_inner<Mapper, S2, T2>(self, mapper: Mapper) -> FilteredOpenTelemetryLayer<S2, T2, F>
32+
where
33+
Mapper: FnOnce(OpenTelemetryLayer<S, T>) -> OpenTelemetryLayer<S2, T2>,
34+
F: Filter<S>,
35+
{
36+
FilteredOpenTelemetryLayer {
37+
inner: mapper(self.inner),
38+
filter: self.filter,
39+
}
40+
}
41+
42+
pub fn with_counting_event_filter<F2>(self, filter: F2) -> FilteredOpenTelemetryLayer<S, T, F2>
43+
where
44+
F2: Filter<S>,
45+
{
46+
FilteredOpenTelemetryLayer {
47+
inner: self.inner,
48+
filter,
49+
}
50+
}
51+
52+
pub(crate) fn new(inner: OpenTelemetryLayer<S, T>, filter: F) -> Self
53+
where
54+
S: Subscriber + for<'span> LookupSpan<'span>,
55+
F: Filter<S>,
56+
{
57+
Self { inner, filter }
58+
}
59+
}
60+
61+
struct EventCount(u32);
62+
63+
impl<S, T, F> Layer<S> for FilteredOpenTelemetryLayer<S, T, F>
64+
where
65+
S: Subscriber + for<'lookup> LookupSpan<'lookup>,
66+
OpenTelemetryLayer<S, T>: Layer<S>,
67+
F: Filter<S> + 'static,
68+
{
69+
fn on_layer(&mut self, subscriber: &mut S) {
70+
self.inner.on_layer(subscriber);
71+
}
72+
73+
fn register_callsite(
74+
&self,
75+
metadata: &'static tracing::Metadata<'static>,
76+
) -> tracing_core::Interest {
77+
self.inner.register_callsite(metadata)
78+
}
79+
80+
fn enabled(&self, metadata: &tracing::Metadata<'_>, ctx: Context<'_, S>) -> bool {
81+
self.inner.enabled(metadata, ctx)
82+
}
83+
84+
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
85+
self.inner.on_new_span(attrs, id, ctx);
86+
}
87+
88+
fn on_record(&self, span: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
89+
self.inner.on_record(span, values, ctx);
90+
}
91+
92+
fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
93+
self.inner.on_follows_from(span, follows, ctx);
94+
}
95+
96+
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
97+
let Some(span) = event.parent().and_then(|id| ctx.span(id)).or_else(|| {
98+
event
99+
.is_contextual()
100+
.then(|| ctx.lookup_current())
101+
.flatten()
102+
}) else {
103+
return;
104+
};
105+
106+
{
107+
let mut extensions = span.extensions_mut();
108+
109+
if let Some(count) = extensions.get_mut::<EventCount>() {
110+
count.0 += 1;
111+
} else {
112+
extensions.insert(EventCount(1));
113+
}
114+
}
115+
116+
drop(span);
117+
118+
println!("evaluating event with level {}", event.metadata().level());
119+
if self.filter.enabled(event.metadata(), &ctx) {
120+
println!("processing event with level {}", event.metadata().level());
121+
self.inner.on_event(event, ctx);
122+
}
123+
}
124+
125+
fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
126+
self.inner.on_enter(id, ctx);
127+
}
128+
129+
fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
130+
self.inner.on_exit(id, ctx);
131+
}
132+
133+
fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
134+
let span = ctx.span(&id).expect("Span not found, this is a bug");
135+
let mut extensions = span.extensions_mut();
136+
137+
let count = extensions.remove::<EventCount>().map_or(0, |count| count.0);
138+
if let Some(OtelData { state, end_time: _ }) = extensions.get_mut::<OtelData>() {
139+
let key_value = KeyValue::new(
140+
Key::from_static_str(SPAN_EVENT_COUNT_FIELD),
141+
Value::I64(i64::from(count)),
142+
);
143+
match state {
144+
OtelDataState::Builder {
145+
builder,
146+
parent_cx: _,
147+
} => {
148+
builder.attributes.get_or_insert(Vec::new()).push(key_value);
149+
}
150+
OtelDataState::Context { current_cx } => {
151+
let span = current_cx.span();
152+
span.set_attribute(key_value);
153+
}
154+
}
155+
}
156+
157+
drop(extensions);
158+
drop(span);
159+
160+
self.inner.on_close(id, ctx);
161+
}
162+
163+
fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
164+
self.inner.on_id_change(old, new, ctx);
165+
}
166+
167+
/// SAFETY: this is sound as long as the inner implementation is sound.
168+
unsafe fn downcast_raw(&self, id: TypeId) -> Option<*const ()> {
169+
if id == TypeId::of::<Self>() {
170+
Some(self as *const _ as *const ())
171+
} else {
172+
unsafe { self.inner.downcast_raw(id) }
173+
}
174+
}
175+
176+
// `and_then`, `with_subscriber`, and `with_filter` are not implemented on purpose. Other
177+
// methods should probably be implemented manually if there are new provided methods.
178+
}

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ mod stack;
122122

123123
use std::time::SystemTime;
124124

125-
pub use layer::{layer, OpenTelemetryLayer};
125+
pub use layer::{layer, FilteredOpenTelemetryLayer, OpenTelemetryLayer};
126126

127127
#[cfg(feature = "metrics")]
128128
pub use metrics::MetricsLayer;

0 commit comments

Comments
 (0)