Skip to content

Commit 1aa9b59

Browse files
authored
feat: add resource detail view (#188)
This PR introduces the resoruce detail view. It also brings a couple of additional changes: - we now rely on context of entered spans rather than explicit parent - child relationships - we expose internal resources (`BatchSempahore`) - we show the relationships between resources (`Mutex` -> `BatchSemaphore`) - we show the async ops that are live on a specific resource - we also show the tasks that are awaiting on these async ops, which allows the user to draw a relationship between tasks and resources - there are more examples added in the examples folder (semaphore and mutex) Some screenshots: <img width="1050" alt="Screenshot 2021-11-23 at 19 12 28" src="https://user-images.githubusercontent.com/4391506/143072764-f940ed45-350d-4be4-941b-95ef5650c4d8.png"> <img width="1388" alt="Screenshot 2021-11-23 at 19 12 38" src="https://user-images.githubusercontent.com/4391506/143072791-ee5d485f-ea1e-4609-8946-16a61bf5776f.png"> <img width="1371" alt="Screenshot 2021-11-23 at 19 12 50" src="https://user-images.githubusercontent.com/4391506/143072807-4b577d3f-17d6-4ade-8dcc-006b44fc1014.png"> Signed-off-by: Zahari Dichev <[email protected]>
1 parent a79c505 commit 1aa9b59

File tree

25 files changed

+1423
-222
lines changed

25 files changed

+1423
-222
lines changed

Cargo.lock

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ members = [
44
"console-subscriber",
55
"console-api"
66
]
7-
resolver = "2"
7+
resolver = "2"

console-api/proto/async_ops.proto

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ message AsyncOp {
4141
// The source of this async operation. Most commonly this should be the name
4242
// of the method where the instantiation of this op has happened.
4343
string source = 3;
44+
// The ID of the parent async op.
45+
//
46+
// This field is only set if this async op was created while inside of another
47+
// async op. For example, `tokio::sync`'s `Mutex::lock` internally calls
48+
// `Semaphore::acquire`.
49+
//
50+
// This field can be empty; if it is empty, this async op is not a child of another
51+
// async op.
52+
common.Id parent_async_op_id = 4;
53+
// The resources's ID.
54+
common.Id resource_id = 5;
4455
}
4556

4657
// Statistics associated with a given async operation.
@@ -49,12 +60,11 @@ message Stats {
4960
google.protobuf.Timestamp created_at = 1;
5061
// Timestamp of when the async op was dropped.
5162
google.protobuf.Timestamp dropped_at = 2;
52-
// The resource Id this `AsyncOp` is associated with. Note that both
53-
// `resource_id` and `task_id` can be None if this async op has not been polled yet
54-
common.Id resource_id = 3;
5563
// The Id of the task that is awaiting on this op.
5664
common.Id task_id = 4;
5765
// Contains the operation poll stats.
5866
common.PollStats poll_stats = 5;
67+
// State attributes of the async op.
68+
repeated common.Attribute attributes = 6;
5969
}
6070

console-api/proto/common.proto

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,3 +183,18 @@ message PollStats {
183183
// has spent *waiting* to be polled.
184184
google.protobuf.Duration busy_time = 6;
185185
}
186+
187+
// State attributes of an entity. These are dependent on the type of the entity.
188+
//
189+
// For example, a timer resource will have a duration, while a semaphore resource may
190+
// have a permit count. Likewise, the async ops of a semaphore may have attributes
191+
// indicating how many permits they are trying to acquire vs how many are acquired.
192+
// These values may change over time. Therefore, they live in the runtime stats rather
193+
// than the static data describing the entity.
194+
message Attribute {
195+
// The key-value pair for the attribute
196+
common.Field field = 1;
197+
// Some values carry a unit of measurement. For example, a duration
198+
// carries an associated unit of time, such as "ms" for milliseconds.
199+
optional string unit = 2;
200+
}

console-api/proto/resources.proto

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ message Resource {
4141
Kind kind = 4;
4242
// The location in code where the resource was created.
4343
common.Location location = 5;
44+
// The ID of the parent resource.
45+
common.Id parent_resource_id = 6;
46+
// Is the resource an internal component of another resource?
47+
//
48+
// For example, a `tokio::time::Interval` resource might contain a
49+
// `tokio::time::Sleep` resource internally.
50+
bool is_internal = 7;
4451

4552
// The kind of resource (e.g. timer, mutex).
4653
message Kind {
@@ -70,16 +77,7 @@ message Stats {
7077
// have permits as an attribute. These values may change over time as the state of
7178
// the resource changes. Therefore, they live in the runtime stats rather than the
7279
// static data describing the resource.
73-
repeated Attribute attributes = 3;
74-
75-
// A single key-value pair associated with a resource.
76-
message Attribute {
77-
// The key-value pair for the attribute
78-
common.Field field = 1;
79-
// Some values carry a unit of measurement. For example, a duration
80-
// carries an associated unit of time, such as "ms" for milliseconds.
81-
optional string unit = 2;
82-
}
80+
repeated common.Attribute attributes = 3;
8381
}
8482

8583
// A `PollOp` describes each poll operation that completes within the async

console-subscriber/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
1212

1313
[dependencies]
1414

15-
tokio = { version = "^1.13", features = ["sync", "time", "macros", "tracing"] }
15+
tokio = { version = "^1.15", features = ["sync", "time", "macros", "tracing"] }
1616
tokio-stream = "0.1"
1717
thread_local = "1.1.3"
1818
console-api = { path = "../console-api", features = ["transport"] }
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
use tokio::sync::Barrier;
4+
use tokio::task;
5+
6+
#[tokio::main]
7+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
8+
console_subscriber::init();
9+
task::Builder::default()
10+
.name("main-task")
11+
.spawn(async move {
12+
let mut handles = Vec::with_capacity(30);
13+
let barrier = Arc::new(Barrier::new(30));
14+
for i in 0..30 {
15+
let c = barrier.clone();
16+
let task_name = format!("task-{}", i);
17+
handles.push(task::Builder::default().name(&task_name).spawn(async move {
18+
tokio::time::sleep(Duration::from_secs(i)).await;
19+
let wait_result = c.wait().await;
20+
wait_result
21+
}));
22+
}
23+
24+
// Will not resolve until all "after wait" messages have been printed
25+
let mut num_leaders = 0;
26+
for handle in handles {
27+
let wait_result = handle.await.unwrap();
28+
if wait_result.is_leader() {
29+
num_leaders += 1;
30+
}
31+
}
32+
33+
tokio::time::sleep(Duration::from_secs(10)).await;
34+
// Exactly one barrier will resolve as the "leader"
35+
assert_eq!(num_leaders, 1);
36+
})
37+
.await?;
38+
39+
Ok(())
40+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
use tokio::{sync::Mutex, task};
4+
5+
#[tokio::main]
6+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
7+
console_subscriber::init();
8+
task::Builder::default()
9+
.name("main-task")
10+
.spawn(async move {
11+
let count = Arc::new(Mutex::new(0));
12+
for i in 0..5 {
13+
let my_count = Arc::clone(&count);
14+
let task_name = format!("increment-{}", i);
15+
tokio::task::Builder::default()
16+
.name(&task_name)
17+
.spawn(async move {
18+
for _ in 0..10 {
19+
let mut lock = my_count.lock().await;
20+
*lock += 1;
21+
tokio::time::sleep(Duration::from_secs(1)).await;
22+
}
23+
});
24+
}
25+
26+
while *count.lock().await < 50 {}
27+
})
28+
.await?;
29+
30+
Ok(())
31+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
use tokio::{sync::RwLock, task};
4+
5+
#[tokio::main]
6+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
7+
console_subscriber::init();
8+
task::Builder::default()
9+
.name("main-task")
10+
.spawn(async move {
11+
let count = Arc::new(RwLock::new(0));
12+
for i in 0..5 {
13+
let my_count = Arc::clone(&count);
14+
let task_name = format!("increment-{}", i);
15+
tokio::task::Builder::default()
16+
.name(&task_name)
17+
.spawn(async move {
18+
for _ in 0..10 {
19+
let mut lock = my_count.write().await;
20+
*lock += 1;
21+
tokio::time::sleep(Duration::from_secs(1)).await;
22+
}
23+
});
24+
}
25+
26+
loop {
27+
let c = count.read().await;
28+
tokio::time::sleep(Duration::from_secs(1)).await;
29+
if *c >= 50 {
30+
break;
31+
}
32+
}
33+
})
34+
.await?;
35+
36+
Ok(())
37+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
use std::sync::Arc;
2+
use std::time::Duration;
3+
use tokio::task;
4+
5+
#[tokio::main]
6+
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
7+
console_subscriber::init();
8+
task::Builder::default()
9+
.name("main-task")
10+
.spawn(async move {
11+
let sem = Arc::new(tokio::sync::Semaphore::new(0));
12+
let mut tasks = Vec::default();
13+
for i in 0..5 {
14+
let acquire_sem = Arc::clone(&sem);
15+
let add_sem = Arc::clone(&sem);
16+
let acquire_task_name = format!("acquire-{}", i);
17+
let add_task_name = format!("add-{}", i);
18+
tasks.push(
19+
tokio::task::Builder::default()
20+
.name(&acquire_task_name)
21+
.spawn(async move {
22+
let _permit = acquire_sem.acquire_many(i).await.unwrap();
23+
tokio::time::sleep(Duration::from_secs(i as u64 * 2)).await;
24+
}),
25+
);
26+
tasks.push(tokio::task::Builder::default().name(&add_task_name).spawn(
27+
async move {
28+
tokio::time::sleep(Duration::from_secs(i as u64 * 5)).await;
29+
add_sem.add_permits(i as usize);
30+
},
31+
));
32+
}
33+
34+
let all_tasks = futures::future::try_join_all(tasks);
35+
all_tasks.await.unwrap();
36+
})
37+
.await?;
38+
39+
Ok(())
40+
}

0 commit comments

Comments
 (0)