From 695e858abb86f739fc47ce242dc409dafb6796f6 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Fri, 12 Sep 2025 10:25:45 -0400 Subject: [PATCH 01/13] Omit non-threshold metrics in bencher output --- .github/workflows/lustre-metrics-bench.yml | 2 + .github/workflows/quota-parsing.yml | 12 ++++- .../workflows/scrape-memory-metrics-bench.yml | 4 ++ lustre-collector/combine_mem_usage.json | 44 ------------------- 4 files changed, 16 insertions(+), 46 deletions(-) delete mode 100644 lustre-collector/combine_mem_usage.json diff --git a/.github/workflows/lustre-metrics-bench.yml b/.github/workflows/lustre-metrics-bench.yml index cd46f092..3bfc10a8 100644 --- a/.github/workflows/lustre-metrics-bench.yml +++ b/.github/workflows/lustre-metrics-bench.yml @@ -59,6 +59,7 @@ jobs: --err \ --adapter rust_iai_callgrind \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ "cargo bench --bench lustre_metrics" - name: Compare Lustre Metrics Benchmarks with main branch @@ -74,4 +75,5 @@ jobs: --err \ --adapter rust_iai_callgrind \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ "cargo bench --bench lustre_metrics" diff --git a/.github/workflows/quota-parsing.yml b/.github/workflows/quota-parsing.yml index 73288ac4..52ac25e2 100644 --- a/.github/workflows/quota-parsing.yml +++ b/.github/workflows/quota-parsing.yml @@ -29,7 +29,7 @@ jobs: --token '${{ secrets.BENCHER_API_TOKEN }}' \ --branch main \ --testbed ci-runner \ - --threshold-measure Latency \ + --threshold-measure latency \ --threshold-test t_test \ --threshold-max-sample-size 64 \ --threshold-lower-boundary 0.95 \ @@ -37,6 +37,8 @@ jobs: --err \ --adapter rust_criterion \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}-performance' \ "cargo bench --bench combine_performance" - name: Compare quota parsing metrics benchmarks with main branch @@ -52,6 +54,8 @@ jobs: --err \ --adapter rust_criterion \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}-performance' \ "cargo bench --bench combine_performance" - name: Run quota parsing memory usage benchmark @@ -67,7 +71,7 @@ jobs: --token '${{ secrets.BENCHER_API_TOKEN }}' \ --branch main \ --testbed ci-runner \ - --threshold-measure peak_rss_mib \ + --threshold-measure start_rss_mib \ --threshold-test t_test \ --threshold-max-sample-size 64 \ --threshold-lower-boundary 0.95 \ @@ -75,6 +79,8 @@ jobs: --err \ --adapter json \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}-memory' \ --file lustre-collector/combine_mem_usage.json - name: Compare quota parsing memory metrics with main branch @@ -90,4 +96,6 @@ jobs: --err \ --adapter json \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}-memory' \ --file lustre-collector/combine_mem_usage.json diff --git a/.github/workflows/scrape-memory-metrics-bench.yml b/.github/workflows/scrape-memory-metrics-bench.yml index 920f31f0..a9542995 100644 --- a/.github/workflows/scrape-memory-metrics-bench.yml +++ b/.github/workflows/scrape-memory-metrics-bench.yml @@ -52,6 +52,8 @@ jobs: --err \ --adapter json \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}' \ --file lustrefs-exporter/scrape_allocations_results.json - name: Compare scrape memory metrics with main branch @@ -67,4 +69,6 @@ jobs: --err \ --adapter json \ --github-actions '${{ secrets.GITHUB_TOKEN }}' \ + --ci-only-thresholds \ + --ci-id '${{ github.workflow_ref }}' \ --file lustrefs-exporter/scrape_allocations_results.json diff --git a/lustre-collector/combine_mem_usage.json b/lustre-collector/combine_mem_usage.json deleted file mode 100644 index b80b60f5..00000000 --- a/lustre-collector/combine_mem_usage.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "scrape_allocations": { - "start_rss_mib": { - "value": 318.8093196902655 - }, - "peak_rss_mib": { - "value": 359.84375 - }, - "end_rss_mib": { - "value": 320.30074668141594 - }, - "memory_growth_mib": { - "value": 1.4914269911504425 - }, - "peak_over_start_rss_ratio": { - "value": 1.0788454764864892 - }, - "avg_runtime_rss_mib": { - "value": 319.2835635533252, - "lower_value": 24.296875, - "upper_value": 359.84375 - }, - "start_virtual_mib": { - "value": 402485.0271017699 - }, - "peak_virtual_mib": { - "value": 402526.0625 - }, - "end_virtual_mib": { - "value": 402497.05365044245 - }, - "virtual_growth_mib": { - "value": 12.026548672566372 - }, - "peak_over_start_virtual_ratio": { - "value": 1.0000299443068315 - }, - "avg_runtime_virtual_mib": { - "value": 402490.81627407606, - "lower_value": 401167.0625, - "upper_value": 402526.0625 - } - } -} \ No newline at end of file From 2803d5d1d0af9d89eadb0df4147137d09742be09 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Mon, 22 Sep 2025 14:49:05 +0530 Subject: [PATCH 02/13] Add memory-benchmarking --- Cargo.lock | 9 + Cargo.toml | 2 +- lustre-collector/Cargo.toml | 1 + lustre-collector/benches/combine_memory.rs | 187 +-------------------- memory-benchmarking/Cargo.toml | 13 ++ memory-benchmarking/src/lib.rs | 175 +++++++++++++++++++ 6 files changed, 200 insertions(+), 187 deletions(-) create mode 100644 memory-benchmarking/Cargo.toml create mode 100644 memory-benchmarking/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 2e72e1ba..e276dc65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,6 +1357,7 @@ dependencies = [ "criterion", "include_dir", "insta", + "memory-benchmarking", "serde", "serde_json", "serde_yaml", @@ -1418,6 +1419,14 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memory-benchmarking" +version = "0.11.0" +dependencies = [ + "serde", + "sysinfo", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/Cargo.toml b/Cargo.toml index bf40bdb3..d5ffef9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ['lustre-collector', 'lustrefs-exporter'] +members = ['lustre-collector', 'lustrefs-exporter', "memory-benchmarking"] resolver = "3" [workspace.package] diff --git a/lustre-collector/Cargo.toml b/lustre-collector/Cargo.toml index ec3e4844..6df3572f 100644 --- a/lustre-collector/Cargo.toml +++ b/lustre-collector/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true [dependencies] clap = { workspace = true, features = ["derive"] } combine.workspace = true +memory-benchmarking = { version = "0.11.0", path = "../memory-benchmarking" } serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_yaml.workspace = true diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 12120725..399a3b55 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -1,136 +1,8 @@ use combine::parser::EasyParser; use criterion::{Criterion, criterion_group, criterion_main}; use lustre_collector::quota::parse as combine_parse; +use memory_benchmarking::{BencherOutput, MemoryUsage, aggregate_samples, get_memory_stats}; use std::{fs::File, io::Read, time::Duration}; -use sysinfo::{Pid, ProcessExt, System, SystemExt}; - -#[derive(serde::Serialize)] -struct BencherOutput { - memory_usage: BencherMetrics, -} - -#[derive(serde::Serialize)] -struct BencherMetrics { - start_rss_mib: MetricEntry, - peak_rss_mib: MetricEntry, - end_rss_mib: MetricEntry, - memory_growth_mib: MetricEntry, - peak_over_start_rss_ratio: MetricEntry, - avg_runtime_rss_mib: MetricEntry, - start_virtual_mib: MetricEntry, - peak_virtual_mib: MetricEntry, - end_virtual_mib: MetricEntry, - virtual_growth_mib: MetricEntry, - peak_over_start_virtual_ratio: MetricEntry, - avg_runtime_virtual_mib: MetricEntry, -} - -#[derive(serde::Serialize)] -struct MetricEntry { - value: f64, - #[serde(skip_serializing_if = "Option::is_none")] - lower_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - upper_value: Option, -} - -#[derive(Clone, Debug, serde::Serialize)] -struct MemoryUsage { - start_rss: f64, - end_rss: f64, - memory_growth: f64, - peak_over_start_rss_ratio: f64, - avg_rss: f64, - min_rss: f64, - max_rss: f64, - start_virtual: f64, - end_virtual: f64, - virtual_growth: f64, - peak_over_start_virtual_ratio: f64, - avg_virtual: f64, - min_virtual: f64, - max_virtual: f64, -} - -impl From for BencherOutput { - fn from(x: MemoryUsage) -> Self { - BencherOutput { - memory_usage: BencherMetrics { - start_rss_mib: MetricEntry { - value: x.start_rss, - lower_value: None, - upper_value: None, - }, - peak_rss_mib: MetricEntry { - value: x.max_rss, - lower_value: None, - upper_value: None, - }, - end_rss_mib: MetricEntry { - value: x.end_rss, - lower_value: None, - upper_value: None, - }, - memory_growth_mib: MetricEntry { - value: x.memory_growth, - lower_value: None, - upper_value: None, - }, - peak_over_start_rss_ratio: MetricEntry { - value: x.peak_over_start_rss_ratio, - lower_value: None, - upper_value: None, - }, - avg_runtime_rss_mib: MetricEntry { - value: x.avg_rss, - lower_value: Some(x.min_rss), - upper_value: Some(x.max_rss), - }, - start_virtual_mib: MetricEntry { - value: x.start_virtual, - lower_value: None, - upper_value: None, - }, - peak_virtual_mib: MetricEntry { - value: x.max_virtual, - lower_value: None, - upper_value: None, - }, - end_virtual_mib: MetricEntry { - value: x.end_virtual, - lower_value: None, - upper_value: None, - }, - virtual_growth_mib: MetricEntry { - value: x.virtual_growth, - lower_value: None, - upper_value: None, - }, - peak_over_start_virtual_ratio: MetricEntry { - value: x.peak_over_start_virtual_ratio, - lower_value: None, - upper_value: None, - }, - avg_runtime_virtual_mib: MetricEntry { - value: x.avg_virtual, - lower_value: Some(x.min_virtual), - upper_value: Some(x.max_virtual), - }, - }, - } - } -} - -fn get_memory_stats() -> (f64, f64) { - let mut system = System::new(); - system.refresh_process(Pid::from(std::process::id() as usize)); - - if let Some(process) = system.process(Pid::from(std::process::id() as usize)) { - (process.memory() as f64, process.virtual_memory() as f64) - } else { - (0.0, 0.0) - } -} async fn test_combine_with_mem(buffer: &str) -> MemoryUsage { let (start_rss, start_virtual) = get_memory_stats(); @@ -260,62 +132,5 @@ pub fn combine_memory(c: &mut Criterion) { } } -fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { - let size = samples.len() as f64; - const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0; - - MemoryUsage { - start_rss: samples.iter().map(|x| x.start_rss).sum::() / size / MIB_CONVERSION_FACTOR, - end_rss: samples.iter().map(|x| x.end_rss).sum::() / size / MIB_CONVERSION_FACTOR, - memory_growth: samples.iter().map(|x| x.memory_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_rss_ratio: samples - .iter() - .map(|x| x.peak_over_start_rss_ratio) - .sum::() - / size, - avg_rss: samples.iter().map(|x| x.avg_rss).sum::() / size / MIB_CONVERSION_FACTOR, - min_rss: samples - .iter() - .map(|x| x.min_rss) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_rss: samples - .iter() - .map(|x| x.max_rss) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, - - start_virtual: samples.iter().map(|x| x.start_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - end_virtual: samples.iter().map(|x| x.end_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_virtual_ratio: samples - .iter() - .map(|x| x.peak_over_start_virtual_ratio) - .sum::() - / size, - avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - min_virtual: samples - .iter() - .map(|x| x.min_virtual) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_virtual: samples - .iter() - .map(|x| x.max_virtual) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, - } -} - criterion_group!(benches, combine_memory); criterion_main!(benches); diff --git a/memory-benchmarking/Cargo.toml b/memory-benchmarking/Cargo.toml new file mode 100644 index 00000000..8785f7b4 --- /dev/null +++ b/memory-benchmarking/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "memory-benchmarking" +authors.workspace = true +edition.workspace = true +license.workspace = true +version.workspace = true + +[dependencies] +serde = { workspace = true, features = ["derive"] } +sysinfo.workspace = true + +[lints] +workspace = true diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs new file mode 100644 index 00000000..83ab329a --- /dev/null +++ b/memory-benchmarking/src/lib.rs @@ -0,0 +1,175 @@ +use sysinfo::{Pid, ProcessExt, System, SystemExt}; + +#[derive(serde::Serialize)] +pub struct BencherOutput { + memory_usage: BencherMetrics, +} + +#[derive(serde::Serialize)] +struct BencherMetrics { + start_rss_mib: MetricEntry, + peak_rss_mib: MetricEntry, + end_rss_mib: MetricEntry, + memory_growth_mib: MetricEntry, + peak_over_start_rss_ratio: MetricEntry, + avg_runtime_rss_mib: MetricEntry, + start_virtual_mib: MetricEntry, + peak_virtual_mib: MetricEntry, + end_virtual_mib: MetricEntry, + virtual_growth_mib: MetricEntry, + peak_over_start_virtual_ratio: MetricEntry, + avg_runtime_virtual_mib: MetricEntry, +} + +#[derive(serde::Serialize, Default)] +struct MetricEntry { + value: f64, + #[serde(skip_serializing_if = "Option::is_none")] + lower_value: Option, + #[serde(skip_serializing_if = "Option::is_none")] + upper_value: Option, +} + +#[derive(Clone, Debug, serde::Serialize)] +pub struct MemoryUsage { + pub start_rss: f64, + pub end_rss: f64, + pub memory_growth: f64, + pub peak_over_start_rss_ratio: f64, + pub avg_rss: f64, + pub min_rss: f64, + pub max_rss: f64, + pub start_virtual: f64, + pub end_virtual: f64, + pub virtual_growth: f64, + pub peak_over_start_virtual_ratio: f64, + pub avg_virtual: f64, + pub min_virtual: f64, + pub max_virtual: f64, +} + +impl From for BencherOutput { + fn from(x: MemoryUsage) -> Self { + BencherOutput { + memory_usage: BencherMetrics { + start_rss_mib: MetricEntry { + value: x.start_rss, + ..Default::default() + }, + peak_rss_mib: MetricEntry { + value: x.max_rss, + ..Default::default() + }, + end_rss_mib: MetricEntry { + value: x.end_rss, + ..Default::default() + }, + memory_growth_mib: MetricEntry { + value: x.memory_growth, + ..Default::default() + }, + peak_over_start_rss_ratio: MetricEntry { + value: x.peak_over_start_rss_ratio, + ..Default::default() + }, + avg_runtime_rss_mib: MetricEntry { + value: x.avg_rss, + lower_value: Some(x.min_rss), + upper_value: Some(x.max_rss), + }, + start_virtual_mib: MetricEntry { + value: x.start_virtual, + ..Default::default() + }, + peak_virtual_mib: MetricEntry { + value: x.max_virtual, + ..Default::default() + }, + end_virtual_mib: MetricEntry { + value: x.end_virtual, + ..Default::default() + }, + virtual_growth_mib: MetricEntry { + value: x.virtual_growth, + ..Default::default() + }, + peak_over_start_virtual_ratio: MetricEntry { + value: x.peak_over_start_virtual_ratio, + ..Default::default() + }, + avg_runtime_virtual_mib: MetricEntry { + value: x.avg_virtual, + lower_value: Some(x.min_virtual), + upper_value: Some(x.max_virtual), + }, + }, + } + } +} + +pub fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { + let size = samples.len() as f64; + const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0; + + MemoryUsage { + start_rss: samples.iter().map(|x| x.start_rss).sum::() / size / MIB_CONVERSION_FACTOR, + end_rss: samples.iter().map(|x| x.end_rss).sum::() / size / MIB_CONVERSION_FACTOR, + memory_growth: samples.iter().map(|x| x.memory_growth).sum::() + / size + / MIB_CONVERSION_FACTOR, + peak_over_start_rss_ratio: samples + .iter() + .map(|x| x.peak_over_start_rss_ratio) + .sum::() + / size, + avg_rss: samples.iter().map(|x| x.avg_rss).sum::() / size / MIB_CONVERSION_FACTOR, + min_rss: samples + .iter() + .map(|x| x.min_rss) + .fold(f64::INFINITY, f64::min) + / MIB_CONVERSION_FACTOR, + max_rss: samples + .iter() + .map(|x| x.max_rss) + .fold(f64::NEG_INFINITY, f64::max) + / MIB_CONVERSION_FACTOR, + + start_virtual: samples.iter().map(|x| x.start_virtual).sum::() + / size + / MIB_CONVERSION_FACTOR, + end_virtual: samples.iter().map(|x| x.end_virtual).sum::() + / size + / MIB_CONVERSION_FACTOR, + virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::() + / size + / MIB_CONVERSION_FACTOR, + peak_over_start_virtual_ratio: samples + .iter() + .map(|x| x.peak_over_start_virtual_ratio) + .sum::() + / size, + avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::() + / size + / MIB_CONVERSION_FACTOR, + min_virtual: samples + .iter() + .map(|x| x.min_virtual) + .fold(f64::INFINITY, f64::min) + / MIB_CONVERSION_FACTOR, + max_virtual: samples + .iter() + .map(|x| x.max_virtual) + .fold(f64::NEG_INFINITY, f64::max) + / MIB_CONVERSION_FACTOR, + } +} + +pub fn get_memory_stats() -> (f64, f64) { + let mut system = System::new(); + system.refresh_process(Pid::from(std::process::id() as usize)); + + system + .process(Pid::from(std::process::id() as usize)) + .map(|process| (process.memory() as f64, process.virtual_memory() as f64)) + .unwrap_or_default() +} From a4dfacf993b619fa5e9d04924f881e2dddce155e Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Mon, 22 Sep 2025 15:36:22 +0530 Subject: [PATCH 03/13] Simplify usage --- Cargo.lock | 1 + lustre-collector/benches/combine_memory.rs | 81 +++++------------- memory-benchmarking/Cargo.toml | 1 + memory-benchmarking/src/lib.rs | 98 ++++++++++++++++------ 4 files changed, 95 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e276dc65..f92a94af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1425,6 +1425,7 @@ version = "0.11.0" dependencies = [ "serde", "sysinfo", + "thiserror", ] [[package]] diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 399a3b55..8df9e982 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -1,47 +1,25 @@ use combine::parser::EasyParser; use criterion::{Criterion, criterion_group, criterion_main}; use lustre_collector::quota::parse as combine_parse; -use memory_benchmarking::{BencherOutput, MemoryUsage, aggregate_samples, get_memory_stats}; +use memory_benchmarking::{ + BencherOutput, Error as MemBenchError, MemoryUsage, aggregate_samples, sample_memory, +}; use std::{fs::File, io::Read, time::Duration}; -async fn test_combine_with_mem(buffer: &str) -> MemoryUsage { - let (start_rss, start_virtual) = get_memory_stats(); - let mut rss_values = vec![]; - let mut virtual_values = vec![]; - - rss_values.push(start_rss); - virtual_values.push(start_virtual); +async fn test_combine_with_mem(buffer: &str) -> Result { + let mut samples = vec![sample_memory()]; let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); let monitor_handle = tokio::spawn(async move { - let mut min_rss = start_rss; - let mut peak_rss = start_rss; - let mut min_virtual = start_virtual; - let mut peak_virtual = start_virtual; let mut interval = tokio::time::interval(Duration::from_millis(100)); while rx.try_recv().is_err() { interval.tick().await; - let (current_rss, current_virtual) = get_memory_stats(); - - rss_values.push(current_rss); - min_rss = min_rss.min(current_rss); - peak_rss = peak_rss.max(current_rss); - - virtual_values.push(current_virtual); - min_virtual = min_virtual.min(current_virtual); - peak_virtual = peak_virtual.max(current_virtual); + samples.push(sample_memory()); } - ( - rss_values, - min_rss, - peak_rss, - virtual_values, - min_virtual, - peak_virtual, - ) + samples }); let mut needle = buffer; @@ -52,39 +30,19 @@ async fn test_combine_with_mem(buffer: &str) -> MemoryUsage { tx.send(()) .expect("Failed to send stop signal to memory monitor"); - let (rss_values, mut min_rss, mut max_rss, virtual_values, mut min_virtual, mut max_virtual) = - monitor_handle - .await - .expect("Failed to collect memory metrics from run."); - - let (end_rss, end_virtual) = get_memory_stats(); - - min_rss = min_rss.min(end_rss); - max_rss = max_rss.max(end_rss); - - min_virtual = min_virtual.min(end_virtual); - max_virtual = max_virtual.max(end_virtual); - - MemoryUsage { - start_rss, - end_rss, - memory_growth: end_rss - start_rss, - peak_over_start_rss_ratio: max_rss / start_rss, - avg_rss: rss_values.iter().sum::() / rss_values.len() as f64, - min_rss, - max_rss, - start_virtual, - end_virtual, - virtual_growth: end_virtual - start_virtual, - peak_over_start_virtual_ratio: max_virtual / start_virtual, - avg_virtual: virtual_values.iter().sum::() / virtual_values.len() as f64, - min_virtual, - max_virtual, - } + let mut samples = monitor_handle + .await + .expect("Failed to collect memory metrics from run."); + + samples.push(sample_memory()); + + let mem = MemoryUsage::try_from(samples.as_slice())?; + + Ok(mem) } pub fn combine_memory(c: &mut Criterion) { - let (tx, rx) = std::sync::mpsc::channel::(); + let (tx, rx) = std::sync::mpsc::channel(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_time() @@ -115,7 +73,10 @@ pub fn combine_memory(c: &mut Criterion) { drop(tx); - let samples = rx.iter().collect::>(); + let samples = rx + .iter() + .collect::, _>>() + .expect("Failed to get memory information"); if !samples.is_empty() { let aggregated = aggregate_samples(&samples); diff --git a/memory-benchmarking/Cargo.toml b/memory-benchmarking/Cargo.toml index 8785f7b4..067b4e2e 100644 --- a/memory-benchmarking/Cargo.toml +++ b/memory-benchmarking/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true [dependencies] serde = { workspace = true, features = ["derive"] } sysinfo.workspace = true +thiserror.workspace = true [lints] workspace = true diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index 83ab329a..d42fca78 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -34,20 +34,34 @@ struct MetricEntry { pub struct MemoryUsage { pub start_rss: f64, pub end_rss: f64, - pub memory_growth: f64, - pub peak_over_start_rss_ratio: f64, pub avg_rss: f64, pub min_rss: f64, pub max_rss: f64, pub start_virtual: f64, pub end_virtual: f64, - pub virtual_growth: f64, - pub peak_over_start_virtual_ratio: f64, pub avg_virtual: f64, pub min_virtual: f64, pub max_virtual: f64, } +impl MemoryUsage { + fn memory_growth(&self) -> f64 { + self.end_rss - self.start_rss + } + + fn peak_over_start_rss(&self) -> f64 { + self.max_rss / self.start_rss + } + + fn virtual_growth(&self) -> f64 { + self.end_virtual - self.start_virtual + } + + fn peak_over_start_virtual(&self) -> f64 { + self.max_virtual / self.start_virtual + } +} + impl From for BencherOutput { fn from(x: MemoryUsage) -> Self { BencherOutput { @@ -65,11 +79,11 @@ impl From for BencherOutput { ..Default::default() }, memory_growth_mib: MetricEntry { - value: x.memory_growth, + value: x.memory_growth(), ..Default::default() }, peak_over_start_rss_ratio: MetricEntry { - value: x.peak_over_start_rss_ratio, + value: x.peak_over_start_rss(), ..Default::default() }, avg_runtime_rss_mib: MetricEntry { @@ -90,11 +104,11 @@ impl From for BencherOutput { ..Default::default() }, virtual_growth_mib: MetricEntry { - value: x.virtual_growth, + value: x.virtual_growth(), ..Default::default() }, peak_over_start_virtual_ratio: MetricEntry { - value: x.peak_over_start_virtual_ratio, + value: x.peak_over_start_virtual(), ..Default::default() }, avg_runtime_virtual_mib: MetricEntry { @@ -114,14 +128,6 @@ pub fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { MemoryUsage { start_rss: samples.iter().map(|x| x.start_rss).sum::() / size / MIB_CONVERSION_FACTOR, end_rss: samples.iter().map(|x| x.end_rss).sum::() / size / MIB_CONVERSION_FACTOR, - memory_growth: samples.iter().map(|x| x.memory_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_rss_ratio: samples - .iter() - .map(|x| x.peak_over_start_rss_ratio) - .sum::() - / size, avg_rss: samples.iter().map(|x| x.avg_rss).sum::() / size / MIB_CONVERSION_FACTOR, min_rss: samples .iter() @@ -140,14 +146,6 @@ pub fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { end_virtual: samples.iter().map(|x| x.end_virtual).sum::() / size / MIB_CONVERSION_FACTOR, - virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_virtual_ratio: samples - .iter() - .map(|x| x.peak_over_start_virtual_ratio) - .sum::() - / size, avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::() / size / MIB_CONVERSION_FACTOR, @@ -164,12 +162,60 @@ pub fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { } } -pub fn get_memory_stats() -> (f64, f64) { +#[derive(Default)] +pub struct Sample(f64, f64); + +#[derive(thiserror::Error, Debug, Clone)] +pub enum Error { + #[error("Not enough samples")] + NotEnoughSamples, +} + +impl TryFrom<&[Sample]> for MemoryUsage { + type Error = Error; + + fn try_from(value: &[Sample]) -> Result { + if value.is_empty() { + return Err(Error::NotEnoughSamples); + } + + Ok(MemoryUsage { + start_rss: value.first().map(|s| s.0).unwrap_or_default(), + end_rss: value.last().map(|s| s.0).unwrap_or_default(), + avg_rss: value.iter().map(|s| s.0).sum::() / (value.len() as f64), + min_rss: value + .iter() + .map(|s| s.0) + .reduce(f64::min) + .unwrap_or_default(), + max_rss: value + .iter() + .map(|s| s.0) + .reduce(f64::max) + .unwrap_or_default(), + start_virtual: value.first().map(|s| s.1).unwrap_or_default(), + end_virtual: value.last().map(|s| s.1).unwrap_or_default(), + avg_virtual: value.iter().map(|s| s.1).sum::() / (value.len() as f64), + min_virtual: value + .iter() + .map(|s| s.1) + .reduce(f64::min) + .unwrap_or_default(), + max_virtual: value + .iter() + .map(|s| s.1) + .reduce(f64::max) + .unwrap_or_default(), + }) + } +} + +pub fn sample_memory() -> Sample { let mut system = System::new(); system.refresh_process(Pid::from(std::process::id() as usize)); system .process(Pid::from(std::process::id() as usize)) - .map(|process| (process.memory() as f64, process.virtual_memory() as f64)) + .map(|process| Sample(process.memory() as f64, process.virtual_memory() as f64)) .unwrap_or_default() } From c249c2b86199b992d8a77bc7bccd2bcc189b5166 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Mon, 22 Sep 2025 16:25:05 +0530 Subject: [PATCH 04/13] Generic trace_memory command --- Cargo.lock | 1 + lustre-collector/benches/combine_memory.rs | 62 ++++++---------------- memory-benchmarking/Cargo.toml | 1 + memory-benchmarking/src/lib.rs | 33 ++++++++++++ 4 files changed, 51 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f92a94af..cc231184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1426,6 +1426,7 @@ dependencies = [ "serde", "sysinfo", "thiserror", + "tokio", ] [[package]] diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 8df9e982..46c221bc 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -1,48 +1,11 @@ use combine::parser::EasyParser; use criterion::{Criterion, criterion_group, criterion_main}; use lustre_collector::quota::parse as combine_parse; -use memory_benchmarking::{ - BencherOutput, Error as MemBenchError, MemoryUsage, aggregate_samples, sample_memory, -}; -use std::{fs::File, io::Read, time::Duration}; - -async fn test_combine_with_mem(buffer: &str) -> Result { - let mut samples = vec![sample_memory()]; - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let monitor_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(100)); - - while rx.try_recv().is_err() { - interval.tick().await; - - samples.push(sample_memory()); - } - - samples - }); - - let mut needle = buffer; - while let Ok((_, e)) = combine_parse().easy_parse(needle) { - needle = e; - } - - tx.send(()) - .expect("Failed to send stop signal to memory monitor"); - - let mut samples = monitor_handle - .await - .expect("Failed to collect memory metrics from run."); - - samples.push(sample_memory()); - - let mem = MemoryUsage::try_from(samples.as_slice())?; - - Ok(mem) -} +use memory_benchmarking::{BencherOutput, MemoryUsage, aggregate_samples, trace_memory}; +use std::{fs::File, io::Read, sync::mpsc, time::Duration}; pub fn combine_memory(c: &mut Criterion) { - let (tx, rx) = std::sync::mpsc::channel(); + let (tx, rx) = mpsc::channel(); let rt = tokio::runtime::Builder::new_multi_thread() .enable_time() @@ -63,8 +26,18 @@ pub fn combine_memory(c: &mut Criterion) { group.bench_with_input("combine_memory", &raw, |b, input| { b.to_async(&rt).iter(|| async { - let memory_usage = test_combine_with_mem(input).await; - + let routine = move || { + let mut needle = input.as_str(); + while let Ok((_, e)) = combine_parse().easy_parse(needle) { + needle = e; + } + }; + + let memory_usage: MemoryUsage = trace_memory(routine, Duration::from_millis(100)) + .await + .as_slice() + .try_into() + .expect("Failed to extract memory usage from samples"); let _ = tx.send(memory_usage.clone()); }) }); @@ -73,10 +46,7 @@ pub fn combine_memory(c: &mut Criterion) { drop(tx); - let samples = rx - .iter() - .collect::, _>>() - .expect("Failed to get memory information"); + let samples = rx.iter().collect::>(); if !samples.is_empty() { let aggregated = aggregate_samples(&samples); diff --git a/memory-benchmarking/Cargo.toml b/memory-benchmarking/Cargo.toml index 067b4e2e..cd3f8a0c 100644 --- a/memory-benchmarking/Cargo.toml +++ b/memory-benchmarking/Cargo.toml @@ -9,6 +9,7 @@ version.workspace = true serde = { workspace = true, features = ["derive"] } sysinfo.workspace = true thiserror.workspace = true +tokio = { workspace = true, features = ["full"] } [lints] workspace = true diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index d42fca78..1f022522 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use sysinfo::{Pid, ProcessExt, System, SystemExt}; #[derive(serde::Serialize)] @@ -219,3 +221,34 @@ pub fn sample_memory() -> Sample { .map(|process| Sample(process.memory() as f64, process.virtual_memory() as f64)) .unwrap_or_default() } + +pub async fn trace_memory(mut routine: impl FnMut(), duration: Duration) -> Vec { + let mut samples = vec![sample_memory()]; + + let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>(); + let monitor_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(duration); + + while abort_receiver.try_recv().is_err() { + interval.tick().await; + + samples.push(sample_memory()); + } + + samples + }); + + routine(); + + abort_sender + .send(()) + .expect("Failed to send stop signal to memory monitor"); + + let mut samples = monitor_handle + .await + .expect("Failed to collect memory metrics from run."); + + samples.push(sample_memory()); + + samples +} From 065b5f3bbb14c94d15f2e421061b20b87dfd1274 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Mon, 22 Sep 2025 16:40:38 +0530 Subject: [PATCH 05/13] Move runtime creation to trace_memory command --- lustre-collector/benches/combine_memory.rs | 9 +---- lustre-collector/combine_mem_usage.json | 44 ++++++++++++++++++++ memory-benchmarking/src/lib.rs | 47 +++++++++++++--------- 3 files changed, 72 insertions(+), 28 deletions(-) create mode 100644 lustre-collector/combine_mem_usage.json diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 46c221bc..0a190237 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -7,12 +7,6 @@ use std::{fs::File, io::Read, sync::mpsc, time::Duration}; pub fn combine_memory(c: &mut Criterion) { let (tx, rx) = mpsc::channel(); - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_time() - .enable_io() - .build() - .expect("Failed to build tokio runtime"); - let mut group = c.benchmark_group("parse_benchmarks"); group.sample_size(10); @@ -25,7 +19,7 @@ pub fn combine_memory(c: &mut Criterion) { .expect("Failed to read file"); group.bench_with_input("combine_memory", &raw, |b, input| { - b.to_async(&rt).iter(|| async { + b.iter(|| { let routine = move || { let mut needle = input.as_str(); while let Ok((_, e)) = combine_parse().easy_parse(needle) { @@ -34,7 +28,6 @@ pub fn combine_memory(c: &mut Criterion) { }; let memory_usage: MemoryUsage = trace_memory(routine, Duration::from_millis(100)) - .await .as_slice() .try_into() .expect("Failed to extract memory usage from samples"); diff --git a/lustre-collector/combine_mem_usage.json b/lustre-collector/combine_mem_usage.json new file mode 100644 index 00000000..91eae2b6 --- /dev/null +++ b/lustre-collector/combine_mem_usage.json @@ -0,0 +1,44 @@ +{ + "memory_usage": { + "start_rss_mib": { + "value": 48.78263927003293 + }, + "peak_rss_mib": { + "value": 54.484375 + }, + "end_rss_mib": { + "value": 48.81320321075741 + }, + "memory_growth_mib": { + "value": 0.03056394072447688 + }, + "peak_over_start_rss_ratio": { + "value": 1.1168804274489026 + }, + "avg_runtime_rss_mib": { + "value": 48.79867161772777, + "lower_value": 24.5, + "upper_value": 54.484375 + }, + "start_virtual_mib": { + "value": 402134.2293496158 + }, + "peak_virtual_mib": { + "value": 402380.0625 + }, + "end_virtual_mib": { + "value": 402134.66074368823 + }, + "virtual_growth_mib": { + "value": 0.43139407242415473 + }, + "peak_over_start_virtual_ratio": { + "value": 1.000611321127231 + }, + "avg_runtime_virtual_mib": { + "value": 402134.4801728869, + "lower_value": 401310.0625, + "upper_value": 402380.0625 + } + } +} \ No newline at end of file diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index 1f022522..6db6234b 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -222,33 +222,40 @@ pub fn sample_memory() -> Sample { .unwrap_or_default() } -pub async fn trace_memory(mut routine: impl FnMut(), duration: Duration) -> Vec { - let mut samples = vec![sample_memory()]; +pub fn trace_memory(mut routine: impl FnMut(), duration: Duration) -> Vec { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .expect("Failed to build tokio runtime") + .block_on(async move { + let mut samples = vec![sample_memory()]; - let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>(); - let monitor_handle = tokio::spawn(async move { - let mut interval = tokio::time::interval(duration); + let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>(); + let monitor_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(duration); - while abort_receiver.try_recv().is_err() { - interval.tick().await; + while abort_receiver.try_recv().is_err() { + interval.tick().await; - samples.push(sample_memory()); - } + samples.push(sample_memory()); + } - samples - }); + samples + }); - routine(); + routine(); - abort_sender - .send(()) - .expect("Failed to send stop signal to memory monitor"); + abort_sender + .send(()) + .expect("Failed to send stop signal to memory monitor"); - let mut samples = monitor_handle - .await - .expect("Failed to collect memory metrics from run."); + let mut samples = monitor_handle + .await + .expect("Failed to collect memory metrics from run."); - samples.push(sample_memory()); + samples.push(sample_memory()); - samples + samples + }) } From 6bee26fae5fc299025b58d65e5f9ad7ad2b877fd Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 14:58:20 +0530 Subject: [PATCH 06/13] Ditch criterion in memory benchmarks --- lustre-collector/benches/combine_memory.rs | 58 ++++++---------------- 1 file changed, 15 insertions(+), 43 deletions(-) diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 0a190237..61b5541c 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -1,60 +1,32 @@ use combine::parser::EasyParser; -use criterion::{Criterion, criterion_group, criterion_main}; use lustre_collector::quota::parse as combine_parse; -use memory_benchmarking::{BencherOutput, MemoryUsage, aggregate_samples, trace_memory}; -use std::{fs::File, io::Read, sync::mpsc, time::Duration}; +use memory_benchmarking::{BencherOutput, trace_memory}; +use std::time::Duration; -pub fn combine_memory(c: &mut Criterion) { - let (tx, rx) = mpsc::channel(); +pub fn main() { + let buffer = std::fs::read_to_string("benches/quotas.yml").expect("Failed to read file"); + let needle = buffer.as_str(); - let mut group = c.benchmark_group("parse_benchmarks"); - - group.sample_size(10); - group.measurement_time(Duration::from_secs(90)); // Allow more time - - let mut raw = String::new(); - File::open("benches/quotas.yml") - .expect("Failed to open file") - .read_to_string(&mut raw) - .expect("Failed to read file"); - - group.bench_with_input("combine_memory", &raw, |b, input| { - b.iter(|| { + let samples: Vec<_> = (0..100) + .map(|_| { let routine = move || { - let mut needle = input.as_str(); + let mut needle = needle; while let Ok((_, e)) = combine_parse().easy_parse(needle) { needle = e; } }; - let memory_usage: MemoryUsage = trace_memory(routine, Duration::from_millis(100)) + trace_memory(routine, Duration::from_millis(10)) .as_slice() .try_into() - .expect("Failed to extract memory usage from samples"); - let _ = tx.send(memory_usage.clone()); + .expect("Failed to extract memory usage from samples") }) - }); - - group.finish(); - - drop(tx); + .collect(); - let samples = rx.iter().collect::>(); + let bencher_output: BencherOutput = samples.as_slice().into(); - if !samples.is_empty() { - let aggregated = aggregate_samples(&samples); + let serialized_metrics = serde_json::to_string_pretty(&bencher_output) + .expect("Failed to serialize benchmark output."); - let bencher_output: BencherOutput = aggregated.into(); - - let serialized_metrics = serde_json::to_string_pretty(&bencher_output) - .expect("Failed to serialize benchmark output."); - - let output = "combine_mem_usage.json"; - std::fs::write(output, serialized_metrics).expect("Failed to write benchmark results"); - - println!("✅ Bencher results written to {output}"); - } + println!("{serialized_metrics}"); } - -criterion_group!(benches, combine_memory); -criterion_main!(benches); From b1ed519be1576f6207630eaf6b64f4d00e616147 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 16:37:51 +0530 Subject: [PATCH 07/13] Update scrape memory --- Cargo.lock | 1 + lustre-collector/benches/combine_memory.rs | 23 +- lustre-collector/combine_mem_usage.json | 44 --- lustrefs-exporter/Cargo.toml | 1 + lustrefs-exporter/benches/common/mod.rs | 2 + .../benches/scrape_memory_metrics.rs | 353 ++---------------- memory-benchmarking/src/lib.rs | 174 ++++----- 7 files changed, 121 insertions(+), 477 deletions(-) delete mode 100644 lustre-collector/combine_mem_usage.json diff --git a/Cargo.lock b/Cargo.lock index cc231184..1a1b7d4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1381,6 +1381,7 @@ dependencies = [ "iai-callgrind", "insta", "lustre_collector", + "memory-benchmarking", "pretty_assertions", "prometheus-client", "prometheus-parse", diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index 61b5541c..aa380ca0 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -9,17 +9,18 @@ pub fn main() { let samples: Vec<_> = (0..100) .map(|_| { - let routine = move || { - let mut needle = needle; - while let Ok((_, e)) = combine_parse().easy_parse(needle) { - needle = e; - } - }; - - trace_memory(routine, Duration::from_millis(10)) - .as_slice() - .try_into() - .expect("Failed to extract memory usage from samples") + trace_memory( + move || { + let mut needle = needle; + while let Ok((_, e)) = combine_parse().easy_parse(needle) { + needle = e; + } + }, + Duration::from_millis(10), + ) + .as_slice() + .try_into() + .expect("Failed to extract memory usage from samples") }) .collect(); diff --git a/lustre-collector/combine_mem_usage.json b/lustre-collector/combine_mem_usage.json deleted file mode 100644 index 91eae2b6..00000000 --- a/lustre-collector/combine_mem_usage.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "memory_usage": { - "start_rss_mib": { - "value": 48.78263927003293 - }, - "peak_rss_mib": { - "value": 54.484375 - }, - "end_rss_mib": { - "value": 48.81320321075741 - }, - "memory_growth_mib": { - "value": 0.03056394072447688 - }, - "peak_over_start_rss_ratio": { - "value": 1.1168804274489026 - }, - "avg_runtime_rss_mib": { - "value": 48.79867161772777, - "lower_value": 24.5, - "upper_value": 54.484375 - }, - "start_virtual_mib": { - "value": 402134.2293496158 - }, - "peak_virtual_mib": { - "value": 402380.0625 - }, - "end_virtual_mib": { - "value": 402134.66074368823 - }, - "virtual_growth_mib": { - "value": 0.43139407242415473 - }, - "peak_over_start_virtual_ratio": { - "value": 1.000611321127231 - }, - "avg_runtime_virtual_mib": { - "value": 402134.4801728869, - "lower_value": 401310.0625, - "upper_value": 402380.0625 - } - } -} \ No newline at end of file diff --git a/lustrefs-exporter/Cargo.toml b/lustrefs-exporter/Cargo.toml index 08a28a91..402455a9 100644 --- a/lustrefs-exporter/Cargo.toml +++ b/lustrefs-exporter/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true axum = { workspace = true, features = ["http2"] } clap = { workspace = true, features = ["derive", "env", "wrap_help", "string"] } lustre_collector.path = "../lustre-collector" +memory-benchmarking = { version = "0.11.0", path = "../memory-benchmarking" } prometheus-client.workspace = true regex = { version = "1", default-features = false, features = [ "perf", diff --git a/lustrefs-exporter/benches/common/mod.rs b/lustrefs-exporter/benches/common/mod.rs index ec723258..99123901 100644 --- a/lustrefs-exporter/benches/common/mod.rs +++ b/lustrefs-exporter/benches/common/mod.rs @@ -2,6 +2,7 @@ // Use of this source code is governed by a MIT-style // license that can be found in the LICENSE file. +use commandeer_test::commandeer; use std::time::Duration; use tokio::{task::JoinSet, time::Instant}; @@ -18,6 +19,7 @@ async fn make_single_request() -> Result Duration { let start = Instant::now(); diff --git a/lustrefs-exporter/benches/scrape_memory_metrics.rs b/lustrefs-exporter/benches/scrape_memory_metrics.rs index cab94b77..00cf7ee7 100644 --- a/lustrefs-exporter/benches/scrape_memory_metrics.rs +++ b/lustrefs-exporter/benches/scrape_memory_metrics.rs @@ -4,337 +4,36 @@ mod common; -use commandeer_test::commandeer; use common::load_test_concurrent; -use core::f64; -use criterion::{Criterion, criterion_group, criterion_main}; +use memory_benchmarking::{BencherOutput, trace_memory_async}; use std::time::Duration; -use sysinfo::{Pid, ProcessExt, System, SystemExt}; -#[derive(serde::Serialize)] -struct BencherOutput { - scrape_allocations: BencherMetrics, -} - -#[derive(serde::Serialize)] -struct BencherMetrics { - start_rss_mib: MetricEntry, - peak_rss_mib: MetricEntry, - end_rss_mib: MetricEntry, - memory_growth_mib: MetricEntry, - peak_over_start_rss_ratio: MetricEntry, - avg_runtime_rss_mib: MetricEntry, - start_virtual_mib: MetricEntry, - peak_virtual_mib: MetricEntry, - end_virtual_mib: MetricEntry, - virtual_growth_mib: MetricEntry, - peak_over_start_virtual_ratio: MetricEntry, - avg_runtime_virtual_mib: MetricEntry, -} - -#[derive(serde::Serialize)] -struct MetricEntry { - value: f64, - #[serde(skip_serializing_if = "Option::is_none")] - lower_value: Option, - #[serde(skip_serializing_if = "Option::is_none")] - upper_value: Option, -} - -#[derive(Clone, Debug, serde::Serialize)] -struct MemoryUsage { - start_rss: f64, - end_rss: f64, - memory_growth: f64, - peak_over_start_rss_ratio: f64, - avg_rss: f64, - min_rss: f64, - max_rss: f64, - start_virtual: f64, - end_virtual: f64, - virtual_growth: f64, - peak_over_start_virtual_ratio: f64, - avg_virtual: f64, - min_virtual: f64, - max_virtual: f64, -} - -impl From for BencherOutput { - fn from(x: MemoryUsage) -> Self { - BencherOutput { - scrape_allocations: BencherMetrics { - start_rss_mib: MetricEntry { - value: x.start_rss, - lower_value: None, - upper_value: None, - }, - peak_rss_mib: MetricEntry { - value: x.max_rss, - lower_value: None, - upper_value: None, - }, - end_rss_mib: MetricEntry { - value: x.end_rss, - lower_value: None, - upper_value: None, - }, - memory_growth_mib: MetricEntry { - value: x.memory_growth, - lower_value: None, - upper_value: None, - }, - peak_over_start_rss_ratio: MetricEntry { - value: x.peak_over_start_rss_ratio, - lower_value: None, - upper_value: None, - }, - avg_runtime_rss_mib: MetricEntry { - value: x.avg_rss, - lower_value: Some(x.min_rss), - upper_value: Some(x.max_rss), - }, - start_virtual_mib: MetricEntry { - value: x.start_virtual, - lower_value: None, - upper_value: None, - }, - peak_virtual_mib: MetricEntry { - value: x.max_virtual, - lower_value: None, - upper_value: None, - }, - end_virtual_mib: MetricEntry { - value: x.end_virtual, - lower_value: None, - upper_value: None, - }, - virtual_growth_mib: MetricEntry { - value: x.virtual_growth, - lower_value: None, - upper_value: None, - }, - peak_over_start_virtual_ratio: MetricEntry { - value: x.peak_over_start_virtual_ratio, - lower_value: None, - upper_value: None, - }, - avg_runtime_virtual_mib: MetricEntry { - value: x.avg_virtual, - lower_value: Some(x.min_virtual), - upper_value: Some(x.max_virtual), - }, - }, - } - } -} - -fn get_memory_stats() -> (f64, f64) { - let mut system = System::new(); - system.refresh_process(Pid::from(std::process::id() as usize)); - - if let Some(process) = system.process(Pid::from(std::process::id() as usize)) { - (process.memory() as f64, process.virtual_memory() as f64) - } else { - (0.0, 0.0) - } -} - -async fn load_test_with_memory_tracking( - concurrency: usize, - total_requests: usize, -) -> (Duration, MemoryUsage) { - let (start_rss, start_virtual) = get_memory_stats(); - let mut rss_values = vec![]; - let mut virtual_values = vec![]; - - rss_values.push(start_rss); - virtual_values.push(start_virtual); - - let (tx, mut rx) = tokio::sync::oneshot::channel::<()>(); - let monitor_handle = tokio::spawn(async move { - let mut min_rss = start_rss; - let mut peak_rss = start_rss; - let mut min_virtual = start_virtual; - let mut peak_virtual = start_virtual; - let mut interval = tokio::time::interval(Duration::from_millis(100)); - - while rx.try_recv().is_err() { - interval.tick().await; - - let (current_rss, current_virtual) = get_memory_stats(); - - rss_values.push(current_rss); - min_rss = min_rss.min(current_rss); - peak_rss = peak_rss.max(current_rss); - - virtual_values.push(current_virtual); - min_virtual = min_virtual.min(current_virtual); - peak_virtual = peak_virtual.max(current_virtual); - } - - ( - rss_values, - min_rss, - peak_rss, - virtual_values, - min_virtual, - peak_virtual, - ) - }); - - let duration = load_test_concurrent(concurrency, total_requests).await; - - tx.send(()) - .expect("Failed to send stop signal to memory monitor"); - - let (rss_values, mut min_rss, mut max_rss, virtual_values, mut min_virtual, mut max_virtual) = - monitor_handle - .await - .expect("Failed to collect memory metrics from run."); - - let (end_rss, end_virtual) = get_memory_stats(); - - min_rss = min_rss.min(end_rss); - max_rss = max_rss.max(end_rss); - - min_virtual = min_virtual.min(end_virtual); - max_virtual = max_virtual.max(end_virtual); - - let memory_usage = MemoryUsage { - start_rss, - end_rss, - memory_growth: end_rss - start_rss, - peak_over_start_rss_ratio: max_rss / start_rss, - avg_rss: rss_values.iter().sum::() / rss_values.len() as f64, - min_rss, - max_rss, - start_virtual, - end_virtual, - virtual_growth: end_virtual - start_virtual, - peak_over_start_virtual_ratio: max_virtual / start_virtual, - avg_virtual: virtual_values.iter().sum::() / virtual_values.len() as f64, - min_virtual, - max_virtual, - }; - - (duration, memory_usage) -} - -#[commandeer(Replay, "lctl", "lnetctl")] -fn scrape_load_test(c: &mut Criterion) { - let (tx, rx) = std::sync::mpsc::channel::(); - - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_time() - .enable_io() - .build() - .expect("Failed to build tokio runtime"); - - rt.spawn(async move { - let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345)) - .await - .expect("Failed to bind to port 12345"); - - axum::serve(listener, lustrefs_exporter::routes::app()) - .await - .expect("Failed to serve app."); - }); - - let mut group = c.benchmark_group("scrape_benchmarks"); - - // Load test benchmark (like oha: 1000 requests, 10 concurrent) - group.sample_size(10); // Fewer samples since each does 1000 requests - group.measurement_time(Duration::from_secs(1500)); // Allow more time - - group.bench_function("load_test_1000_req_10_concurrent_sequential", |b| { - let tx = tx.clone(); - - b.to_async(&rt).iter(|| async { - let (duration, memory_usage) = load_test_with_memory_tracking(10, 60).await; - - let _ = tx.send(memory_usage.clone()); - - std::hint::black_box((duration, memory_usage)) +pub fn main() { + let samples: Vec<_> = (0..10) + .map(|_| { + trace_memory_async( + || async { + let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345)) + .await + .expect("Failed to bind to port 12345"); + + axum::serve(listener, lustrefs_exporter::routes::app()) + .await + .expect("Failed to serve app."); + }, + || async { + load_test_concurrent(10, 60).await; + }, + Duration::from_millis(10), + ) + .as_slice() + .try_into() + .expect("Failed to extract memory usage from samples") }) - }); - - group.finish(); - - drop(tx); - - let samples = rx.iter().collect::>(); - - if !samples.is_empty() { - let aggregated = aggregate_samples(&samples); + .collect(); - let bencher_output: BencherOutput = aggregated.into(); + let serialized_metrics = serde_json::to_string_pretty(&BencherOutput::from(samples.as_slice())) + .expect("Failed to serialize benchmark output."); - let serialized_metrics = serde_json::to_string_pretty(&bencher_output) - .expect("Failed to serialize benchmark output."); - - std::fs::write("scrape_allocations_results.json", serialized_metrics) - .expect("Failed to write benchmark results to `scrape_allocations_results.json`"); - - println!("✅ Bencher results written to scrape_allocations_results.json"); - } + println!("{serialized_metrics}"); } - -fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { - let size = samples.len() as f64; - const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0; - - MemoryUsage { - start_rss: samples.iter().map(|x| x.start_rss).sum::() / size / MIB_CONVERSION_FACTOR, - end_rss: samples.iter().map(|x| x.end_rss).sum::() / size / MIB_CONVERSION_FACTOR, - memory_growth: samples.iter().map(|x| x.memory_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_rss_ratio: samples - .iter() - .map(|x| x.peak_over_start_rss_ratio) - .sum::() - / size, - avg_rss: samples.iter().map(|x| x.avg_rss).sum::() / size / MIB_CONVERSION_FACTOR, - min_rss: samples - .iter() - .map(|x| x.min_rss) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_rss: samples - .iter() - .map(|x| x.max_rss) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, - - start_virtual: samples.iter().map(|x| x.start_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - end_virtual: samples.iter().map(|x| x.end_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::() - / size - / MIB_CONVERSION_FACTOR, - peak_over_start_virtual_ratio: samples - .iter() - .map(|x| x.peak_over_start_virtual_ratio) - .sum::() - / size, - avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - min_virtual: samples - .iter() - .map(|x| x.min_virtual) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_virtual: samples - .iter() - .map(|x| x.max_virtual) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, - } -} - -criterion_group!(benches, scrape_load_test); -criterion_main!(benches); diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index 6db6234b..296d8cf0 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -64,103 +64,38 @@ impl MemoryUsage { } } -impl From for BencherOutput { - fn from(x: MemoryUsage) -> Self { - BencherOutput { - memory_usage: BencherMetrics { - start_rss_mib: MetricEntry { - value: x.start_rss, - ..Default::default() - }, - peak_rss_mib: MetricEntry { - value: x.max_rss, - ..Default::default() - }, - end_rss_mib: MetricEntry { - value: x.end_rss, - ..Default::default() - }, - memory_growth_mib: MetricEntry { - value: x.memory_growth(), - ..Default::default() - }, - peak_over_start_rss_ratio: MetricEntry { - value: x.peak_over_start_rss(), - ..Default::default() - }, - avg_runtime_rss_mib: MetricEntry { - value: x.avg_rss, - lower_value: Some(x.min_rss), - upper_value: Some(x.max_rss), - }, - start_virtual_mib: MetricEntry { - value: x.start_virtual, - ..Default::default() - }, - peak_virtual_mib: MetricEntry { - value: x.max_virtual, - ..Default::default() - }, - end_virtual_mib: MetricEntry { - value: x.end_virtual, - ..Default::default() - }, - virtual_growth_mib: MetricEntry { - value: x.virtual_growth(), - ..Default::default() - }, - peak_over_start_virtual_ratio: MetricEntry { - value: x.peak_over_start_virtual(), - ..Default::default() - }, - avg_runtime_virtual_mib: MetricEntry { - value: x.avg_virtual, - lower_value: Some(x.min_virtual), - upper_value: Some(x.max_virtual), - }, - }, +trait GetStatsExt { + fn get_stats(&self, accessor: impl Fn(&MemoryUsage) -> f64 + Copy) -> MetricEntry; +} + +impl GetStatsExt for &[MemoryUsage] { + fn get_stats(&self, accessor: impl Fn(&MemoryUsage) -> f64 + Copy) -> MetricEntry { + MetricEntry { + value: self.iter().map(accessor).sum::() / self.len() as f64, + lower_value: self.iter().map(accessor).reduce(f64::min), + upper_value: self.iter().map(accessor).reduce(f64::max), } } } -pub fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage { - let size = samples.len() as f64; - const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0; - - MemoryUsage { - start_rss: samples.iter().map(|x| x.start_rss).sum::() / size / MIB_CONVERSION_FACTOR, - end_rss: samples.iter().map(|x| x.end_rss).sum::() / size / MIB_CONVERSION_FACTOR, - avg_rss: samples.iter().map(|x| x.avg_rss).sum::() / size / MIB_CONVERSION_FACTOR, - min_rss: samples - .iter() - .map(|x| x.min_rss) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_rss: samples - .iter() - .map(|x| x.max_rss) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, - - start_virtual: samples.iter().map(|x| x.start_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - end_virtual: samples.iter().map(|x| x.end_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::() - / size - / MIB_CONVERSION_FACTOR, - min_virtual: samples - .iter() - .map(|x| x.min_virtual) - .fold(f64::INFINITY, f64::min) - / MIB_CONVERSION_FACTOR, - max_virtual: samples - .iter() - .map(|x| x.max_virtual) - .fold(f64::NEG_INFINITY, f64::max) - / MIB_CONVERSION_FACTOR, +impl From<&[MemoryUsage]> for BencherOutput { + fn from(samples: &[MemoryUsage]) -> Self { + BencherOutput { + memory_usage: BencherMetrics { + start_rss_mib: samples.get_stats(|x| x.start_rss), + peak_rss_mib: samples.get_stats(|x| x.max_rss), + end_rss_mib: samples.get_stats(|x| x.end_rss), + memory_growth_mib: samples.get_stats(|x| x.memory_growth()), + peak_over_start_rss_ratio: samples.get_stats(|x| x.peak_over_start_rss()), + avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss), + start_virtual_mib: samples.get_stats(|x| x.start_virtual), + peak_virtual_mib: samples.get_stats(|x| x.max_virtual), + end_virtual_mib: samples.get_stats(|x| x.end_virtual), + virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()), + peak_over_start_virtual_ratio: samples.get_stats(|x| x.peak_over_start_virtual()), + avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual), + }, + } } } @@ -177,7 +112,7 @@ impl TryFrom<&[Sample]> for MemoryUsage { type Error = Error; fn try_from(value: &[Sample]) -> Result { - if value.is_empty() { + if value.len() < 10 { return Err(Error::NotEnoughSamples); } @@ -222,7 +157,7 @@ pub fn sample_memory() -> Sample { .unwrap_or_default() } -pub fn trace_memory(mut routine: impl FnMut(), duration: Duration) -> Vec { +pub fn trace_memory(routine: impl Fn(), duration: Duration) -> Vec { tokio::runtime::Builder::new_multi_thread() .enable_time() .enable_io() @@ -259,3 +194,52 @@ pub fn trace_memory(mut routine: impl FnMut(), duration: Duration) -> Vec( + init: impl Fn() -> I + Send + 'static, + routine: impl Fn() -> F, + duration: Duration, +) -> Vec +where + I: Future + Sized + Send + 'static, + F: Future + Sized, +{ + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .expect("Failed to build tokio runtime"); + + rt.spawn(init()); + + let mut samples = vec![sample_memory()]; + + rt.block_on(async move { + let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>(); + let monitor_handle = tokio::spawn(async move { + let mut interval = tokio::time::interval(duration); + + while abort_receiver.try_recv().is_err() { + interval.tick().await; + + samples.push(sample_memory()); + } + + samples + }); + + routine().await; + + abort_sender + .send(()) + .expect("Failed to send stop signal to memory monitor"); + + let mut samples = monitor_handle + .await + .expect("Failed to collect memory metrics from run."); + + samples.push(sample_memory()); + + samples + }) +} From cc7c6c925a9275383217b8a9e46a76a05af6282c Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 16:45:49 +0530 Subject: [PATCH 08/13] Update workflows --- .github/workflows/quota-parsing.yml | 9 ++------- .github/workflows/scrape-memory-metrics-bench.yml | 9 ++------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/.github/workflows/quota-parsing.yml b/.github/workflows/quota-parsing.yml index 52ac25e2..24e59c6f 100644 --- a/.github/workflows/quota-parsing.yml +++ b/.github/workflows/quota-parsing.yml @@ -58,11 +58,6 @@ jobs: --ci-id '${{ github.workflow_ref }}-performance' \ "cargo bench --bench combine_performance" - - name: Run quota parsing memory usage benchmark - run: | - # Run the benchmark first (suppress Criterion output) - cargo bench --bench combine_memory 2>&1 > /dev/null - - name: Track quota parsing memory usage benchmark on main branch if: github.ref == 'refs/heads/main' && github.event_name == 'push' run: | @@ -81,7 +76,7 @@ jobs: --github-actions '${{ secrets.GITHUB_TOKEN }}' \ --ci-only-thresholds \ --ci-id '${{ github.workflow_ref }}-memory' \ - --file lustre-collector/combine_mem_usage.json + "cargo bench --bench combine_memory" - name: Compare quota parsing memory metrics with main branch if: github.event_name == 'pull_request' @@ -98,4 +93,4 @@ jobs: --github-actions '${{ secrets.GITHUB_TOKEN }}' \ --ci-only-thresholds \ --ci-id '${{ github.workflow_ref }}-memory' \ - --file lustre-collector/combine_mem_usage.json + "cargo bench --bench combine_memory" diff --git a/.github/workflows/scrape-memory-metrics-bench.yml b/.github/workflows/scrape-memory-metrics-bench.yml index a9542995..bbee21df 100644 --- a/.github/workflows/scrape-memory-metrics-bench.yml +++ b/.github/workflows/scrape-memory-metrics-bench.yml @@ -31,11 +31,6 @@ jobs: target/ key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - - name: Run Scrape Memory Metrics Benchmark - run: | - # Run the benchmark first (suppress Criterion output) - cargo bench --bench scrape_memory_metrics 2>&1 > /dev/null - - name: Track scrape memory metrics benchmark on main branch if: github.ref == 'refs/heads/main' && github.event_name == 'push' run: | @@ -54,7 +49,7 @@ jobs: --github-actions '${{ secrets.GITHUB_TOKEN }}' \ --ci-only-thresholds \ --ci-id '${{ github.workflow_ref }}' \ - --file lustrefs-exporter/scrape_allocations_results.json + "cargo bench --bench scrape_memory_metrics" - name: Compare scrape memory metrics with main branch if: github.event_name == 'pull_request' @@ -71,4 +66,4 @@ jobs: --github-actions '${{ secrets.GITHUB_TOKEN }}' \ --ci-only-thresholds \ --ci-id '${{ github.workflow_ref }}' \ - --file lustrefs-exporter/scrape_allocations_results.json + "cargo bench --bench scrape_memory_metrics" From a0d30831164b27e10caac7a79fcd35b54f2c956a Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 16:50:15 +0530 Subject: [PATCH 09/13] Keep stdout clear --- lustrefs-exporter/benches/common/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lustrefs-exporter/benches/common/mod.rs b/lustrefs-exporter/benches/common/mod.rs index 99123901..71452688 100644 --- a/lustrefs-exporter/benches/common/mod.rs +++ b/lustrefs-exporter/benches/common/mod.rs @@ -53,7 +53,7 @@ pub async fn load_test_concurrent(concurrency: usize, total_requests: usize) -> let elapsed = start.elapsed(); - println!( + eprintln!( "Load test completed: {successful_requests} successful, {failed_requests} failed requests in {elapsed:?}", ); From 71b699137f2e07acc29c203199e0614b3e0ccc8c Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 17:08:34 +0530 Subject: [PATCH 10/13] Output MiBs again --- memory-benchmarking/src/lib.rs | 40 ++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index 296d8cf0..cbf60e78 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -32,6 +32,18 @@ struct MetricEntry { upper_value: Option, } +static MIB: f64 = 1024f64 * 1024f64; + +impl MetricEntry { + fn bytes_as_mib(self) -> Self { + MetricEntry { + value: self.value / MIB, + lower_value: self.lower_value.map(|v| v / MIB), + upper_value: self.upper_value.map(|v| v / MIB), + } + } +} + #[derive(Clone, Debug, serde::Serialize)] pub struct MemoryUsage { pub start_rss: f64, @@ -82,18 +94,22 @@ impl From<&[MemoryUsage]> for BencherOutput { fn from(samples: &[MemoryUsage]) -> Self { BencherOutput { memory_usage: BencherMetrics { - start_rss_mib: samples.get_stats(|x| x.start_rss), - peak_rss_mib: samples.get_stats(|x| x.max_rss), - end_rss_mib: samples.get_stats(|x| x.end_rss), - memory_growth_mib: samples.get_stats(|x| x.memory_growth()), - peak_over_start_rss_ratio: samples.get_stats(|x| x.peak_over_start_rss()), - avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss), - start_virtual_mib: samples.get_stats(|x| x.start_virtual), - peak_virtual_mib: samples.get_stats(|x| x.max_virtual), - end_virtual_mib: samples.get_stats(|x| x.end_virtual), - virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()), - peak_over_start_virtual_ratio: samples.get_stats(|x| x.peak_over_start_virtual()), - avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual), + start_rss_mib: samples.get_stats(|x| x.start_rss).bytes_as_mib(), + peak_rss_mib: samples.get_stats(|x| x.max_rss).bytes_as_mib(), + end_rss_mib: samples.get_stats(|x| x.end_rss).bytes_as_mib(), + memory_growth_mib: samples.get_stats(|x| x.memory_growth()).bytes_as_mib(), + peak_over_start_rss_ratio: samples + .get_stats(|x| x.peak_over_start_rss()) + .bytes_as_mib(), + avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss).bytes_as_mib(), + start_virtual_mib: samples.get_stats(|x| x.start_virtual).bytes_as_mib(), + peak_virtual_mib: samples.get_stats(|x| x.max_virtual).bytes_as_mib(), + end_virtual_mib: samples.get_stats(|x| x.end_virtual).bytes_as_mib(), + virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()).bytes_as_mib(), + peak_over_start_virtual_ratio: samples + .get_stats(|x| x.peak_over_start_virtual()) + .bytes_as_mib(), + avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual).bytes_as_mib(), }, } } From 5088f4bfd00653d48a99830e919740a0745915a2 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 17:32:36 +0530 Subject: [PATCH 11/13] empty From 7848e6db7c0bdeab10d02a1ecc61b6aaeaed3d44 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Tue, 23 Sep 2025 17:53:30 +0530 Subject: [PATCH 12/13] Rename benchmarks --- lustre-collector/benches/combine_memory.rs | 11 ++--- .../benches/scrape_memory_metrics.rs | 11 +++-- .../testcmds/cmds_load_test_concurrent.json | 0 memory-benchmarking/src/lib.rs | 45 ++++++++----------- 4 files changed, 32 insertions(+), 35 deletions(-) create mode 100644 lustrefs-exporter/testcmds/cmds_load_test_concurrent.json diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs index aa380ca0..fe05be98 100644 --- a/lustre-collector/benches/combine_memory.rs +++ b/lustre-collector/benches/combine_memory.rs @@ -1,7 +1,7 @@ use combine::parser::EasyParser; use lustre_collector::quota::parse as combine_parse; -use memory_benchmarking::{BencherOutput, trace_memory}; -use std::time::Duration; +use memory_benchmarking::{MemoryMetrics, trace_memory}; +use std::{collections::HashMap, time::Duration}; pub fn main() { let buffer = std::fs::read_to_string("benches/quotas.yml").expect("Failed to read file"); @@ -24,10 +24,11 @@ pub fn main() { }) .collect(); - let bencher_output: BencherOutput = samples.as_slice().into(); + let memory_usage: MemoryMetrics = samples.as_slice().into(); - let serialized_metrics = serde_json::to_string_pretty(&bencher_output) - .expect("Failed to serialize benchmark output."); + let serialized_metrics = + serde_json::to_string_pretty(&HashMap::from([("quota_parsing", memory_usage)])) + .expect("Failed to serialize benchmark output."); println!("{serialized_metrics}"); } diff --git a/lustrefs-exporter/benches/scrape_memory_metrics.rs b/lustrefs-exporter/benches/scrape_memory_metrics.rs index 00cf7ee7..2ad7579e 100644 --- a/lustrefs-exporter/benches/scrape_memory_metrics.rs +++ b/lustrefs-exporter/benches/scrape_memory_metrics.rs @@ -5,8 +5,8 @@ mod common; use common::load_test_concurrent; -use memory_benchmarking::{BencherOutput, trace_memory_async}; -use std::time::Duration; +use memory_benchmarking::{MemoryMetrics, trace_memory_async}; +use std::{collections::HashMap, time::Duration}; pub fn main() { let samples: Vec<_> = (0..10) @@ -32,8 +32,11 @@ pub fn main() { }) .collect(); - let serialized_metrics = serde_json::to_string_pretty(&BencherOutput::from(samples.as_slice())) - .expect("Failed to serialize benchmark output."); + let serialized_metrics = serde_json::to_string_pretty(&HashMap::from([( + "scrape_memory_usage", + MemoryMetrics::from(samples.as_slice()), + )])) + .expect("Failed to serialize benchmark output."); println!("{serialized_metrics}"); } diff --git a/lustrefs-exporter/testcmds/cmds_load_test_concurrent.json b/lustrefs-exporter/testcmds/cmds_load_test_concurrent.json new file mode 100644 index 00000000..e69de29b diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs index cbf60e78..7a138cb6 100644 --- a/memory-benchmarking/src/lib.rs +++ b/memory-benchmarking/src/lib.rs @@ -3,12 +3,7 @@ use std::time::Duration; use sysinfo::{Pid, ProcessExt, System, SystemExt}; #[derive(serde::Serialize)] -pub struct BencherOutput { - memory_usage: BencherMetrics, -} - -#[derive(serde::Serialize)] -struct BencherMetrics { +pub struct MemoryMetrics { start_rss_mib: MetricEntry, peak_rss_mib: MetricEntry, end_rss_mib: MetricEntry, @@ -90,27 +85,25 @@ impl GetStatsExt for &[MemoryUsage] { } } -impl From<&[MemoryUsage]> for BencherOutput { +impl From<&[MemoryUsage]> for MemoryMetrics { fn from(samples: &[MemoryUsage]) -> Self { - BencherOutput { - memory_usage: BencherMetrics { - start_rss_mib: samples.get_stats(|x| x.start_rss).bytes_as_mib(), - peak_rss_mib: samples.get_stats(|x| x.max_rss).bytes_as_mib(), - end_rss_mib: samples.get_stats(|x| x.end_rss).bytes_as_mib(), - memory_growth_mib: samples.get_stats(|x| x.memory_growth()).bytes_as_mib(), - peak_over_start_rss_ratio: samples - .get_stats(|x| x.peak_over_start_rss()) - .bytes_as_mib(), - avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss).bytes_as_mib(), - start_virtual_mib: samples.get_stats(|x| x.start_virtual).bytes_as_mib(), - peak_virtual_mib: samples.get_stats(|x| x.max_virtual).bytes_as_mib(), - end_virtual_mib: samples.get_stats(|x| x.end_virtual).bytes_as_mib(), - virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()).bytes_as_mib(), - peak_over_start_virtual_ratio: samples - .get_stats(|x| x.peak_over_start_virtual()) - .bytes_as_mib(), - avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual).bytes_as_mib(), - }, + MemoryMetrics { + start_rss_mib: samples.get_stats(|x| x.start_rss).bytes_as_mib(), + peak_rss_mib: samples.get_stats(|x| x.max_rss).bytes_as_mib(), + end_rss_mib: samples.get_stats(|x| x.end_rss).bytes_as_mib(), + memory_growth_mib: samples.get_stats(|x| x.memory_growth()).bytes_as_mib(), + peak_over_start_rss_ratio: samples + .get_stats(|x| x.peak_over_start_rss()) + .bytes_as_mib(), + avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss).bytes_as_mib(), + start_virtual_mib: samples.get_stats(|x| x.start_virtual).bytes_as_mib(), + peak_virtual_mib: samples.get_stats(|x| x.max_virtual).bytes_as_mib(), + end_virtual_mib: samples.get_stats(|x| x.end_virtual).bytes_as_mib(), + virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()).bytes_as_mib(), + peak_over_start_virtual_ratio: samples + .get_stats(|x| x.peak_over_start_virtual()) + .bytes_as_mib(), + avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual).bytes_as_mib(), } } } From 77a733541c5f1594945eabbc9994ce66f759dec3 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste Skutnik Date: Thu, 25 Sep 2025 17:04:06 +0530 Subject: [PATCH 13/13] Use divan --- Cargo.lock | 38 ++++ lustrefs-exporter/Cargo.toml | 1 + lustrefs-exporter/benches/common/mod.rs | 10 +- .../benches/scrape_memory_metrics.rs | 164 ++++++++++++++---- 4 files changed, 175 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1a1b7d4b..39616b3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -455,6 +455,12 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2957e823c15bde7ecf1e8b64e537aa03a6be5fda0e2334e99887669e75b12e01" +[[package]] +name = "condtype" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf0a07a401f374238ab8e2f11a104d2851bf9ce711ec69804834de8af45c7af" + [[package]] name = "console" version = "0.15.11" @@ -614,6 +620,31 @@ dependencies = [ "syn", ] +[[package]] +name = "divan" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a405457ec78b8fe08b0e32b4a3570ab5dff6dd16eb9e76a5ee0a9d9cbd898933" +dependencies = [ + "cfg-if", + "clap", + "condtype", + "divan-macros", + "libc", + "regex-lite", +] + +[[package]] +name = "divan-macros" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9556bc800956545d6420a640173e5ba7dfa82f38d3ea5a167eb555bc69ac3323" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "dtoa" version = "1.0.10" @@ -1378,6 +1409,7 @@ dependencies = [ "commandeer-test", "const_format", "criterion", + "divan", "iai-callgrind", "insta", "lustre_collector", @@ -1812,6 +1844,12 @@ dependencies = [ "regex-syntax 0.8.6", ] +[[package]] +name = "regex-lite" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "943f41321c63ef1c92fd763bfe054d2668f7f225a5c29f0105903dc2fc04ba30" + [[package]] name = "regex-syntax" version = "0.6.29" diff --git a/lustrefs-exporter/Cargo.toml b/lustrefs-exporter/Cargo.toml index 402455a9..f96f6186 100644 --- a/lustrefs-exporter/Cargo.toml +++ b/lustrefs-exporter/Cargo.toml @@ -7,6 +7,7 @@ version.workspace = true [dependencies] axum = { workspace = true, features = ["http2"] } clap = { workspace = true, features = ["derive", "env", "wrap_help", "string"] } +divan = "0.1.21" lustre_collector.path = "../lustre-collector" memory-benchmarking = { version = "0.11.0", path = "../memory-benchmarking" } prometheus-client.workspace = true diff --git a/lustrefs-exporter/benches/common/mod.rs b/lustrefs-exporter/benches/common/mod.rs index 71452688..841c3f2f 100644 --- a/lustrefs-exporter/benches/common/mod.rs +++ b/lustrefs-exporter/benches/common/mod.rs @@ -19,7 +19,7 @@ async fn make_single_request() -> Result Duration { let start = Instant::now(); @@ -51,11 +51,5 @@ pub async fn load_test_concurrent(concurrency: usize, total_requests: usize) -> } } - let elapsed = start.elapsed(); - - eprintln!( - "Load test completed: {successful_requests} successful, {failed_requests} failed requests in {elapsed:?}", - ); - - elapsed + start.elapsed() } diff --git a/lustrefs-exporter/benches/scrape_memory_metrics.rs b/lustrefs-exporter/benches/scrape_memory_metrics.rs index 2ad7579e..15064087 100644 --- a/lustrefs-exporter/benches/scrape_memory_metrics.rs +++ b/lustrefs-exporter/benches/scrape_memory_metrics.rs @@ -4,39 +4,143 @@ mod common; +use axum::{ + Router, + body::Body, + extract::Query, + http::{Response, StatusCode, header::CONTENT_TYPE}, +}; use common::load_test_concurrent; -use memory_benchmarking::{MemoryMetrics, trace_memory_async}; -use std::{collections::HashMap, time::Duration}; - -pub fn main() { - let samples: Vec<_> = (0..10) - .map(|_| { - trace_memory_async( - || async { - let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345)) - .await - .expect("Failed to bind to port 12345"); - - axum::serve(listener, lustrefs_exporter::routes::app()) - .await - .expect("Failed to serve app."); - }, - || async { - load_test_concurrent(10, 60).await; - }, - Duration::from_millis(10), +use lustrefs_exporter::Error; +use memory_benchmarking::trace_memory_async; +use std::time::Duration; +use tokio::task::JoinSet; +use tower::ServiceBuilder; +use tower_http::compression::CompressionLayer; + +use divan::AllocProfiler; + +#[global_allocator] +static ALLOC: AllocProfiler = AllocProfiler::system(); + +#[divan::bench] +pub fn scrape() { + trace_memory_async( + || async { + let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345)) + .await + .expect("Failed to bind to port 12345"); + + axum::serve(listener, lustrefs_exporter::routes::app()) + .await + .expect("Failed to serve app."); + }, + || async { + load_test_concurrent(10, 60).await; + }, + Duration::from_millis(10), + ) + .as_slice(); +} + +#[divan::bench] +pub fn test_vec() { + let test: Vec = Vec::with_capacity(1024 * 1024 * 1024); +} + +#[divan::bench] +pub fn test_vec_async() { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .expect("Failed to build tokio runtime") + .block_on(async move { + let test: Vec = Vec::with_capacity(1024 * 1024 * 1024); + }) +} + +#[divan::bench] +pub fn test_vec_async_join() { + tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .expect("Failed to build tokio runtime") + .block_on(async move { + let mut join_set = JoinSet::new(); + for _ in 1..10 { + join_set.spawn(async move { + let test: Vec = Vec::with_capacity(1024 * 1024 * 1024); + }); + } + + join_set.join_all().await; + }) +} + +#[divan::bench] +pub fn test_vec_axum() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .enable_io() + .build() + .expect("Failed to build tokio runtime"); + + pub async fn scrape(Query(_): Query<()>) -> Result, Error> { + let test: Vec = Vec::with_capacity(1024 * 1024 * 1024); + + let resp = Response::builder() + .status(StatusCode::OK) + .header( + CONTENT_TYPE, + "application/openmetrics-text; version=1.0.0; charset=utf-8", ) - .as_slice() - .try_into() - .expect("Failed to extract memory usage from samples") + .body(Body::from("hello"))?; + + Ok(resp) + } + + let (kill_s, kill_r) = tokio::sync::oneshot::channel(); + + let server = async move { + let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345)) + .await + .expect("Failed to bind to port 12345"); + + axum::serve(listener, { + let load_shedder = ServiceBuilder::new().layer(CompressionLayer::new()); + + Router::new() + .route("/", axum::routing::get(scrape)) + .layer(load_shedder) + }) + .with_graceful_shutdown(async move { + kill_r.await.ok(); }) - .collect(); + .await + .expect("Failed to serve app."); + }; - let serialized_metrics = serde_json::to_string_pretty(&HashMap::from([( - "scrape_memory_usage", - MemoryMetrics::from(samples.as_slice()), - )])) - .expect("Failed to serialize benchmark output."); + let request = async move { + reqwest::get("http://localhost:12345/") + .await + .expect("Request failed") + .text() + .await + .expect("Parsing response failed"); + + kill_s.send(()).ok(); + }; + + rt.block_on(async move { + let mut join_set = JoinSet::new(); + join_set.spawn(request); + join_set.spawn(server); + join_set.join_all().await + }); +} - println!("{serialized_metrics}"); +fn main() { + divan::main() }