
Commit fdc77e2

fix(subscriber): only send *new* tasks/resources/etc over the event channel (#238)
## Motivation

Currently, there are some rather bad issues that occur when the event buffer is at capacity and events are dropped.

Completely *losing* data due to buffer capacity is relatively okay: if we set a bound on how much memory the console can use, and we don't record new things that occur when we've reached that limit, this is correct and acceptable behavior. However, the current design can result in *incorrect* data when events are lost due to the buffer being at capacity. This is because we currently record things like starting to poll a task/resource, ending a poll, dropping a task/resource/async op, and waker operations, as individual events in the buffer. This means that we can have a situation where the creation of a task was recorded, because there was buffer capacity at the time, but then when the task ended, the buffer was full, so we never recorded its termination. That results in the task appearing to run forever and never terminate --- see issue #230. Similarly, if we record the beginning of a poll, but drop the end-of-poll event because we're at capacity, we will (incorrectly) record a poll that goes on forever, which is obviously wrong. I think this may also be the cause of the false positives with the lost waker lint (#149), if we record a waker drop but missed a waker clone that previously occurred.

The change in #212 fixed one category of issue that occurs due to event buffer capacity --- when a task, resource, or async op's _creation_ event is dropped due to buffer capacity, we skip any subsequent events related to that task/resource/op. However, this doesn't fix issues where the *subsequent* events are the ones that are dropped.

## Solution

This branch proposes a solution to this whole category of event-buffer-capacity-related issues. Unfortunately, this requires rewriting a *lot* of `console-subscriber`'s internals.

In the new approach, we now _only_ send events over the channel when creating a new task, resource, or async op. Those events now contain an `Arc` holding the stats for that entity. Another clone of the `Arc` is stored in the `tracing_subscriber::Registry`'s [span extensions] for the span corresponding to that entity. When the `ConsoleLayer` records subsequent events for a particular entity, such as starting/ending a poll, it looks up the span by ID and updates the stats type stored in its extensions. The aggregator stores its clone of the `Arc` in a map of entities, just like it does currently, but it no longer handles actually updating the stats; it just builds wire-format updates from any tracked entities whose data was updated by the layer.

This should fix all issues where dropping something due to event buffer capacity results in incorrect data. Once we have successfully recorded the *creation* of a task, resource, or async op, any subsequent updates to its stats are *guaranteed* to be reliable. If the channel is at capacity and we fail to record a new resource/task/op, we never create a stats extension for it, and we won't record anything for it at all. Otherwise, it will always have correct data recorded.

When possible, the stats in the `Arc`ed stats are updated atomically. In some cases, this isn't easily possible, and some fields of the stats types are stored in a mutex. In particular, this is required for storing timestamps. I don't really love that, but these mutices should be contended very infrequently. Stats aren't marked as having unsent updates until after the stats inside the mutices have been updated, so the aggregator will not try to lock a mutex while the layer is still updating it; instead, the entity will simply be included in the next update once the layer is no longer touching it. Mutices here will only be contended when multiple threads are updating a task's stats at the same time, which should occur very rarely...and in most cases, they *still* won't have to contend a mutex, since access to most of the mutices is guarded by an atomic variable for e.g. determining which thread actually was the last to complete a concurrent poll.

The biggest performance downside of the mutices is probably not actually contention, but the additional heap allocation required when using `std::sync::Mutex`. However, since we have conditional `parking_lot` support, `parking_lot` can be used to avoid requiring additional allocations.

In the future, it's probably possible to make more of this atomic by converting timestamps into integers and storing them in atomic variables. I haven't done this yet because both the protobuf timestamps and `std::time` timestamps are larger than a single 64-bit number, and it might take a little extra work to ensure we can nicely fit them in an `AtomicUsize`...but we can probably do that later.

[span extensions]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/registry/struct.SpanRef.html#method.extensions

Signed-off-by: Eliza Weisman <[email protected]>
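The design described under "Solution" is easier to see in code. Below is a minimal, hedged sketch of the mechanism only: the layer allocates an `Arc`ed stats object when a span is created, sends one clone toward the aggregator, stashes another clone in the span's extensions, and later updates it in place, flipping an "unsent" flag only after the update is complete. All type and field names (`TaskStats`, `unsent`, `ConsoleLayerSketch`, and the `Vec` standing in for the bounded channel) are illustrative assumptions, not `console-subscriber`'s actual API.

```rust
// Minimal sketch of the Arc-in-span-extensions pattern described above.
// Names are illustrative; this is not console-subscriber's real code.

use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};

use tracing::span;
use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer};

#[derive(Default)]
struct TaskStats {
    polls: AtomicU64,
    // Set by the layer when it records something new; cleared when the
    // aggregator builds its next wire-format update.
    unsent: AtomicBool,
}

struct ConsoleLayerSketch {
    // In the real design this would be a bounded channel to the aggregator;
    // a Vec behind a Mutex keeps the sketch self-contained.
    new_tasks: std::sync::Mutex<Vec<(span::Id, Arc<TaskStats>)>>,
}

impl<S> Layer<S> for ConsoleLayerSketch
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    fn on_new_span(&self, _attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
        // Only *creation* is sent to the aggregator: one clone of the stats
        // goes over the "channel", another is stored in the span extensions.
        let stats = Arc::new(TaskStats::default());
        if let Some(span) = ctx.span(id) {
            span.extensions_mut().insert(stats.clone());
        }
        self.new_tasks.lock().unwrap().push((id.clone(), stats));
    }

    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
        // Subsequent events never go over the channel: look the span up by
        // ID and update the shared stats in place.
        if let Some(span) = ctx.span(id) {
            if let Some(stats) = span.extensions().get::<Arc<TaskStats>>() {
                stats.polls.fetch_add(1, Ordering::Relaxed);
                // Mark the stats dirty *after* the update, so the aggregator
                // never picks up a half-written entry.
                stats.unsent.store(true, Ordering::Release);
            }
        }
    }
}
```

Because the aggregator only ever reads through its own clone of the `Arc` and only after the flag is set, a full event buffer can at worst hide an entity entirely; it can no longer leave one in a half-recorded state.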
1 parent 9a50b63 commit fdc77e2

10 files changed (+1049, -854 lines)


Cargo.lock

Lines changed: 21 additions & 0 deletions

console-subscriber/Cargo.toml

Lines changed: 5 additions & 2 deletions
@@ -40,13 +40,16 @@ tracing = "0.1.26"
 tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] }
 futures = { version = "0.3", default-features = false }
 hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
-serde = { version = "1", features = ["derive"] }
-serde_json = "1"
 # The parking_lot dependency is renamed, because we want our `parking_lot`
 # feature to also enable `tracing-subscriber`'s parking_lot feature flag.
 parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
 humantime = "2.1.0"
 
+# Required for recording:
+serde = { version = "1", features = ["derive"] }
+serde_json = "1"
+crossbeam-channel = "0.5"
+
 [dev-dependencies]
 tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] }
 futures = "0.3"
Lines changed: 16 additions & 50 deletions
@@ -1,14 +1,12 @@
-use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto};
+use super::{shrink::ShrinkMap, Id, ToProto};
+use crate::stats::{DroppedAt, Unsent};
 use std::collections::HashMap;
-use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime};
 
 pub(crate) struct IdData<T> {
-    data: ShrinkMap<Id, (T, bool)>,
+    data: ShrinkMap<Id, T>,
 }
 
-pub(crate) struct Updating<'a, T>(&'a mut (T, bool));
-
 pub(crate) enum Include {
     All,
     UpdatedOnly,
@@ -19,31 +17,19 @@ pub(crate) enum Include {
 impl<T> Default for IdData<T> {
     fn default() -> Self {
         IdData {
-            data: ShrinkMap::<Id, (T, bool)>::new(),
+            data: ShrinkMap::<Id, T>::new(),
         }
     }
 }
 
-impl<T> IdData<T> {
-    pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
-    where
-        T: Default,
-    {
-        Updating(self.data.entry(id).or_default())
-    }
-
-    pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
-        self.data.get_mut(id).map(Updating)
-    }
-
+impl<T: Unsent> IdData<T> {
     pub(crate) fn insert(&mut self, id: Id, data: T) {
-        self.data.insert(id, (data, true));
+        self.data.insert(id, data);
     }
 
     pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
-        self.data.iter_mut().filter_map(|(id, (data, dirty))| {
-            if *dirty {
-                *dirty = false;
+        self.data.iter_mut().filter_map(|(id, data)| {
+            if data.take_unsent() {
                 Some((id, data))
             } else {
                 None
@@ -52,11 +38,11 @@ impl<T> IdData<T> {
     }
 
     pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
-        self.data.iter().map(|(id, (data, _))| (id, data))
+        self.data.iter()
     }
 
     pub(crate) fn get(&self, id: &Id) -> Option<&T> {
-        self.data.get(id).map(|(data, _)| data)
+        self.data.get(id)
     }
 
     pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
@@ -75,7 +61,7 @@ impl<T> IdData<T> {
         }
     }
 
-    pub(crate) fn drop_closed<R: DroppedAt>(
+    pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
         &mut self,
         stats: &mut IdData<R>,
         now: SystemTime,
@@ -92,18 +78,19 @@ impl<T> IdData<T> {
         // drop closed entities
         tracing::trace!(?retention, has_watchers, "dropping closed");
 
-        stats.data.retain_and_shrink(|id, (stats, dirty)| {
+        stats.data.retain_and_shrink(|id, stats| {
             if let Some(dropped_at) = stats.dropped_at() {
                 let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
+                let dirty = stats.is_unsent();
                 let should_drop =
                     // if there are any clients watching, retain all dirty tasks regardless of age
-                    (*dirty && has_watchers)
+                    (dirty && has_watchers)
                     || dropped_for > retention;
                 tracing::trace!(
                     stats.id = ?id,
                     stats.dropped_at = ?dropped_at,
                     stats.dropped_for = ?dropped_for,
-                    stats.dirty = *dirty,
+                    stats.dirty = dirty,
                     should_drop,
                 );
                 return !should_drop;
@@ -114,27 +101,6 @@ impl<T> IdData<T> {
 
         // drop closed entities which no longer have stats.
         self.data
-            .retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
-    }
-}
-
-// === impl Updating ===
-
-impl<'a, T> Deref for Updating<'a, T> {
-    type Target = T;
-    fn deref(&self) -> &Self::Target {
-        &self.0 .0
-    }
-}
-
-impl<'a, T> DerefMut for Updating<'a, T> {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.0 .0
-    }
-}
-
-impl<'a, T> Drop for Updating<'a, T> {
-    fn drop(&mut self) {
-        self.0 .1 = true;
+            .retain_and_shrink(|id, _| stats.data.contains_key(id));
     }
 }
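The `Unsent` bound introduced in the diff above comes from the new `stats` module, which is not part of this excerpt. As a rough sketch of the contract `IdData` relies on (the actual trait in `console-subscriber` may differ, and `ExampleStats` is a hypothetical implementor), assuming the flag is an atomic on each `Arc`ed stats type:

```rust
// Hedged sketch of the `Unsent` contract used by `since_last_update` and
// `drop_closed` above; names and details are assumptions, not the crate's
// real definitions.
use std::sync::atomic::{AtomicBool, Ordering};

pub(crate) trait Unsent {
    /// Returns `true` if this entity has updates that haven't been sent to
    /// clients yet, and clears the flag, so the next call returns `false`
    /// until the layer records something new.
    fn take_unsent(&self) -> bool;

    /// Peeks at the flag without clearing it; used when deciding whether a
    /// dropped entity is still "dirty" and must be retained for watchers.
    fn is_unsent(&self) -> bool;
}

#[derive(Default)]
pub(crate) struct ExampleStats {
    unsent: AtomicBool,
    // ... counters, timestamps, etc. updated by the layer.
}

impl Unsent for ExampleStats {
    fn take_unsent(&self) -> bool {
        // Atomically read-and-clear, so an update is reported exactly once.
        self.unsent.swap(false, Ordering::AcqRel)
    }

    fn is_unsent(&self) -> bool {
        self.unsent.load(Ordering::Acquire)
    }
}
```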
