Skip to content

Commit 26772cb

Browse files
apollo_network_benchmark: added cpu and memory metrics
1 parent b71404b commit 26772cb

File tree

3 files changed

+218
-0
lines changed

3 files changed

+218
-0
lines changed

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod protocol;
1717
mod stress_test_node;
1818

1919
use apollo_network_benchmark::node_args::NodeArgs;
20+
use metrics::register_metrics;
2021
use stress_test_node::BroadcastNetworkStressTestNode;
2122

2223
#[tokio::main]
@@ -46,6 +47,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4647

4748
builder.install().expect("Failed to install prometheus recorder/exporter");
4849

50+
register_metrics();
51+
4952
// Start the tokio runtime metrics reporter to automatically collect and export runtime metrics
5053
tokio::spawn(
5154
RuntimeMetricsReporterBuilder::default()

crates/apollo_network_benchmark/src/bin/broadcast_network_stress_test_node/metrics.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,30 @@ define_metrics!(
66
MetricCounter { RECEIVE_MESSAGE_COUNT, "receive_message_count", "Number of stress test messages received via broadcast", init = 0 },
77
MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 },
88
MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" },
9+
10+
// system metrics for the node
11+
MetricGauge { SYSTEM_TOTAL_MEMORY_BYTES, "system_total_memory_bytes", "Total system memory in bytes" },
12+
MetricGauge { SYSTEM_AVAILABLE_MEMORY_BYTES, "system_available_memory_bytes", "Available system memory in bytes" },
13+
MetricGauge { SYSTEM_USED_MEMORY_BYTES, "system_used_memory_bytes", "Used system memory in bytes" },
14+
MetricGauge { SYSTEM_CPU_COUNT, "system_cpu_count", "Number of logical CPU cores in the system" },
15+
16+
// system metrics for the process
17+
MetricGauge { SYSTEM_PROCESS_CPU_USAGE_PERCENT, "system_process_cpu_usage_percent", "CPU usage percentage of the current process" },
18+
MetricGauge { SYSTEM_PROCESS_MEMORY_USAGE_BYTES, "system_process_memory_usage_bytes", "Memory usage in bytes of the current process" },
19+
MetricGauge { SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES, "system_process_virtual_memory_usage_bytes", "Virtual memory usage in bytes of the current process" },
920
},
1021
);
22+
23+
/// Registers every metric used by the stress test node: the broadcast message metrics followed
/// by the system-wide and per-process gauges.
pub(crate) fn register_metrics() {
    // Broadcast message metrics.
    RECEIVE_MESSAGE_BYTES.register();
    RECEIVE_MESSAGE_COUNT.register();
    RECEIVE_MESSAGE_BYTES_SUM.register();
    RECEIVE_MESSAGE_DELAY_SECONDS.register();

    // System metrics for the node.
    SYSTEM_TOTAL_MEMORY_BYTES.register();
    SYSTEM_AVAILABLE_MEMORY_BYTES.register();
    SYSTEM_USED_MEMORY_BYTES.register();
    SYSTEM_CPU_COUNT.register();

    // System metrics for the process.
    SYSTEM_PROCESS_CPU_USAGE_PERCENT.register();
    SYSTEM_PROCESS_MEMORY_USAGE_BYTES.register();
    SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES.register();
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
use std::fs;
2+
use std::time::{Duration, Instant};
3+
4+
use apollo_metrics::metrics::LossyIntoF64;
5+
use tokio::time::interval;
6+
use tracing::warn;
7+
8+
use crate::metrics::{
9+
SYSTEM_AVAILABLE_MEMORY_BYTES,
10+
SYSTEM_CPU_COUNT,
11+
SYSTEM_PROCESS_CPU_USAGE_PERCENT,
12+
SYSTEM_PROCESS_MEMORY_USAGE_BYTES,
13+
SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES,
14+
SYSTEM_TOTAL_MEMORY_BYTES,
15+
SYSTEM_USED_MEMORY_BYTES,
16+
};
17+
18+
/// Number of nanoseconds in one second; divides an elapsed nanosecond count to obtain seconds.
const NANOS_PER_SECOND: f64 = 1e9;

/// Linux USER_HZ: the tick rate in which /proc reports process CPU times to userspace.
/// Hard-coded to 100, the value used on mainstream architectures.
const CLOCK_TICKS_PER_SEC: u64 = 100;
23+
24+
/// Reads memory info, returning (total, available) in bytes.
25+
///
26+
/// Tries cgroup limits first (container-aware), falls back to /proc/meminfo.
27+
fn get_memory_info() -> Option<(u64, u64)> {
28+
if let Some(result) = get_cgroup_memory_info() {
29+
return Some(result);
30+
}
31+
get_proc_memory_info()
32+
}
33+
34+
/// Reads the cgroup memory limit and usage, returning `(limit, available)` in bytes.
///
/// Tries the cgroup v2 files first and falls back to the cgroup v1 memory controller.
/// Returns `None` when no finite limit is configured or when the files cannot be read or
/// parsed, so the caller can fall back to another source.
fn get_cgroup_memory_info() -> Option<(u64, u64)> {
    get_cgroup_v2_memory_info().or_else(get_cgroup_v1_memory_info)
}

/// Reads a cgroup control file that holds a single unsigned integer.
fn read_cgroup_u64(path: &str) -> Option<u64> {
    fs::read_to_string(path).ok()?.trim().parse().ok()
}

/// Reads `(limit, available)` from the cgroup v2 files `memory.max` and `memory.current`.
fn get_cgroup_v2_memory_info() -> Option<(u64, u64)> {
    // An unlimited cgroup reports the literal "max", which does not parse as a number and
    // therefore yields `None`.
    let total_bytes = read_cgroup_u64("/sys/fs/cgroup/memory.max")?;
    let current_bytes = read_cgroup_u64("/sys/fs/cgroup/memory.current")?;
    Some((total_bytes, total_bytes.saturating_sub(current_bytes)))
}

/// Reads `(limit, available)` from the cgroup v1 memory controller files
/// `memory.limit_in_bytes` and `memory.usage_in_bytes`.
fn get_cgroup_v1_memory_info() -> Option<(u64, u64)> {
    // cgroup v1 has no "max" keyword: an unlimited cgroup reports a huge page-aligned number
    // close to i64::MAX. Anything at or above this threshold is treated as "no limit".
    const UNLIMITED_THRESHOLD_BYTES: u64 = 1 << 62;

    let limit_bytes = read_cgroup_u64("/sys/fs/cgroup/memory/memory.limit_in_bytes")?;
    if limit_bytes >= UNLIMITED_THRESHOLD_BYTES {
        return None;
    }
    let usage_bytes = read_cgroup_u64("/sys/fs/cgroup/memory/memory.usage_in_bytes")?;
    Some((limit_bytes, limit_bytes.saturating_sub(usage_bytes)))
}
47+
48+
/// Reads /proc/meminfo for system memory stats.
49+
fn get_proc_memory_info() -> Option<(u64, u64)> {
50+
let content = match fs::read_to_string("/proc/meminfo") {
51+
Ok(c) => c,
52+
Err(e) => {
53+
warn!("Failed to read /proc/meminfo: {}", e);
54+
return None;
55+
}
56+
};
57+
58+
let mut total_kb = None;
59+
let mut available_kb = None;
60+
61+
for line in content.lines() {
62+
if let Some(val) = line.strip_prefix("MemTotal:") {
63+
total_kb = parse_meminfo_kb(val);
64+
} else if let Some(val) = line.strip_prefix("MemAvailable:") {
65+
available_kb = parse_meminfo_kb(val);
66+
}
67+
if total_kb.is_some() && available_kb.is_some() {
68+
break;
69+
}
70+
}
71+
72+
Some((total_kb? * 1024, available_kb? * 1024))
73+
}
74+
75+
/// Extracts the leading number from a value such as " 16384000 kB".
///
/// Only the first whitespace-separated token is considered; `None` is returned when there is
/// no token or it is not a valid `u64`.
fn parse_meminfo_kb(val: &str) -> Option<u64> {
    val.split_whitespace().next().and_then(|number| number.parse::<u64>().ok())
}
79+
80+
/// Reads process CPU ticks (utime + stime) from /proc/self/stat.
81+
fn get_process_cpu_ticks() -> Option<u64> {
82+
let content = match fs::read_to_string("/proc/self/stat") {
83+
Ok(c) => c,
84+
Err(e) => {
85+
warn!("Failed to read /proc/self/stat: {}", e);
86+
return None;
87+
}
88+
};
89+
90+
// Fields in /proc/self/stat are space-separated, but field 2 (comm) is in parentheses
91+
// and may contain spaces. Find the closing ')' to skip past it.
92+
let after_comm = content.rfind(')')?.checked_add(2)?;
93+
let fields: Vec<&str> = content.get(after_comm..)?.split_whitespace().collect();
94+
// After comm, fields are 0-indexed from field 3 of the stat file.
95+
// utime = field 14 (index 11 after comm), stime = field 15 (index 12 after comm)
96+
let utime: u64 = fields.get(11)?.parse().ok()?;
97+
let stime: u64 = fields.get(12)?.parse().ok()?;
98+
Some(utime + stime)
99+
}
100+
101+
/// Reads process memory from /proc/self/status (VmRSS and VmSize in kB).
102+
/// Returns (rss_bytes, vsize_bytes).
103+
fn get_process_memory() -> Option<(u64, u64)> {
104+
let content = match fs::read_to_string("/proc/self/status") {
105+
Ok(c) => c,
106+
Err(e) => {
107+
warn!("Failed to read /proc/self/status: {}", e);
108+
return None;
109+
}
110+
};
111+
112+
let mut rss_kb = None;
113+
let mut vsize_kb = None;
114+
115+
for line in content.lines() {
116+
if let Some(val) = line.strip_prefix("VmRSS:") {
117+
rss_kb = parse_meminfo_kb(val);
118+
} else if let Some(val) = line.strip_prefix("VmSize:") {
119+
vsize_kb = parse_meminfo_kb(val);
120+
}
121+
if rss_kb.is_some() && vsize_kb.is_some() {
122+
break;
123+
}
124+
}
125+
126+
Some((rss_kb? * 1024, vsize_kb? * 1024))
127+
}
128+
129+
/// Previous CPU sample, kept between collections so CPU usage can be computed as a delta.
struct CpuState {
    /// Process CPU ticks (utime + stime) observed at the previous sample.
    prev_ticks: u64,
    /// Moment at which the previous sample was taken.
    prev_time: Instant,
}
133+
134+
/// Collects system-wide and process-specific metrics (CPU, memory) by reading /proc directly.
135+
fn collect_system_and_process_metrics(cpu_state: &mut Option<CpuState>) {
136+
if let Some((total, available)) = get_memory_info() {
137+
let used = total.saturating_sub(available);
138+
SYSTEM_TOTAL_MEMORY_BYTES.set(total.into_f64());
139+
SYSTEM_AVAILABLE_MEMORY_BYTES.set(available.into_f64());
140+
SYSTEM_USED_MEMORY_BYTES.set(used.into_f64());
141+
}
142+
143+
match std::thread::available_parallelism() {
144+
Ok(count) => SYSTEM_CPU_COUNT.set(count.get().into_f64()),
145+
Err(e) => warn!("Failed to get CPU count: {}", e),
146+
}
147+
148+
if let Some((rss, vsize)) = get_process_memory() {
149+
SYSTEM_PROCESS_MEMORY_USAGE_BYTES.set(rss.into_f64());
150+
SYSTEM_PROCESS_VIRTUAL_MEMORY_USAGE_BYTES.set(vsize.into_f64());
151+
}
152+
153+
if let Some(current_ticks) = get_process_cpu_ticks() {
154+
let now = Instant::now();
155+
if let Some(prev) = cpu_state.as_ref() {
156+
let tick_delta = current_ticks.saturating_sub(prev.prev_ticks);
157+
let elapsed = now.duration_since(prev.prev_time);
158+
let elapsed_secs = elapsed.as_nanos().into_f64() / NANOS_PER_SECOND;
159+
if elapsed_secs > 0.0 {
160+
let cpu_seconds = tick_delta.into_f64() / CLOCK_TICKS_PER_SEC.into_f64();
161+
let cpu_percent = (cpu_seconds / elapsed_secs) * 100.0;
162+
SYSTEM_PROCESS_CPU_USAGE_PERCENT.set(cpu_percent);
163+
}
164+
}
165+
*cpu_state = Some(CpuState { prev_ticks: current_ticks, prev_time: now });
166+
}
167+
}
168+
169+
pub async fn monitor_process_metrics(interval_seconds: u64) {
170+
let mut interval = interval(Duration::from_secs(interval_seconds));
171+
172+
struct State {
173+
cpu_state: Option<CpuState>,
174+
}
175+
176+
let mut state = Some(State { cpu_state: None });
177+
178+
loop {
179+
interval.tick().await;
180+
181+
let passed_state = state.take();
182+
state = tokio::task::spawn_blocking(move || {
183+
let mut state = passed_state.unwrap();
184+
collect_system_and_process_metrics(&mut state.cpu_state);
185+
Some(state)
186+
})
187+
.await
188+
.unwrap();
189+
}
190+
}

0 commit comments

Comments
 (0)