diff --git a/.github/workflows/lustre-metrics-bench.yml b/.github/workflows/lustre-metrics-bench.yml
index cd46f092..3bfc10a8 100644
--- a/.github/workflows/lustre-metrics-bench.yml
+++ b/.github/workflows/lustre-metrics-bench.yml
@@ -59,6 +59,7 @@ jobs:
           --err \
           --adapter rust_iai_callgrind \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
+          --ci-only-thresholds \
           "cargo bench --bench lustre_metrics"
 
     - name: Compare Lustre Metrics Benchmarks with main branch
@@ -74,4 +75,5 @@
           --err \
           --adapter rust_iai_callgrind \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
+          --ci-only-thresholds \
           "cargo bench --bench lustre_metrics"
diff --git a/.github/workflows/quota-parsing.yml b/.github/workflows/quota-parsing.yml
index 73288ac4..24e59c6f 100644
--- a/.github/workflows/quota-parsing.yml
+++ b/.github/workflows/quota-parsing.yml
@@ -29,7 +29,7 @@ jobs:
           --token '${{ secrets.BENCHER_API_TOKEN }}' \
           --branch main \
           --testbed ci-runner \
-          --threshold-measure Latency \
+          --threshold-measure latency \
           --threshold-test t_test \
           --threshold-max-sample-size 64 \
           --threshold-lower-boundary 0.95 \
@@ -37,6 +37,8 @@
           --err \
           --adapter rust_criterion \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}-performance' \
           "cargo bench --bench combine_performance"
 
     - name: Compare quota parsing metrics benchmarks with main branch
@@ -52,13 +54,10 @@
           --err \
           --adapter rust_criterion \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}-performance' \
           "cargo bench --bench combine_performance"
 
-    - name: Run quota parsing memory usage benchmark
-      run: |
-        # Run the benchmark first (suppress Criterion output)
-        cargo bench --bench combine_memory 2>&1 > /dev/null
-
     - name: Track quota parsing memory usage benchmark on main branch
       if: github.ref == 'refs/heads/main' && github.event_name == 'push'
       run: |
@@ -67,7 +66,7 @@
           --token '${{ secrets.BENCHER_API_TOKEN }}' \
           --branch main \
           --testbed ci-runner \
-          --threshold-measure peak_rss_mib \
+          --threshold-measure start_rss_mib \
           --threshold-test t_test \
           --threshold-max-sample-size 64 \
           --threshold-lower-boundary 0.95 \
@@ -75,7 +74,9 @@
           --err \
           --adapter json \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
-          --file lustre-collector/combine_mem_usage.json
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}-memory' \
+          "cargo bench --bench combine_memory"
 
     - name: Compare quota parsing memory metrics with main branch
       if: github.event_name == 'pull_request'
@@ -90,4 +91,6 @@
           --err \
           --adapter json \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
-          --file lustre-collector/combine_mem_usage.json
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}-memory' \
+          "cargo bench --bench combine_memory"
diff --git a/.github/workflows/scrape-memory-metrics-bench.yml b/.github/workflows/scrape-memory-metrics-bench.yml
index 920f31f0..bbee21df 100644
--- a/.github/workflows/scrape-memory-metrics-bench.yml
+++ b/.github/workflows/scrape-memory-metrics-bench.yml
@@ -31,11 +31,6 @@ jobs:
             target/
           key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
 
-    - name: Run Scrape Memory Metrics Benchmark
-      run: |
-        # Run the benchmark first (suppress Criterion output)
-        cargo bench --bench scrape_memory_metrics 2>&1 > /dev/null
-
     - name: Track scrape memory metrics benchmark on main branch
       if: github.ref == 'refs/heads/main' && github.event_name == 'push'
       run: |
@@ -52,7 +47,9 @@
           --err \
           --adapter json \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
-          --file lustrefs-exporter/scrape_allocations_results.json
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}' \
+          "cargo bench --bench scrape_memory_metrics"
 
     - name: Compare scrape memory metrics with main branch
       if: github.event_name == 'pull_request'
@@ -67,4 +64,6 @@
           --err \
           --adapter json \
           --github-actions '${{ secrets.GITHUB_TOKEN }}' \
-          --file lustrefs-exporter/scrape_allocations_results.json
+          --ci-only-thresholds \
+          --ci-id '${{ github.workflow_ref }}' \
+          "cargo bench --bench scrape_memory_metrics"
diff --git a/Cargo.lock b/Cargo.lock
index 2e72e1ba..39616b3c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -455,6 +455,12 @@ version = "0.4.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2957e823c15bde7ecf1e8b64e537aa03a6be5fda0e2334e99887669e75b12e01"
 
+[[package]]
+name = "condtype"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "baf0a07a401f374238ab8e2f11a104d2851bf9ce711ec69804834de8af45c7af"
+
 [[package]]
 name = "console"
 version = "0.15.11"
@@ -614,6 +620,31 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "divan"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a405457ec78b8fe08b0e32b4a3570ab5dff6dd16eb9e76a5ee0a9d9cbd898933"
+dependencies = [
+ "cfg-if",
+ "clap",
+ "condtype",
+ "divan-macros",
+ "libc",
+ "regex-lite",
+]
+
+[[package]]
+name = "divan-macros"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9556bc800956545d6420a640173e5ba7dfa82f38d3ea5a167eb555bc69ac3323"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "dtoa"
 version = "1.0.10"
@@ -1357,6 +1388,7 @@ dependencies = [
  "criterion",
  "include_dir",
  "insta",
+ "memory-benchmarking",
  "serde",
  "serde_json",
  "serde_yaml",
@@ -1377,9 +1409,11 @@ dependencies = [
  "commandeer-test",
  "const_format",
  "criterion",
+ "divan",
  "iai-callgrind",
  "insta",
  "lustre_collector",
+ "memory-benchmarking",
  "pretty_assertions",
  "prometheus-client",
  "prometheus-parse",
@@ -1418,6 +1452,16 @@ version = "2.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0"
 
+[[package]]
+name = "memory-benchmarking"
+version = "0.11.0"
+dependencies = [
+ "serde",
+ "sysinfo",
+ "thiserror",
+ "tokio",
+]
+
 [[package]]
 name = "mime"
 version = "0.3.17"
@@ -1800,6 +1844,12 @@ dependencies = [
  "regex-syntax 0.8.6",
 ]
 
+[[package]]
+name = "regex-lite"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "943f41321c63ef1c92fd763bfe054d2668f7f225a5c29f0105903dc2fc04ba30"
+
 [[package]]
 name = "regex-syntax"
 version = "0.6.29"
diff --git a/Cargo.toml b/Cargo.toml
index bf40bdb3..d5ffef9a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,5 +1,5 @@
 [workspace]
-members = ['lustre-collector', 'lustrefs-exporter']
+members = ['lustre-collector', 'lustrefs-exporter', "memory-benchmarking"]
 resolver = "3"
 
 [workspace.package]
diff --git a/lustre-collector/Cargo.toml b/lustre-collector/Cargo.toml
index ec3e4844..6df3572f 100644
--- a/lustre-collector/Cargo.toml
+++ b/lustre-collector/Cargo.toml
@@ -9,6 +9,7 @@ version.workspace = true
 [dependencies]
 clap = { workspace = true, features = ["derive"] }
 combine.workspace = true
+memory-benchmarking = { version = "0.11.0", path = "../memory-benchmarking" }
 serde = { workspace = true, features = ["derive"] }
 serde_json.workspace = true
 serde_yaml.workspace = true
diff --git a/lustre-collector/benches/combine_memory.rs b/lustre-collector/benches/combine_memory.rs
index 12120725..fe05be98 100644
--- a/lustre-collector/benches/combine_memory.rs
+++ b/lustre-collector/benches/combine_memory.rs
@@ -1,321 +1,34 @@
 use combine::parser::EasyParser;
-use criterion::{Criterion, criterion_group, criterion_main};
 use lustre_collector::quota::parse as combine_parse;
-use std::{fs::File, io::Read, time::Duration};
-use sysinfo::{Pid, ProcessExt, System, SystemExt};
-
-#[derive(serde::Serialize)]
-struct BencherOutput {
-    memory_usage: BencherMetrics,
-}
-
-#[derive(serde::Serialize)]
-struct BencherMetrics {
-    start_rss_mib: MetricEntry,
-    peak_rss_mib: MetricEntry,
-    end_rss_mib: MetricEntry,
-    memory_growth_mib: MetricEntry,
-    peak_over_start_rss_ratio: MetricEntry,
-    avg_runtime_rss_mib: MetricEntry,
-    start_virtual_mib: MetricEntry,
-    peak_virtual_mib: MetricEntry,
-    end_virtual_mib: MetricEntry,
-    virtual_growth_mib: MetricEntry,
-    peak_over_start_virtual_ratio: MetricEntry,
-    avg_runtime_virtual_mib: MetricEntry,
-}
-
-#[derive(serde::Serialize)]
-struct MetricEntry {
-    value: f64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    lower_value: Option<f64>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    upper_value: Option<f64>,
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-struct MemoryUsage {
-    start_rss: f64,
-    end_rss: f64,
-    memory_growth: f64,
-    peak_over_start_rss_ratio: f64,
-    avg_rss: f64,
-    min_rss: f64,
-    max_rss: f64,
-    start_virtual: f64,
-    end_virtual: f64,
-    virtual_growth: f64,
-    peak_over_start_virtual_ratio: f64,
-    avg_virtual: f64,
-    min_virtual: f64,
-    max_virtual: f64,
-}
-
-impl From<MemoryUsage> for BencherOutput {
-    fn from(x: MemoryUsage) -> Self {
-        BencherOutput {
-            memory_usage: BencherMetrics {
-                start_rss_mib: MetricEntry {
-                    value: x.start_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_rss_mib: MetricEntry {
-                    value: x.max_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                end_rss_mib: MetricEntry {
-                    value: x.end_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                memory_growth_mib: MetricEntry {
-                    value: x.memory_growth,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_over_start_rss_ratio: MetricEntry {
-                    value: x.peak_over_start_rss_ratio,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                avg_runtime_rss_mib: MetricEntry {
-                    value: x.avg_rss,
-                    lower_value: Some(x.min_rss),
-                    upper_value: Some(x.max_rss),
-                },
-                start_virtual_mib: MetricEntry {
-                    value: x.start_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_virtual_mib: MetricEntry {
-                    value: x.max_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                end_virtual_mib: MetricEntry {
-                    value: x.end_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                virtual_growth_mib: MetricEntry {
-                    value: x.virtual_growth,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_over_start_virtual_ratio: MetricEntry {
-                    value: x.peak_over_start_virtual_ratio,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                avg_runtime_virtual_mib: MetricEntry {
-                    value: x.avg_virtual,
-                    lower_value: Some(x.min_virtual),
-                    upper_value: Some(x.max_virtual),
-                },
-            },
-        }
-    }
-}
-
-fn get_memory_stats() -> (f64, f64) {
-    let mut system = System::new();
-    system.refresh_process(Pid::from(std::process::id() as usize));
-
-    if let Some(process) = system.process(Pid::from(std::process::id() as usize)) {
-        (process.memory() as f64, process.virtual_memory() as f64)
-    } else {
-        (0.0, 0.0)
-    }
-}
-
-async fn test_combine_with_mem(buffer: &str) -> MemoryUsage {
-    let (start_rss, start_virtual) = get_memory_stats();
-    let mut rss_values = vec![];
-    let mut virtual_values = vec![];
-
-    rss_values.push(start_rss);
-    virtual_values.push(start_virtual);
-
-    let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
-    let monitor_handle = tokio::spawn(async move {
-        let mut min_rss = start_rss;
-        let mut peak_rss = start_rss;
-        let mut min_virtual = start_virtual;
-        let mut peak_virtual = start_virtual;
-        let mut interval = tokio::time::interval(Duration::from_millis(100));
-
-        while rx.try_recv().is_err() {
-            interval.tick().await;
-
-            let (current_rss, current_virtual) = get_memory_stats();
-
-            rss_values.push(current_rss);
-            min_rss = min_rss.min(current_rss);
-            peak_rss = peak_rss.max(current_rss);
-
-            virtual_values.push(current_virtual);
-            min_virtual = min_virtual.min(current_virtual);
-            peak_virtual = peak_virtual.max(current_virtual);
-        }
-
-        (
-            rss_values,
-            min_rss,
-            peak_rss,
-            virtual_values,
-            min_virtual,
-            peak_virtual,
-        )
-    });
-
-    let mut needle = buffer;
-    while let Ok((_, e)) = combine_parse().easy_parse(needle) {
-        needle = e;
-    }
-
-    tx.send(())
-        .expect("Failed to send stop signal to memory monitor");
-
-    let (rss_values, mut min_rss, mut max_rss, virtual_values, mut min_virtual, mut max_virtual) =
-        monitor_handle
-            .await
-            .expect("Failed to collect memory metrics from run.");
-
-    let (end_rss, end_virtual) = get_memory_stats();
-
-    min_rss = min_rss.min(end_rss);
-    max_rss = max_rss.max(end_rss);
-
-    min_virtual = min_virtual.min(end_virtual);
-    max_virtual = max_virtual.max(end_virtual);
-
-    MemoryUsage {
-        start_rss,
-        end_rss,
-        memory_growth: end_rss - start_rss,
-        peak_over_start_rss_ratio: max_rss / start_rss,
-        avg_rss: rss_values.iter().sum::<f64>() / rss_values.len() as f64,
-        min_rss,
-        max_rss,
-        start_virtual,
-        end_virtual,
-        virtual_growth: end_virtual - start_virtual,
-        peak_over_start_virtual_ratio: max_virtual / start_virtual,
-        avg_virtual: virtual_values.iter().sum::<f64>() / virtual_values.len() as f64,
-        min_virtual,
-        max_virtual,
-    }
-}
-
-pub fn combine_memory(c: &mut Criterion) {
-    let (tx, rx) = std::sync::mpsc::channel::<MemoryUsage>();
-
-    let rt = tokio::runtime::Builder::new_multi_thread()
-        .enable_time()
-        .enable_io()
-        .build()
-        .expect("Failed to build tokio runtime");
-
-    let mut group = c.benchmark_group("parse_benchmarks");
-
-    group.sample_size(10);
-    group.measurement_time(Duration::from_secs(90)); // Allow more time
-
-    let mut raw = String::new();
-    File::open("benches/quotas.yml")
-        .expect("Failed to open file")
-        .read_to_string(&mut raw)
-        .expect("Failed to read file");
-
-    group.bench_with_input("combine_memory", &raw, |b, input| {
-        b.to_async(&rt).iter(|| async {
-            let memory_usage = test_combine_with_mem(input).await;
-
-            let _ = tx.send(memory_usage.clone());
+use memory_benchmarking::{MemoryMetrics, trace_memory};
+use std::{collections::HashMap, time::Duration};
+
+pub fn main() {
+    let buffer = std::fs::read_to_string("benches/quotas.yml").expect("Failed to read file");
+    let needle = buffer.as_str();
+
+    let samples: Vec<_> = (0..100)
+        .map(|_| {
+            trace_memory(
+                move || {
+                    let mut needle = needle;
+                    while let Ok((_, e)) = combine_parse().easy_parse(needle) {
+                        needle = e;
+                    }
+                },
+                Duration::from_millis(10),
+            )
+            .as_slice()
+            .try_into()
+            .expect("Failed to extract memory usage from samples")
         })
-    });
-
-    group.finish();
-
-    drop(tx);
-
-    let samples = rx.iter().collect::<Vec<_>>();
+        .collect();
 
-    if !samples.is_empty() {
-        let aggregated = aggregate_samples(&samples);
+    let memory_usage: MemoryMetrics = samples.as_slice().into();
 
-        let bencher_output: BencherOutput = aggregated.into();
-
-        let serialized_metrics = serde_json::to_string_pretty(&bencher_output)
+    let serialized_metrics =
+        serde_json::to_string_pretty(&HashMap::from([("quota_parsing", memory_usage)]))
             .expect("Failed to serialize benchmark output.");
 
-        let output = "combine_mem_usage.json";
-        std::fs::write(output, serialized_metrics).expect("Failed to write benchmark results");
-
-        println!("✅ Bencher results written to {output}");
-    }
+    println!("{serialized_metrics}");
 }
-
-fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage {
-    let size = samples.len() as f64;
-    const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0;
-
-    MemoryUsage {
-        start_rss: samples.iter().map(|x| x.start_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        end_rss: samples.iter().map(|x| x.end_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        memory_growth: samples.iter().map(|x| x.memory_growth).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        peak_over_start_rss_ratio: samples
-            .iter()
-            .map(|x| x.peak_over_start_rss_ratio)
-            .sum::<f64>()
-            / size,
-        avg_rss: samples.iter().map(|x| x.avg_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        min_rss: samples
-            .iter()
-            .map(|x| x.min_rss)
-            .fold(f64::INFINITY, f64::min)
-            / MIB_CONVERSION_FACTOR,
-        max_rss: samples
-            .iter()
-            .map(|x| x.max_rss)
-            .fold(f64::NEG_INFINITY, f64::max)
-            / MIB_CONVERSION_FACTOR,
-
-        start_virtual: samples.iter().map(|x| x.start_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        end_virtual: samples.iter().map(|x| x.end_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        peak_over_start_virtual_ratio: samples
-            .iter()
-            .map(|x| x.peak_over_start_virtual_ratio)
-            .sum::<f64>()
-            / size,
-        avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        min_virtual: samples
-            .iter()
-            .map(|x| x.min_virtual)
-            .fold(f64::INFINITY, f64::min)
-            / MIB_CONVERSION_FACTOR,
-        max_virtual: samples
-            .iter()
-            .map(|x| x.max_virtual)
-            .fold(f64::NEG_INFINITY, f64::max)
-            / MIB_CONVERSION_FACTOR,
-    }
-}
-
-criterion_group!(benches, combine_memory);
-criterion_main!(benches);
diff --git a/lustre-collector/combine_mem_usage.json b/lustre-collector/combine_mem_usage.json
deleted file mode 100644
index b80b60f5..00000000
--- a/lustre-collector/combine_mem_usage.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
-  "scrape_allocations": {
-    "start_rss_mib": {
-      "value": 318.8093196902655
-    },
-    "peak_rss_mib": {
-      "value": 359.84375
-    },
-    "end_rss_mib": {
-      "value": 320.30074668141594
-    },
-    "memory_growth_mib": {
-      "value": 1.4914269911504425
-    },
-    "peak_over_start_rss_ratio": {
-      "value": 1.0788454764864892
-    },
-    "avg_runtime_rss_mib": {
-      "value": 319.2835635533252,
-      "lower_value": 24.296875,
-      "upper_value": 359.84375
-    },
-    "start_virtual_mib": {
-      "value": 402485.0271017699
-    },
-    "peak_virtual_mib": {
-      "value": 402526.0625
-    },
-    "end_virtual_mib": {
-      "value": 402497.05365044245
-    },
-    "virtual_growth_mib": {
-      "value": 12.026548672566372
-    },
-    "peak_over_start_virtual_ratio": {
-      "value": 1.0000299443068315
-    },
-    "avg_runtime_virtual_mib": {
-      "value": 402490.81627407606,
-      "lower_value": 401167.0625,
-      "upper_value": 402526.0625
-    }
-  }
-}
\ No newline at end of file
diff --git a/lustrefs-exporter/Cargo.toml b/lustrefs-exporter/Cargo.toml
index 08a28a91..f96f6186 100644
--- a/lustrefs-exporter/Cargo.toml
+++ b/lustrefs-exporter/Cargo.toml
@@ -7,7 +7,9 @@ version.workspace = true
 [dependencies]
 axum = { workspace = true, features = ["http2"] }
 clap = { workspace = true, features = ["derive", "env", "wrap_help", "string"] }
+divan = "0.1.21"
 lustre_collector.path = "../lustre-collector"
+memory-benchmarking = { version = "0.11.0", path = "../memory-benchmarking" }
 prometheus-client.workspace = true
 regex = { version = "1", default-features = false, features = [
     "perf",
diff --git a/lustrefs-exporter/benches/common/mod.rs b/lustrefs-exporter/benches/common/mod.rs
index ec723258..841c3f2f 100644
--- a/lustrefs-exporter/benches/common/mod.rs
+++ b/lustrefs-exporter/benches/common/mod.rs
@@ -2,6 +2,7 @@
 // Use of this source code is governed by a MIT-style
 // license that can be found in the LICENSE file.
 
+use commandeer_test::commandeer;
 use std::time::Duration;
 use tokio::{task::JoinSet, time::Instant};
 
@@ -18,6 +19,7 @@ async fn make_single_request() -> Result<reqwest::StatusCode, reqwest::Error> {
 }
 
+#[commandeer(Replay, "lctl", "lnetctl")]
 pub async fn load_test_concurrent(concurrency: usize, total_requests: usize) -> Duration {
     let start = Instant::now();
 
@@ -49,11 +51,5 @@
         }
     }
 
-    let elapsed = start.elapsed();
-
-    println!(
-        "Load test completed: {successful_requests} successful, {failed_requests} failed requests in {elapsed:?}",
-    );
-
-    elapsed
+    start.elapsed()
 }
diff --git a/lustrefs-exporter/benches/scrape_memory_metrics.rs b/lustrefs-exporter/benches/scrape_memory_metrics.rs
index cab94b77..15064087 100644
--- a/lustrefs-exporter/benches/scrape_memory_metrics.rs
+++ b/lustrefs-exporter/benches/scrape_memory_metrics.rs
@@ -4,337 +4,143 @@
 
 mod common;
 
-use commandeer_test::commandeer;
+use axum::{
+    Router,
+    body::Body,
+    extract::Query,
+    http::{Response, StatusCode, header::CONTENT_TYPE},
+};
 use common::load_test_concurrent;
-use core::f64;
-use criterion::{Criterion, criterion_group, criterion_main};
+use lustrefs_exporter::Error;
+use memory_benchmarking::trace_memory_async;
 use std::time::Duration;
-use sysinfo::{Pid, ProcessExt, System, SystemExt};
-
-#[derive(serde::Serialize)]
-struct BencherOutput {
-    scrape_allocations: BencherMetrics,
-}
-
-#[derive(serde::Serialize)]
-struct BencherMetrics {
-    start_rss_mib: MetricEntry,
-    peak_rss_mib: MetricEntry,
-    end_rss_mib: MetricEntry,
-    memory_growth_mib: MetricEntry,
-    peak_over_start_rss_ratio: MetricEntry,
-    avg_runtime_rss_mib: MetricEntry,
-    start_virtual_mib: MetricEntry,
-    peak_virtual_mib: MetricEntry,
-    end_virtual_mib: MetricEntry,
-    virtual_growth_mib: MetricEntry,
-    peak_over_start_virtual_ratio: MetricEntry,
-    avg_runtime_virtual_mib: MetricEntry,
-}
-
-#[derive(serde::Serialize)]
-struct MetricEntry {
-    value: f64,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    lower_value: Option<f64>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    upper_value: Option<f64>,
-}
-
-#[derive(Clone, Debug, serde::Serialize)]
-struct MemoryUsage {
-    start_rss: f64,
-    end_rss: f64,
-    memory_growth: f64,
-    peak_over_start_rss_ratio: f64,
-    avg_rss: f64,
-    min_rss: f64,
-    max_rss: f64,
-    start_virtual: f64,
-    end_virtual: f64,
-    virtual_growth: f64,
-    peak_over_start_virtual_ratio: f64,
-    avg_virtual: f64,
-    min_virtual: f64,
-    max_virtual: f64
+use tokio::task::JoinSet;
+use tower::ServiceBuilder;
+use tower_http::compression::CompressionLayer;
+
+use divan::AllocProfiler;
+
+#[global_allocator]
+static ALLOC: AllocProfiler = AllocProfiler::system();
+
+#[divan::bench]
+pub fn scrape() {
+    trace_memory_async(
+        || async {
+            let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345))
+                .await
+                .expect("Failed to bind to port 12345");
+
+            axum::serve(listener, lustrefs_exporter::routes::app())
+                .await
+                .expect("Failed to serve app.");
+        },
+        || async {
+            load_test_concurrent(10, 60).await;
+        },
+        Duration::from_millis(10),
+    );
 }
 
-impl From<MemoryUsage> for BencherOutput {
-    fn from(x: MemoryUsage) -> Self {
-        BencherOutput {
-            scrape_allocations: BencherMetrics {
-                start_rss_mib: MetricEntry {
-                    value: x.start_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_rss_mib: MetricEntry {
-                    value: x.max_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                end_rss_mib: MetricEntry {
-                    value: x.end_rss,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                memory_growth_mib: MetricEntry {
-                    value: x.memory_growth,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_over_start_rss_ratio: MetricEntry {
-                    value: x.peak_over_start_rss_ratio,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                avg_runtime_rss_mib: MetricEntry {
-                    value: x.avg_rss,
-                    lower_value: Some(x.min_rss),
-                    upper_value: Some(x.max_rss),
-                },
-                start_virtual_mib: MetricEntry {
-                    value: x.start_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_virtual_mib: MetricEntry {
-                    value: x.max_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                end_virtual_mib: MetricEntry {
-                    value: x.end_virtual,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                virtual_growth_mib: MetricEntry {
-                    value: x.virtual_growth,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                peak_over_start_virtual_ratio: MetricEntry {
-                    value: x.peak_over_start_virtual_ratio,
-                    lower_value: None,
-                    upper_value: None,
-                },
-                avg_runtime_virtual_mib: MetricEntry {
-                    value: x.avg_virtual,
-                    lower_value: Some(x.min_virtual),
-                    upper_value: Some(x.max_virtual),
-                },
-            },
-        }
-    }
+#[divan::bench]
+pub fn test_vec() {
+    let test: Vec<u8> = Vec::with_capacity(1024 * 1024 * 1024);
 }
 
-fn get_memory_stats() -> (f64, f64) {
-    let mut system = System::new();
-    system.refresh_process(Pid::from(std::process::id() as usize));
-
-    if let Some(process) = system.process(Pid::from(std::process::id() as usize)) {
-        (process.memory() as f64, process.virtual_memory() as f64)
-    } else {
-        (0.0, 0.0)
-    }
+#[divan::bench]
+pub fn test_vec_async() {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_time()
+        .enable_io()
+        .build()
+        .expect("Failed to build tokio runtime")
+        .block_on(async move {
+            let test: Vec<u8> = Vec::with_capacity(1024 * 1024 * 1024);
+        })
 }
 
-async fn load_test_with_memory_tracking(
-    concurrency: usize,
-    total_requests: usize,
-) -> (Duration, MemoryUsage) {
-    let (start_rss, start_virtual) = get_memory_stats();
-    let mut rss_values = vec![];
-    let mut virtual_values = vec![];
-
-    rss_values.push(start_rss);
-    virtual_values.push(start_virtual);
-
-    let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
-    let monitor_handle = tokio::spawn(async move {
-        let mut min_rss = start_rss;
-        let mut peak_rss = start_rss;
-        let mut min_virtual = start_virtual;
-        let mut peak_virtual = start_virtual;
-        let mut interval = tokio::time::interval(Duration::from_millis(100));
-
-        while rx.try_recv().is_err() {
-            interval.tick().await;
-
-            let (current_rss, current_virtual) = get_memory_stats();
-
-            rss_values.push(current_rss);
-            min_rss = min_rss.min(current_rss);
-            peak_rss = peak_rss.max(current_rss);
-
-            virtual_values.push(current_virtual);
-            min_virtual = min_virtual.min(current_virtual);
-            peak_virtual = peak_virtual.max(current_virtual);
-        }
-
-        (
-            rss_values,
-            min_rss,
-            peak_rss,
-            virtual_values,
-            min_virtual,
-            peak_virtual,
-        )
-    });
-
-    let duration = load_test_concurrent(concurrency, total_requests).await;
-
-    tx.send(())
-        .expect("Failed to send stop signal to memory monitor");
-
-    let (rss_values, mut min_rss, mut max_rss, virtual_values, mut min_virtual, mut max_virtual) =
-        monitor_handle
-            .await
-            .expect("Failed to collect memory metrics from run.");
-
-    let (end_rss, end_virtual) = get_memory_stats();
-
-    min_rss = min_rss.min(end_rss);
-    max_rss = max_rss.max(end_rss);
-
-    min_virtual = min_virtual.min(end_virtual);
-    max_virtual = max_virtual.max(end_virtual);
-
-    let memory_usage = MemoryUsage {
-        start_rss,
-        end_rss,
-        memory_growth: end_rss - start_rss,
-        peak_over_start_rss_ratio: max_rss / start_rss,
-        avg_rss: rss_values.iter().sum::<f64>() / rss_values.len() as f64,
-        min_rss,
-        max_rss,
-        start_virtual,
-        end_virtual,
-        virtual_growth: end_virtual - start_virtual,
-        peak_over_start_virtual_ratio: max_virtual / start_virtual,
-        avg_virtual: virtual_values.iter().sum::<f64>() / virtual_values.len() as f64,
-        min_virtual,
-        max_virtual,
-    };
-
-    (duration, memory_usage)
+#[divan::bench]
+pub fn test_vec_async_join() {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_time()
+        .enable_io()
+        .build()
+        .expect("Failed to build tokio runtime")
+        .block_on(async move {
+            let mut join_set = JoinSet::new();
+            for _ in 1..10 {
+                join_set.spawn(async move {
+                    let test: Vec<u8> = Vec::with_capacity(1024 * 1024 * 1024);
+                });
+            }
+
+            join_set.join_all().await;
+        })
 }
 
-#[commandeer(Replay, "lctl", "lnetctl")]
-fn scrape_load_test(c: &mut Criterion) {
-    let (tx, rx) = std::sync::mpsc::channel::<MemoryUsage>();
-
+#[divan::bench]
+pub fn test_vec_axum() {
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_time()
         .enable_io()
         .build()
         .expect("Failed to build tokio runtime");
 
-    rt.spawn(async move {
-        let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345))
-            .await
-            .expect("Failed to bind to port 12345");
-
-        axum::serve(listener, lustrefs_exporter::routes::app())
-            .await
-            .expect("Failed to serve app.");
-    });
+    pub async fn scrape(Query(_): Query<()>) -> Result<Response<Body>, Error> {
+        let test: Vec<u8> = Vec::with_capacity(1024 * 1024 * 1024);
 
-    let mut group = c.benchmark_group("scrape_benchmarks");
+        let resp = Response::builder()
+            .status(StatusCode::OK)
+            .header(
+                CONTENT_TYPE,
+                "application/openmetrics-text; version=1.0.0; charset=utf-8",
+            )
+            .body(Body::from("hello"))?;
 
-    // Load test benchmark (like oha: 1000 requests, 10 concurrent)
-    group.sample_size(10); // Fewer samples since each does 1000 requests
-    group.measurement_time(Duration::from_secs(1500)); // Allow more time
+        Ok(resp)
+    }
 
-    group.bench_function("load_test_1000_req_10_concurrent_sequential", |b| {
-        let tx = tx.clone();
+    let (kill_s, kill_r) = tokio::sync::oneshot::channel();
 
-        b.to_async(&rt).iter(|| async {
-            let (duration, memory_usage) = load_test_with_memory_tracking(10, 60).await;
+    let server = async move {
+        let listener = tokio::net::TcpListener::bind(("0.0.0.0", 12345))
+            .await
+            .expect("Failed to bind to port 12345");
 
-            let _ = tx.send(memory_usage.clone());
+        axum::serve(listener, {
+            let load_shedder = ServiceBuilder::new().layer(CompressionLayer::new());
 
-            std::hint::black_box((duration, memory_usage))
+            Router::new()
+                .route("/", axum::routing::get(scrape))
+                .layer(load_shedder)
         })
-    });
-
-    group.finish();
-
-    drop(tx);
-
-    let samples = rx.iter().collect::<Vec<_>>();
-
-    if !samples.is_empty() {
-        let aggregated = aggregate_samples(&samples);
+        .with_graceful_shutdown(async move {
+            kill_r.await.ok();
+        })
+        .await
+        .expect("Failed to serve app.");
+    };
 
-        let bencher_output: BencherOutput = aggregated.into();
+    let request = async move {
+        reqwest::get("http://localhost:12345/")
+            .await
+            .expect("Request failed")
+            .text()
+            .await
+            .expect("Parsing response failed");
 
-        let serialized_metrics = serde_json::to_string_pretty(&bencher_output)
-            .expect("Failed to serialize benchmark output.");
+        kill_s.send(()).ok();
+    };
 
-        std::fs::write("scrape_allocations_results.json", serialized_metrics)
-            .expect("Failed to write benchmark results to `scrape_allocations_results.json`");
+    rt.block_on(async move {
+        let mut join_set = JoinSet::new();
+        join_set.spawn(request);
+        join_set.spawn(server);
+        join_set.join_all().await
+    });
-
-    println!("✅ Bencher results written to scrape_allocations_results.json");
-    }
 }
 
-fn aggregate_samples(samples: &[MemoryUsage]) -> MemoryUsage {
-    let size = samples.len() as f64;
-    const MIB_CONVERSION_FACTOR: f64 = 1_048_576.0;
-
-    MemoryUsage {
-        start_rss: samples.iter().map(|x| x.start_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        end_rss: samples.iter().map(|x| x.end_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        memory_growth: samples.iter().map(|x| x.memory_growth).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        peak_over_start_rss_ratio: samples
-            .iter()
-            .map(|x| x.peak_over_start_rss_ratio)
-            .sum::<f64>()
-            / size,
-        avg_rss: samples.iter().map(|x| x.avg_rss).sum::<f64>() / size / MIB_CONVERSION_FACTOR,
-        min_rss: samples
-            .iter()
-            .map(|x| x.min_rss)
-            .fold(f64::INFINITY, f64::min)
-            / MIB_CONVERSION_FACTOR,
-        max_rss: samples
-            .iter()
-            .map(|x| x.max_rss)
-            .fold(f64::NEG_INFINITY, f64::max)
-            / MIB_CONVERSION_FACTOR,
-
-        start_virtual: samples.iter().map(|x| x.start_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        end_virtual: samples.iter().map(|x| x.end_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        virtual_growth: samples.iter().map(|x| x.virtual_growth).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        peak_over_start_virtual_ratio: samples
-            .iter()
-            .map(|x| x.peak_over_start_virtual_ratio)
-            .sum::<f64>()
-            / size,
-        avg_virtual: samples.iter().map(|x| x.avg_virtual).sum::<f64>()
-            / size
-            / MIB_CONVERSION_FACTOR,
-        min_virtual: samples
-            .iter()
-            .map(|x| x.min_virtual)
-            .fold(f64::INFINITY, f64::min)
-            / MIB_CONVERSION_FACTOR,
-        max_virtual: samples
-            .iter()
-            .map(|x| x.max_virtual)
-            .fold(f64::NEG_INFINITY, f64::max)
-            / MIB_CONVERSION_FACTOR,
-    }
+fn main() {
+    divan::main()
 }
-
-criterion_group!(benches, scrape_load_test);
-criterion_main!(benches);
diff --git a/lustrefs-exporter/testcmds/cmds_load_test_concurrent.json b/lustrefs-exporter/testcmds/cmds_load_test_concurrent.json
new file mode 100644
index 00000000..e69de29b
diff --git a/memory-benchmarking/Cargo.toml b/memory-benchmarking/Cargo.toml
new file mode 100644
index 00000000..cd3f8a0c
--- /dev/null
+++ b/memory-benchmarking/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "memory-benchmarking"
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+version.workspace = true
+
+[dependencies]
+serde = { workspace = true, features = ["derive"] }
+sysinfo.workspace = true
+thiserror.workspace = true
+tokio = { workspace = true, features = ["full"] }
+
+[lints]
+workspace = true
diff --git a/memory-benchmarking/src/lib.rs b/memory-benchmarking/src/lib.rs
new file mode 100644
index 00000000..7a138cb6
--- /dev/null
+++ b/memory-benchmarking/src/lib.rs
@@ -0,0 +1,254 @@
+use std::time::Duration;
+
+use sysinfo::{Pid, ProcessExt, System, SystemExt};
+
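+// A minimal usage sketch (illustrative only; `parse_input` stands in for the
+// workload being profiled). Note that `try_from` fails unless the monitor
+// collected at least 10 samples:
+//
+//     let samples = trace_memory(|| parse_input(), Duration::from_millis(10));
+//     let usage = MemoryUsage::try_from(samples.as_slice()).unwrap();
+//     let metrics = MemoryMetrics::from(std::slice::from_ref(&usage));
+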
+#[derive(serde::Serialize)]
+pub struct MemoryMetrics {
+    start_rss_mib: MetricEntry,
+    peak_rss_mib: MetricEntry,
+    end_rss_mib: MetricEntry,
+    memory_growth_mib: MetricEntry,
+    peak_over_start_rss_ratio: MetricEntry,
+    avg_runtime_rss_mib: MetricEntry,
+    start_virtual_mib: MetricEntry,
+    peak_virtual_mib: MetricEntry,
+    end_virtual_mib: MetricEntry,
+    virtual_growth_mib: MetricEntry,
+    peak_over_start_virtual_ratio: MetricEntry,
+    avg_runtime_virtual_mib: MetricEntry,
+}
+
+#[derive(serde::Serialize, Default)]
+struct MetricEntry {
+    value: f64,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    lower_value: Option<f64>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    upper_value: Option<f64>,
+}
+
+const MIB: f64 = 1024f64 * 1024f64;
+
+impl MetricEntry {
+    fn bytes_as_mib(self) -> Self {
+        MetricEntry {
+            value: self.value / MIB,
+            lower_value: self.lower_value.map(|v| v / MIB),
+            upper_value: self.upper_value.map(|v| v / MIB),
+        }
+    }
+}
+
+#[derive(Clone, Debug, serde::Serialize)]
+pub struct MemoryUsage {
+    pub start_rss: f64,
+    pub end_rss: f64,
+    pub avg_rss: f64,
+    pub min_rss: f64,
+    pub max_rss: f64,
+    pub start_virtual: f64,
+    pub end_virtual: f64,
+    pub avg_virtual: f64,
+    pub min_virtual: f64,
+    pub max_virtual: f64,
+}
+
+impl MemoryUsage {
+    fn memory_growth(&self) -> f64 {
+        self.end_rss - self.start_rss
+    }
+
+    fn peak_over_start_rss(&self) -> f64 {
+        self.max_rss / self.start_rss
+    }
+
+    fn virtual_growth(&self) -> f64 {
+        self.end_virtual - self.start_virtual
+    }
+
+    fn peak_over_start_virtual(&self) -> f64 {
+        self.max_virtual / self.start_virtual
+    }
+}
+
+trait GetStatsExt {
+    fn get_stats(&self, accessor: impl Fn(&MemoryUsage) -> f64 + Copy) -> MetricEntry;
+}
+
+impl GetStatsExt for &[MemoryUsage] {
+    fn get_stats(&self, accessor: impl Fn(&MemoryUsage) -> f64 + Copy) -> MetricEntry {
+        MetricEntry {
+            value: self.iter().map(accessor).sum::<f64>() / self.len() as f64,
+            lower_value: self.iter().map(accessor).reduce(f64::min),
+            upper_value: self.iter().map(accessor).reduce(f64::max),
+        }
+    }
+}
+
+impl From<&[MemoryUsage]> for MemoryMetrics {
+    fn from(samples: &[MemoryUsage]) -> Self {
+        MemoryMetrics {
+            start_rss_mib: samples.get_stats(|x| x.start_rss).bytes_as_mib(),
+            peak_rss_mib: samples.get_stats(|x| x.max_rss).bytes_as_mib(),
+            end_rss_mib: samples.get_stats(|x| x.end_rss).bytes_as_mib(),
+            memory_growth_mib: samples.get_stats(|x| x.memory_growth()).bytes_as_mib(),
+            // The ratio metrics are dimensionless, so they must not be
+            // converted to MiB.
+            peak_over_start_rss_ratio: samples.get_stats(|x| x.peak_over_start_rss()),
+            avg_runtime_rss_mib: samples.get_stats(|x| x.avg_rss).bytes_as_mib(),
+            start_virtual_mib: samples.get_stats(|x| x.start_virtual).bytes_as_mib(),
+            peak_virtual_mib: samples.get_stats(|x| x.max_virtual).bytes_as_mib(),
+            end_virtual_mib: samples.get_stats(|x| x.end_virtual).bytes_as_mib(),
+            virtual_growth_mib: samples.get_stats(|x| x.virtual_growth()).bytes_as_mib(),
+            peak_over_start_virtual_ratio: samples.get_stats(|x| x.peak_over_start_virtual()),
+            avg_runtime_virtual_mib: samples.get_stats(|x| x.avg_virtual).bytes_as_mib(),
+        }
+    }
+}
+
+/// A single `(rss, virtual)` memory sample, in bytes.
+#[derive(Default)]
+pub struct Sample(f64, f64);
+
+#[derive(thiserror::Error, Debug, Clone)]
+pub enum Error {
+    #[error("Not enough samples")]
+    NotEnoughSamples,
+}
+
+impl TryFrom<&[Sample]> for MemoryUsage {
+    type Error = Error;
+
+    fn try_from(value: &[Sample]) -> Result<Self, Self::Error> {
+        if value.len() < 10 {
+            return Err(Error::NotEnoughSamples);
+        }
+
+        Ok(MemoryUsage {
+            start_rss: value.first().map(|s| s.0).unwrap_or_default(),
+            end_rss: value.last().map(|s| s.0).unwrap_or_default(),
+            avg_rss: value.iter().map(|s| s.0).sum::<f64>() / (value.len() as f64),
+            min_rss: value
+                .iter()
+                .map(|s| s.0)
+                .reduce(f64::min)
+                .unwrap_or_default(),
+            max_rss: value
+                .iter()
+                .map(|s| s.0)
+                .reduce(f64::max)
+                .unwrap_or_default(),
+            start_virtual: value.first().map(|s| s.1).unwrap_or_default(),
+            end_virtual: value.last().map(|s| s.1).unwrap_or_default(),
+            avg_virtual: value.iter().map(|s| s.1).sum::<f64>() / (value.len() as f64),
+            min_virtual: value
+                .iter()
+                .map(|s| s.1)
+                .reduce(f64::min)
+                .unwrap_or_default(),
+            max_virtual: value
+                .iter()
+                .map(|s| s.1)
+                .reduce(f64::max)
+                .unwrap_or_default(),
+        })
+    }
+}
+
+pub fn sample_memory() -> Sample {
+    let mut system = System::new();
+    system.refresh_process(Pid::from(std::process::id() as usize));
+
+    system
+        .process(Pid::from(std::process::id() as usize))
+        .map(|process| Sample(process.memory() as f64, process.virtual_memory() as f64))
+        .unwrap_or_default()
+}
+
+pub fn trace_memory(routine: impl Fn(), duration: Duration) -> Vec<Sample> {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_time()
+        .enable_io()
+        .build()
+        .expect("Failed to build tokio runtime")
+        .block_on(async move {
+            let mut samples = vec![sample_memory()];
+
+            let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>();
+            let monitor_handle = tokio::spawn(async move {
+                let mut interval = tokio::time::interval(duration);
+
+                while abort_receiver.try_recv().is_err() {
+                    interval.tick().await;
+
+                    samples.push(sample_memory());
+                }
+
+                samples
+            });
+
+            routine();
+
+            abort_sender
+                .send(())
+                .expect("Failed to send stop signal to memory monitor");
+
+            let mut samples = monitor_handle
+                .await
+                .expect("Failed to collect memory metrics from run.");
+
+            samples.push(sample_memory());
+
+            samples
+        })
+}
+
+pub fn trace_memory_async<I, F>(
+    init: impl Fn() -> I + Send + 'static,
+    routine: impl Fn() -> F,
+    duration: Duration,
+) -> Vec<Sample>
+where
+    I: Future<Output = ()> + Sized + Send + 'static,
+    F: Future<Output = ()> + Sized,
+{
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_time()
+        .enable_io()
+        .build()
+        .expect("Failed to build tokio runtime");
+
+    rt.spawn(init());
+
+    let mut samples = vec![sample_memory()];
+
+    rt.block_on(async move {
+        let (abort_sender, mut abort_receiver) = tokio::sync::oneshot::channel::<()>();
+        let monitor_handle = tokio::spawn(async move {
+            let mut interval = tokio::time::interval(duration);
+
+            while abort_receiver.try_recv().is_err() {
+                interval.tick().await;
+
+                samples.push(sample_memory());
+            }
+
+            samples
+        });
+
+        routine().await;
+
+        abort_sender
+            .send(())
+            .expect("Failed to send stop signal to memory monitor");
+
+        let mut samples = monitor_handle
+            .await
+            .expect("Failed to collect memory metrics from run.");
+
+        samples.push(sample_memory());
+
+        samples
+    })
+}