Skip to content

Commit 0cd7a2f

Browse files
committed
fix(subscriber): ignore spans that weren't initially recorded
Currently, a strange behavior exists in the `console-subscriber` crate. If a span for a task, async op, or resource is created, and the event buffer is full, the aggregator task will not be informed of that span's creation. But, if the event buffer then empties out, we might send the aggregator task enter/exit/close events for that span. This results in the aggregator receiving events for an unknown span, which might result in panics or subtly wrong data. This branch fixes this by changing the `ConsoleLayer` to track whether it was able to successfully send an event for the creation of a span, and to only care about subsequent events on that span if it successfully recorded the span's creation. This is done by inserting a zero-sized marker type into the span's extensions map, and ignoring spans that don't have this marker.
1 parent aa09600 commit 0cd7a2f

File tree

1 file changed

+50
-35
lines changed

1 file changed

+50
-35
lines changed

console-subscriber/src/lib.rs

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ enum WakeOp {
252252
Drop,
253253
}
254254

255+
/// Marker type used to indicate that a span is actually tracked by the console.
256+
#[derive(Debug)]
257+
struct Tracked {}
258+
255259
impl ConsoleLayer {
256260
/// Returns a `ConsoleLayer` built with the default settings.
257261
///
@@ -377,10 +381,6 @@ impl ConsoleLayer {
377381
self.async_op_callsites.contains(meta)
378382
}
379383

380-
fn is_async_op_poll(&self, meta: &'static Metadata<'static>) -> bool {
381-
self.async_op_poll_callsites.contains(meta)
382-
}
383-
384384
fn is_id_spawned<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
385385
where
386386
S: Subscriber + for<'a> LookupSpan<'a>,
@@ -408,25 +408,15 @@ impl ConsoleLayer {
408408
.unwrap_or(false)
409409
}
410410

411-
fn is_id_async_op_poll<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
411+
fn is_id_tracked<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
412412
where
413413
S: Subscriber + for<'a> LookupSpan<'a>,
414414
{
415415
cx.span(id)
416-
.map(|span| self.is_async_op_poll(span.metadata()))
416+
.map(|span| span.extensions().get::<Tracked>().is_some())
417417
.unwrap_or(false)
418418
}
419419

420-
fn is_id_tracked<S>(&self, id: &span::Id, cx: &Context<'_, S>) -> bool
421-
where
422-
S: Subscriber + for<'a> LookupSpan<'a>,
423-
{
424-
self.is_id_async_op(id, cx)
425-
|| self.is_id_resource(id, cx)
426-
|| self.is_id_spawned(id, cx)
427-
|| self.is_id_async_op_poll(id, cx)
428-
}
429-
430420
fn first_entered<P>(&self, stack: &SpanStack, p: P) -> Option<span::Id>
431421
where
432422
P: Fn(&span::Id) -> bool,
@@ -440,28 +430,36 @@ impl ConsoleLayer {
440430
.cloned()
441431
}
442432

443-
fn send(&self, dropped: &AtomicUsize, event: Event) {
433+
fn send(&self, dropped: &AtomicUsize, event: Event) -> bool {
444434
use mpsc::error::TrySendError;
445435

446-
match self.tx.try_reserve() {
447-
Ok(permit) => permit.send(event),
436+
// Return whether or not we actually sent the event.
437+
let sent = match self.tx.try_reserve() {
438+
Ok(permit) => {
439+
permit.send(event);
440+
true
441+
}
448442
Err(TrySendError::Closed(_)) => {
449443
// we should warn here eventually, but nop for now because we
450444
// can't trigger tracing events...
445+
false
451446
}
452447
Err(TrySendError::Full(_)) => {
453448
// this shouldn't happen, since we trigger a flush when
454449
// approaching the high water line...but if the executor wait
455450
// time is very high, maybe the aggregator task hasn't been
456451
// polled yet. so... eek?!
457452
dropped.fetch_add(1, Ordering::Release);
453+
false
458454
}
459-
}
455+
};
460456

461457
let capacity = self.tx.capacity();
462458
if capacity <= self.flush_under_capacity {
463459
self.shared.flush.trigger();
464460
}
461+
462+
sent
465463
}
466464
}
467465

@@ -512,7 +510,7 @@ where
512510

513511
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
514512
let metadata = attrs.metadata();
515-
if self.is_spawn(metadata) {
513+
let sent = if self.is_spawn(metadata) {
516514
let at = SystemTime::now();
517515
let mut task_visitor = TaskVisitor::new(metadata.into());
518516
attrs.record(&mut task_visitor);
@@ -526,11 +524,8 @@ where
526524
fields,
527525
location,
528526
},
529-
);
530-
return;
531-
}
532-
533-
if self.is_resource(metadata) {
527+
)
528+
} else if self.is_resource(metadata) {
534529
let mut resource_visitor = ResourceVisitor::default();
535530
attrs.record(&mut resource_visitor);
536531
if let Some(result) = resource_visitor.result() {
@@ -558,12 +553,12 @@ where
558553
is_internal,
559554
inherit_child_attrs,
560555
},
561-
);
562-
} // else unknown resource span format
563-
return;
564-
}
565-
566-
if self.is_async_op(metadata) {
556+
)
557+
} else {
558+
// else unknown resource span format
559+
false
560+
}
561+
} else if self.is_async_op(metadata) {
567562
let mut async_op_visitor = AsyncOpVisitor::default();
568563
attrs.record(&mut async_op_visitor);
569564
if let Some((source, inherit_child_attrs)) = async_op_visitor.result() {
@@ -588,10 +583,30 @@ where
588583
source,
589584
inherit_child_attrs,
590585
},
591-
);
586+
)
587+
} else {
588+
false
592589
}
590+
} else {
591+
// else async op span needs to have a source field
592+
false
593+
}
594+
} else {
595+
false
596+
};
597+
598+
// If we were able to record the span, add a marker extension indicating
599+
// that it's tracked by the console.
600+
if sent {
601+
if let Some(span) = ctx.span(id) {
602+
span.extensions_mut().insert(Tracked {});
603+
} else {
604+
debug_assert!(
605+
false,
606+
"span should exist if `on_new_span` was called for its ID ({:?})",
607+
id
608+
);
593609
}
594-
// else async op span needs to have a source field
595610
}
596611
}
597612

@@ -673,7 +688,7 @@ where
673688
update_type: UpdateType::Resource,
674689
update,
675690
},
676-
)
691+
);
677692
}
678693
}
679694
return;

0 commit comments

Comments
 (0)