3 changes: 3 additions & 0 deletions rust/cubestore/Cargo.toml
@@ -9,3 +9,6 @@ members = [
"cuberpc",
"cuberockstore",
]

[profile.bench]
debug = true
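
Note: debug = true under [profile.bench] only adds debug symbols to benchmark builds (the bench profile otherwise inherits release settings), which makes profiler output and allocation traces easier to symbolize; optimization level and measured timings should be unaffected.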
16 changes: 14 additions & 2 deletions rust/cubestore/cubestore/benches/cachestore_queue.rs
@@ -1,4 +1,4 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, BenchmarkId, Criterion};
use cubestore::cachestore::{
CacheStore, QueueAddPayload, QueueItemStatus, QueueKey, RocksCacheStore,
};
@@ -7,6 +7,13 @@ use cubestore::CubeError;
use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};

mod tracking_allocator;

use tracking_allocator::TrackingAllocator;

#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();

fn prepare_cachestore(name: &str) -> Result<Arc<RocksCacheStore>, CubeError> {
let config = Config::test(&name).update_config(|mut config| {
// disable periodic eviction
@@ -188,6 +195,7 @@ fn do_get_bench(
}

fn do_benches(c: &mut Criterion) {
ALLOCATOR.reset_stats();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

do_insert_bench(c, &runtime, 512, 64);
@@ -201,4 +209,8 @@ fn do_benches(c: &mut Criterion) {
}

criterion_group!(benches, do_benches);
criterion_main!(benches);

fn main() {
benches();
ALLOCATOR.print_stats();
}
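
For reference, criterion_main!(benches) roughly expands to a generated main like the sketch below (based on Criterion's documented macro behavior); the hand-written main above keeps the benches() call but replaces Criterion's final summary with the allocator report.

// Sketch only, not part of this PR: approximate expansion of criterion_main!(benches).
fn main() {
    benches();
    criterion::Criterion::default()
        .configure_from_args()
        .final_summary();
}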
133 changes: 58 additions & 75 deletions rust/cubestore/cubestore/benches/metastore.rs
@@ -1,4 +1,4 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use criterion::{criterion_group, BenchmarkId, Criterion};
use cubestore::config::Config;
use cubestore::metastore::{BaseRocksStoreFs, Column, ColumnType, MetaStore, RocksMetaStore};
use cubestore::remotefs::LocalDirRemoteFs;
@@ -8,6 +8,13 @@ use std::fs;
use std::sync::Arc;
use tokio::runtime::{Builder, Runtime};

mod tracking_allocator;

use tracking_allocator::TrackingAllocator;

#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();

fn prepare_metastore(name: &str) -> Result<Arc<RocksMetaStore>, CubeError> {
let config = Config::test(name);

@@ -97,118 +104,94 @@ async fn bench_get_tables_with_path(
}
}

fn benchmark_get_tables_with_path_small(c: &mut Criterion, runtime: &Runtime) {
fn do_get_tables_with_path_bench(
c: &mut Criterion,
runtime: &Runtime,
num_schemas: usize,
tables_per_schema: usize,
iterations: usize,
) {
let total_tables = num_schemas * tables_per_schema;
let metastore = runtime.block_on(async {
let metastore = prepare_metastore("get_tables_with_path_small").unwrap();
populate_metastore(&metastore, 10, 10).await.unwrap(); // 100 tables
let metastore =
prepare_metastore(&format!("get_tables_with_path_{}", total_tables)).unwrap();
populate_metastore(&metastore, num_schemas, tables_per_schema)
.await
.unwrap();
metastore
});

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_small_include_non_ready_true", 100),
&100,
BenchmarkId::new("get_tables_with_path_include_non_ready_true", total_tables),
&iterations,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, true, iterations));
},
);

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_small_include_non_ready_false", 100),
&100,
BenchmarkId::new("get_tables_with_path_include_non_ready_false", total_tables),
&iterations,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, false, iterations));
},
);
}

fn benchmark_get_tables_with_path_medium(c: &mut Criterion, runtime: &Runtime) {
let metastore = runtime.block_on(async {
let metastore = prepare_metastore("get_tables_with_path_medium").unwrap();
populate_metastore(&metastore, 50, 20).await.unwrap(); // 1,000 tables
metastore
});

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_medium_include_non_ready_true", 50),
&50,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, true, iterations));
},
);

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_medium_include_non_ready_false", 50),
&50,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, false, iterations));
},
);
async fn do_cold_cache_test(num_schemas: usize, tables_per_schema: usize) {
let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
populate_metastore(&fresh_metastore, num_schemas, tables_per_schema)
.await
.unwrap();
let result = fresh_metastore.get_tables_with_path(false).await;
assert!(result.is_ok());
}

fn benchmark_get_tables_with_path_large(c: &mut Criterion, runtime: &Runtime) {
let metastore = runtime.block_on(async {
let metastore = prepare_metastore("get_tables_with_path_large").unwrap();
populate_metastore(&metastore, 25, 1000).await.unwrap(); // 25,000 tables
metastore
});

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_large_include_non_ready_true", 10),
&10,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, true, iterations));
},
);

c.bench_with_input(
BenchmarkId::new("get_tables_with_path_large_include_non_ready_false", 10),
&10,
|b, &iterations| {
b.to_async(runtime)
.iter(|| bench_get_tables_with_path(&metastore, false, iterations));
},
);
async fn do_warm_cache_test(metastore: &Arc<RocksMetaStore>) {
let result = metastore.get_tables_with_path(false).await;
assert!(result.is_ok());
}

fn cold_vs_warm_cache_benchmark(c: &mut Criterion, runtime: &Runtime) {
fn do_cold_vs_warm_cache_bench(
c: &mut Criterion,
runtime: &Runtime,
num_schemas: usize,
tables_per_schema: usize,
) {
let metastore = runtime.block_on(async {
let metastore = prepare_metastore("cold_warm_cache").unwrap();
populate_metastore(&metastore, 20, 50).await.unwrap(); // 1,000 tables
populate_metastore(&metastore, num_schemas, tables_per_schema)
.await
.unwrap();
metastore
});

// Cold cache benchmark (first call)
c.bench_function("get_tables_with_path_cold_cache", |b| {
b.to_async(runtime).iter(|| async {
let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
populate_metastore(&fresh_metastore, 20, 50).await.unwrap();
let result = fresh_metastore.get_tables_with_path(false).await;
assert!(result.is_ok());
});
b.to_async(runtime)
.iter(|| do_cold_cache_test(num_schemas, tables_per_schema));
});

// Warm cache benchmark (subsequent calls)
c.bench_function("get_tables_with_path_warm_cache", |b| {
b.to_async(runtime).iter(|| async {
let result = metastore.get_tables_with_path(false).await;
assert!(result.is_ok());
});
b.to_async(runtime).iter(|| do_warm_cache_test(&metastore));
});
}

fn do_benches(c: &mut Criterion) {
ALLOCATOR.reset_stats();
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

benchmark_get_tables_with_path_small(c, &runtime);
benchmark_get_tables_with_path_medium(c, &runtime);
benchmark_get_tables_with_path_large(c, &runtime);
cold_vs_warm_cache_benchmark(c, &runtime);
do_get_tables_with_path_bench(c, &runtime, 10, 10, 100);
do_get_tables_with_path_bench(c, &runtime, 50, 20, 50);
do_get_tables_with_path_bench(c, &runtime, 25, 1000, 10);

do_cold_vs_warm_cache_bench(c, &runtime, 20, 50);
}

criterion_group!(benches, do_benches);
criterion_main!(benches);

fn main() {
benches();
ALLOCATOR.print_stats();
}
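
The three size-specific wrappers (small, medium, large) are consolidated into the single parameterized do_get_tables_with_path_bench, so adding another data-set size is a one-line call. A hypothetical example, not part of this PR:

// Hypothetical extra size: 100 schemas x 100 tables = 10,000 tables, 20 iterations per sample.
do_get_tables_with_path_bench(c, &runtime, 100, 100, 20);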
140 changes: 140 additions & 0 deletions rust/cubestore/cubestore/benches/tracking_allocator.rs
@@ -0,0 +1,140 @@
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct TrackingAllocator {
inner: System,
allocations: AtomicUsize,
deallocations: AtomicUsize,
reallocations: AtomicUsize,
current_allocated: AtomicUsize,
peak_allocated: AtomicUsize,
total_allocated: AtomicUsize,
}

impl TrackingAllocator {
pub const fn new() -> Self {
Self {
inner: System,
allocations: AtomicUsize::new(0),
deallocations: AtomicUsize::new(0),
reallocations: AtomicUsize::new(0),
current_allocated: AtomicUsize::new(0),
peak_allocated: AtomicUsize::new(0),
total_allocated: AtomicUsize::new(0),
}
}

pub fn reset_stats(&self) {
self.allocations.store(0, Ordering::Relaxed);
self.deallocations.store(0, Ordering::Relaxed);
self.reallocations.store(0, Ordering::Relaxed);
self.current_allocated.store(0, Ordering::Relaxed);
self.peak_allocated.store(0, Ordering::Relaxed);
self.total_allocated.store(0, Ordering::Relaxed);
}

pub fn print_stats(&self) {
let allocations = self.allocations.load(Ordering::Relaxed);
let deallocations = self.deallocations.load(Ordering::Relaxed);
let reallocations = self.reallocations.load(Ordering::Relaxed);
let current_allocated = self.current_allocated.load(Ordering::Relaxed);
let peak_allocated = self.peak_allocated.load(Ordering::Relaxed);
let total_allocated = self.total_allocated.load(Ordering::Relaxed);

println!("=== FINAL MEMORY STATISTICS ===");
println!("Total allocations: {}", allocations);
println!("Total deallocations: {}", deallocations);
println!("Total reallocations: {}", reallocations);
println!(
"Current allocated: {} bytes ({:.2} MB)",
current_allocated,
current_allocated as f64 / 1024.0 / 1024.0
);
println!(
"Peak allocated: {} bytes ({:.2} MB)",
peak_allocated,
peak_allocated as f64 / 1024.0 / 1024.0
);
println!(
"Total allocated: {} bytes ({:.2} MB)",
total_allocated,
total_allocated as f64 / 1024.0 / 1024.0
);
println!("===============================");
}

fn update_allocated(&self, size: usize, is_allocation: bool) {
if is_allocation {
self.allocations.fetch_add(1, Ordering::Relaxed);
self.total_allocated.fetch_add(size, Ordering::Relaxed);
let current = self.current_allocated.fetch_add(size, Ordering::Relaxed) + size;

// Update peak if current exceeds it
let mut peak = self.peak_allocated.load(Ordering::Relaxed);
while current > peak {
match self.peak_allocated.compare_exchange_weak(
peak,
current,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(new_peak) => peak = new_peak,
}
}
} else {
self.deallocations.fetch_add(1, Ordering::Relaxed);
// Use saturating_sub to prevent underflow
let current = self.current_allocated.load(Ordering::Relaxed);
let new_current = current.saturating_sub(size);
self.current_allocated.store(new_current, Ordering::Relaxed);
}
}
}

unsafe impl GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ptr = self.inner.alloc(layout);
if !ptr.is_null() {
self.update_allocated(layout.size(), true);
}
ptr
}

unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.inner.dealloc(ptr, layout);
self.update_allocated(layout.size(), false);
}

unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
let new_ptr = self.inner.realloc(ptr, layout, new_size);
if !new_ptr.is_null() {
self.reallocations.fetch_add(1, Ordering::Relaxed);

// Update counters: subtract old size, add new size
let current = self.current_allocated.load(Ordering::Relaxed);
let after_sub = current.saturating_sub(layout.size());
self.current_allocated.store(after_sub, Ordering::Relaxed);
self.total_allocated.fetch_add(new_size, Ordering::Relaxed);
let current = self
.current_allocated
.fetch_add(new_size, Ordering::Relaxed)
+ new_size;

// Update peak if current exceeds it
let mut peak = self.peak_allocated.load(Ordering::Relaxed);
while current > peak {
match self.peak_allocated.compare_exchange_weak(
peak,
current,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(new_peak) => peak = new_peak,
}
}
}
new_ptr
}
}
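
A minimal standalone usage sketch of the allocator, assuming it sits next to the bench as benches/tracking_allocator.rs (the vector below exists only to force a heap allocation):

// Sketch only, not part of this PR.
mod tracking_allocator;
use tracking_allocator::TrackingAllocator;

#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();

fn main() {
    ALLOCATOR.reset_stats();
    let v: Vec<u64> = (0..1_000).collect(); // forces at least one heap allocation
    drop(v);
    ALLOCATOR.print_stats(); // reports allocation counts plus current/peak/total bytes
}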