Skip to content

Commit c287c84

Browse files
authored
subscriber: change registry exit to decrement local span ref only (#3331)
## Motivation Using with_subscriber will trigger a panic if the async block creates a span & enters it on a different thread that will target the default subscriber. I believe both the with_subscriber AND the enter off-thread scenario are idiomatic. The reason for the crash is that the span.enter() try's recording the span exit into the default subscriber which isn't aware of it since it's registered by the scoped subscriber. ## Solution From my understanding, it's sufficient for `Registry::exit` just to call `self.try_close()` and do local bookkeeping: - The attempt to use the thread dispatcher leads directly to the issues observed in #3223, and - I don't think it's correct to call `try_close` on the whole `Layer` stack at this point anyway, a span being exited is not yet ready to close. All that is needed is to decrement the reference count within the current registry. I've added a test which spawns a thread and enters (and exits, and drops) a span created on a dispatcher not registered to that thread. Before this patch, the test fails with the span never being closed (because there is no thread dispatcher when the span is exited, and so a reference is leaked in `Registry::exit`). With this patch, the bookkeeping demonstrated in the test seems correct to me. Fixes: #3223
1 parent dfe922e commit c287c84

File tree

2 files changed

+191
-1
lines changed

2 files changed

+191
-1
lines changed

tracing-subscriber/src/registry/sharded.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ impl Subscriber for Registry {
301301
fn exit(&self, id: &span::Id) {
302302
if let Some(spans) = self.current_spans.get() {
303303
if spans.borrow_mut().pop(id) {
304-
dispatcher::get_default(|dispatch| dispatch.try_close(id.clone()));
304+
self.try_close(id.clone());
305305
}
306306
}
307307
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
#![cfg(feature = "registry")]
2+
3+
use std::sync::{Arc, Mutex};
4+
5+
use tracing::{
6+
span::{self, Id},
7+
Dispatch, Event, Metadata, Subscriber,
8+
};
9+
use tracing_core::{Interest, LevelFilter};
10+
use tracing_subscriber::{
11+
layer::{Context, SubscriberExt},
12+
Layer, Registry,
13+
};
14+
15+
#[test]
16+
fn span_entered_on_different_thread_from_subscriber() {
17+
/// Counters for various lifecycle events we want to track.
18+
#[derive(Default)]
19+
struct LifecycleCounts {
20+
layer_new_count: usize,
21+
layer_enter_count: usize,
22+
layer_exit_count: usize,
23+
layer_close_count: usize,
24+
25+
sub_new_count: usize,
26+
sub_clone_count: usize,
27+
sub_enter_count: usize,
28+
sub_exit_count: usize,
29+
sub_close_count: usize,
30+
}
31+
32+
/// Wraps `tracing_subscriber::Registry` and adds some accounting
33+
/// to verify that the subscriber is receiving the expected number of calls.
34+
struct CountingSubscriber {
35+
inner: Registry,
36+
counts: Arc<Mutex<LifecycleCounts>>,
37+
}
38+
39+
// Forward all subscriber methods to the inner registry, adding counts where appropriate.
40+
impl Subscriber for CountingSubscriber {
41+
fn on_register_dispatch(&self, subscriber: &Dispatch) {
42+
self.inner.on_register_dispatch(subscriber);
43+
}
44+
45+
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
46+
self.inner.register_callsite(metadata)
47+
}
48+
49+
fn max_level_hint(&self) -> Option<LevelFilter> {
50+
self.inner.max_level_hint()
51+
}
52+
53+
fn event_enabled(&self, event: &Event<'_>) -> bool {
54+
self.inner.event_enabled(event)
55+
}
56+
57+
fn clone_span(&self, id: &span::Id) -> span::Id {
58+
self.counts.lock().unwrap().sub_clone_count += 1;
59+
self.inner.clone_span(id)
60+
}
61+
62+
fn drop_span(&self, id: span::Id) {
63+
self.counts.lock().unwrap().sub_close_count += 1;
64+
#[allow(deprecated)]
65+
self.inner.drop_span(id);
66+
}
67+
68+
fn try_close(&self, id: span::Id) -> bool {
69+
self.counts.lock().unwrap().sub_close_count += 1;
70+
self.inner.try_close(id)
71+
}
72+
73+
fn current_span(&self) -> tracing_core::span::Current {
74+
self.inner.current_span()
75+
}
76+
77+
unsafe fn downcast_raw(&self, id: std::any::TypeId) -> Option<*const ()> {
78+
self.inner.downcast_raw(id)
79+
}
80+
81+
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
82+
self.inner.enabled(metadata)
83+
}
84+
85+
fn new_span(&self, span: &span::Attributes<'_>) -> span::Id {
86+
self.counts.lock().unwrap().sub_new_count += 1;
87+
self.inner.new_span(span)
88+
}
89+
90+
fn record(&self, span: &span::Id, values: &span::Record<'_>) {
91+
self.inner.record(span, values);
92+
}
93+
94+
fn record_follows_from(&self, span: &span::Id, follows: &span::Id) {
95+
self.inner.record_follows_from(span, follows);
96+
}
97+
98+
fn event(&self, event: &Event<'_>) {
99+
self.inner.event(event);
100+
}
101+
102+
fn enter(&self, span: &span::Id) {
103+
self.inner.enter(span);
104+
self.counts.lock().unwrap().sub_enter_count += 1;
105+
}
106+
107+
fn exit(&self, span: &span::Id) {
108+
self.inner.exit(span);
109+
self.counts.lock().unwrap().sub_exit_count += 1;
110+
}
111+
}
112+
113+
/// Similar to the above, but for a `Layer` which sits atop the subscriber.
114+
struct CountingLayer {
115+
counts: Arc<Mutex<LifecycleCounts>>,
116+
}
117+
118+
// Just does bookkeeping where relevant.
119+
impl Layer<CountingSubscriber> for CountingLayer {
120+
fn on_new_span(
121+
&self,
122+
_attrs: &span::Attributes<'_>,
123+
_id: &span::Id,
124+
_ctx: Context<'_, CountingSubscriber>,
125+
) {
126+
self.counts.lock().unwrap().layer_new_count += 1;
127+
}
128+
129+
fn on_enter(&self, _id: &span::Id, _ctx: Context<'_, CountingSubscriber>) {
130+
self.counts.lock().unwrap().layer_enter_count += 1;
131+
}
132+
133+
fn on_exit(&self, _id: &span::Id, _ctx: Context<'_, CountingSubscriber>) {
134+
self.counts.lock().unwrap().layer_exit_count += 1;
135+
}
136+
137+
fn on_close(&self, _id: Id, _ctx: Context<'_, CountingSubscriber>) {
138+
self.counts.lock().unwrap().layer_close_count += 1;
139+
}
140+
}
141+
142+
// Setup subscriber and layer.
143+
144+
let counts = Arc::new(Mutex::new(LifecycleCounts::default()));
145+
146+
let layer = CountingLayer {
147+
counts: counts.clone(),
148+
};
149+
150+
let subscriber = CountingSubscriber {
151+
inner: tracing_subscriber::registry(),
152+
counts: counts.clone(),
153+
};
154+
let subscriber = Arc::new(subscriber.with(layer));
155+
156+
// Create a span using the subscriber
157+
let span = tracing::subscriber::with_default(subscriber.clone(), move || {
158+
tracing::span!(tracing::Level::INFO, "span")
159+
});
160+
161+
// Enter the span in a thread which doesn't have a direct relationship to the subscriber.
162+
std::thread::spawn(move || {
163+
let _ = span.entered();
164+
})
165+
.join()
166+
.unwrap();
167+
168+
// layer should have seen exactly one new span & close
169+
// should be one enter / exit cycle
170+
171+
let counts = counts.lock().unwrap();
172+
173+
assert_eq!(counts.layer_new_count, 1);
174+
assert_eq!(counts.layer_enter_count, 1);
175+
assert_eq!(counts.layer_exit_count, 1);
176+
assert_eq!(counts.layer_close_count, 1);
177+
178+
// subscriber should have seen one new span
179+
// new + any clones should equal number of closes
180+
// enter and exit should match layer counts
181+
182+
assert_eq!(counts.sub_new_count, 1);
183+
assert_eq!(
184+
counts.sub_new_count + counts.sub_clone_count,
185+
counts.sub_close_count
186+
);
187+
188+
assert_eq!(counts.sub_enter_count, counts.layer_enter_count);
189+
assert_eq!(counts.sub_exit_count, counts.layer_exit_count);
190+
}

0 commit comments

Comments
 (0)