
Commit 6275735

chore(cubestore): Add simple allocator tracking stats for benchmarks (#9898)
1 parent 1b18344 commit 6275735

File tree

4 files changed, +215 -77 lines changed

rust/cubestore/Cargo.toml

Lines changed: 3 additions & 0 deletions
@@ -9,3 +9,6 @@ members = [
     "cuberpc",
     "cuberockstore",
 ]
+
+[profile.bench]
+debug = true
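Note: `[profile.bench]` controls how `cargo bench` compiles its targets; `debug = true` keeps debug symbols in the benchmark binaries, so profiler and allocator traces can be mapped back to source lines, at the cost of larger binaries.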

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 14 additions & 2 deletions
@@ -1,4 +1,4 @@
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use criterion::{criterion_group, BenchmarkId, Criterion};
 use cubestore::cachestore::{
     CacheStore, QueueAddPayload, QueueItemStatus, QueueKey, RocksCacheStore,
 };

@@ -7,6 +7,13 @@ use cubestore::CubeError;
 use std::sync::Arc;
 use tokio::runtime::{Builder, Runtime};

+mod tracking_allocator;
+
+use tracking_allocator::TrackingAllocator;
+
+#[global_allocator]
+static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
+
 fn prepare_cachestore(name: &str) -> Result<Arc<RocksCacheStore>, CubeError> {
     let config = Config::test(&name).update_config(|mut config| {
         // disable periodic eviction

@@ -188,6 +195,7 @@ fn do_get_bench(
 }

 fn do_benches(c: &mut Criterion) {
+    ALLOCATOR.reset_stats();
     let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

     do_insert_bench(c, &runtime, 512, 64);

@@ -201,4 +209,8 @@ fn do_benches(c: &mut Criterion) {
 }

 criterion_group!(benches, do_benches);
-criterion_main!(benches);
+
+fn main() {
+    benches();
+    ALLOCATOR.print_stats();
+}
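For reference, a minimal self-contained sketch of the pattern both bench targets now follow: register the tracking allocator as the global allocator, then replace `criterion_main!` with a hand-written `main` so code can run after every group has finished. The `noop` benchmark below is only an illustration, and the module path assumes `tracking_allocator.rs` sits next to the bench file.

use criterion::{criterion_group, Criterion};

mod tracking_allocator;
use tracking_allocator::TrackingAllocator;

// Route every heap allocation made by this bench binary through the tracker.
#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();

fn do_benches(c: &mut Criterion) {
    // Zero the counters so start-up allocations are not attributed to the benches.
    ALLOCATOR.reset_stats();
    c.bench_function("noop", |b| b.iter(|| Vec::<u8>::with_capacity(64)));
}

// criterion_group! generates a `benches()` function that runs the group.
criterion_group!(benches, do_benches);

// A manual main (instead of criterion_main!) leaves room to print the
// allocator stats once all benchmarks have completed.
fn main() {
    benches();
    ALLOCATOR.print_stats();
}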

rust/cubestore/cubestore/benches/metastore.rs

Lines changed: 58 additions & 75 deletions
@@ -1,4 +1,4 @@
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use criterion::{criterion_group, BenchmarkId, Criterion};
 use cubestore::config::Config;
 use cubestore::metastore::{BaseRocksStoreFs, Column, ColumnType, MetaStore, RocksMetaStore};
 use cubestore::remotefs::LocalDirRemoteFs;

@@ -8,6 +8,13 @@ use std::fs;
 use std::sync::Arc;
 use tokio::runtime::{Builder, Runtime};

+mod tracking_allocator;
+
+use tracking_allocator::TrackingAllocator;
+
+#[global_allocator]
+static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
+
 fn prepare_metastore(name: &str) -> Result<Arc<RocksMetaStore>, CubeError> {
     let config = Config::test(name);

@@ -97,118 +104,94 @@ async fn bench_get_tables_with_path(
     }
 }

-fn benchmark_get_tables_with_path_small(c: &mut Criterion, runtime: &Runtime) {
+fn do_get_tables_with_path_bench(
+    c: &mut Criterion,
+    runtime: &Runtime,
+    num_schemas: usize,
+    tables_per_schema: usize,
+    iterations: usize,
+) {
+    let total_tables = num_schemas * tables_per_schema;
     let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_small").unwrap();
-        populate_metastore(&metastore, 10, 10).await.unwrap(); // 100 tables
+        let metastore =
+            prepare_metastore(&format!("get_tables_with_path_{}", total_tables)).unwrap();
+        populate_metastore(&metastore, num_schemas, tables_per_schema)
+            .await
+            .unwrap();
         metastore
     });

     c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_small_include_non_ready_true", 100),
-        &100,
+        BenchmarkId::new("get_tables_with_path_include_non_ready_true", total_tables),
+        &iterations,
         |b, &iterations| {
             b.to_async(runtime)
                 .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
         },
     );

     c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_small_include_non_ready_false", 100),
-        &100,
+        BenchmarkId::new("get_tables_with_path_include_non_ready_false", total_tables),
+        &iterations,
         |b, &iterations| {
             b.to_async(runtime)
                 .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
         },
     );
 }

-fn benchmark_get_tables_with_path_medium(c: &mut Criterion, runtime: &Runtime) {
-    let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_medium").unwrap();
-        populate_metastore(&metastore, 50, 20).await.unwrap(); // 1,000 tables
-        metastore
-    });
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_medium_include_non_ready_true", 50),
-        &50,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
-        },
-    );
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_medium_include_non_ready_false", 50),
-        &50,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
-        },
-    );
+async fn do_cold_cache_test(num_schemas: usize, tables_per_schema: usize) {
+    let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
+    populate_metastore(&fresh_metastore, num_schemas, tables_per_schema)
+        .await
+        .unwrap();
+    let result = fresh_metastore.get_tables_with_path(false).await;
+    assert!(result.is_ok());
 }

-fn benchmark_get_tables_with_path_large(c: &mut Criterion, runtime: &Runtime) {
-    let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("get_tables_with_path_large").unwrap();
-        populate_metastore(&metastore, 25, 1000).await.unwrap(); // 25,000 tables
-        metastore
-    });
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_large_include_non_ready_true", 10),
-        &10,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, true, iterations));
-        },
-    );
-
-    c.bench_with_input(
-        BenchmarkId::new("get_tables_with_path_large_include_non_ready_false", 10),
-        &10,
-        |b, &iterations| {
-            b.to_async(runtime)
-                .iter(|| bench_get_tables_with_path(&metastore, false, iterations));
-        },
-    );
+async fn do_warm_cache_test(metastore: &Arc<RocksMetaStore>) {
+    let result = metastore.get_tables_with_path(false).await;
+    assert!(result.is_ok());
 }

-fn cold_vs_warm_cache_benchmark(c: &mut Criterion, runtime: &Runtime) {
+fn do_cold_vs_warm_cache_bench(
+    c: &mut Criterion,
+    runtime: &Runtime,
+    num_schemas: usize,
+    tables_per_schema: usize,
+) {
     let metastore = runtime.block_on(async {
         let metastore = prepare_metastore("cold_warm_cache").unwrap();
-        populate_metastore(&metastore, 20, 50).await.unwrap(); // 1,000 tables
+        populate_metastore(&metastore, num_schemas, tables_per_schema)
+            .await
+            .unwrap();
         metastore
     });

-    // Cold cache benchmark (first call)
     c.bench_function("get_tables_with_path_cold_cache", |b| {
-        b.to_async(runtime).iter(|| async {
-            let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
-            populate_metastore(&fresh_metastore, 20, 50).await.unwrap();
-            let result = fresh_metastore.get_tables_with_path(false).await;
-            assert!(result.is_ok());
-        });
+        b.to_async(runtime)
+            .iter(|| do_cold_cache_test(num_schemas, tables_per_schema));
     });

-    // Warm cache benchmark (subsequent calls)
     c.bench_function("get_tables_with_path_warm_cache", |b| {
-        b.to_async(runtime).iter(|| async {
-            let result = metastore.get_tables_with_path(false).await;
-            assert!(result.is_ok());
-        });
+        b.to_async(runtime).iter(|| do_warm_cache_test(&metastore));
     });
 }

 fn do_benches(c: &mut Criterion) {
+    ALLOCATOR.reset_stats();
     let runtime = Builder::new_multi_thread().enable_all().build().unwrap();

-    benchmark_get_tables_with_path_small(c, &runtime);
-    benchmark_get_tables_with_path_medium(c, &runtime);
-    benchmark_get_tables_with_path_large(c, &runtime);
-    cold_vs_warm_cache_benchmark(c, &runtime);
+    do_get_tables_with_path_bench(c, &runtime, 10, 10, 100);
+    do_get_tables_with_path_bench(c, &runtime, 50, 20, 50);
+    do_get_tables_with_path_bench(c, &runtime, 25, 1000, 10);
+
+    do_cold_vs_warm_cache_bench(c, &runtime, 20, 50);
 }

 criterion_group!(benches, do_benches);
-criterion_main!(benches);
+
+fn main() {
+    benches();
+    ALLOCATOR.print_stats();
+}
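A small aside on the `iter` calls after the refactor: criterion's async bencher accepts any closure that returns a future, so the old inline `async { ... }` blocks and the new named `async fn` helpers are interchangeable. A sketch of that equivalence follows; `do_work` is a hypothetical stand-in, not code from this commit.

use criterion::Criterion;
use tokio::runtime::Runtime;

// Hypothetical workload standing in for do_cold_cache_test / do_warm_cache_test.
async fn do_work(n: usize) {
    let v: Vec<usize> = (0..n).collect();
    assert_eq!(v.len(), n);
}

fn bench_both_forms(c: &mut Criterion, runtime: &Runtime) {
    // Inline async block, as the previous code did.
    c.bench_function("inline_block", |b| {
        b.to_async(runtime).iter(|| async { do_work(1_000).await });
    });

    // Named async helper, as the refactored benches do.
    c.bench_function("named_helper", |b| {
        b.to_async(runtime).iter(|| do_work(1_000));
    });
}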
rust/cubestore/cubestore/benches/tracking_allocator.rs (new file)

Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
+use std::alloc::{GlobalAlloc, Layout, System};
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub struct TrackingAllocator {
+    inner: System,
+    allocations: AtomicUsize,
+    deallocations: AtomicUsize,
+    reallocations: AtomicUsize,
+    current_allocated: AtomicUsize,
+    peak_allocated: AtomicUsize,
+    total_allocated: AtomicUsize,
+}
+
+impl TrackingAllocator {
+    pub const fn new() -> Self {
+        Self {
+            inner: System,
+            allocations: AtomicUsize::new(0),
+            deallocations: AtomicUsize::new(0),
+            reallocations: AtomicUsize::new(0),
+            current_allocated: AtomicUsize::new(0),
+            peak_allocated: AtomicUsize::new(0),
+            total_allocated: AtomicUsize::new(0),
+        }
+    }
+
+    pub fn reset_stats(&self) {
+        self.allocations.store(0, Ordering::Relaxed);
+        self.deallocations.store(0, Ordering::Relaxed);
+        self.reallocations.store(0, Ordering::Relaxed);
+        self.current_allocated.store(0, Ordering::Relaxed);
+        self.peak_allocated.store(0, Ordering::Relaxed);
+        self.total_allocated.store(0, Ordering::Relaxed);
+    }
+
+    pub fn print_stats(&self) {
+        let allocations = self.allocations.load(Ordering::Relaxed);
+        let deallocations = self.deallocations.load(Ordering::Relaxed);
+        let reallocations = self.reallocations.load(Ordering::Relaxed);
+        let current_allocated = self.current_allocated.load(Ordering::Relaxed);
+        let peak_allocated = self.peak_allocated.load(Ordering::Relaxed);
+        let total_allocated = self.total_allocated.load(Ordering::Relaxed);
+
+        println!("=== FINAL MEMORY STATISTICS ===");
+        println!("Total allocations: {}", allocations);
+        println!("Total deallocations: {}", deallocations);
+        println!("Total reallocations: {}", reallocations);
+        println!(
+            "Current allocated: {} bytes ({:.2} MB)",
+            current_allocated,
+            current_allocated as f64 / 1024.0 / 1024.0
+        );
+        println!(
+            "Peak allocated: {} bytes ({:.2} MB)",
+            peak_allocated,
+            peak_allocated as f64 / 1024.0 / 1024.0
+        );
+        println!(
+            "Total allocated: {} bytes ({:.2} MB)",
+            total_allocated,
+            total_allocated as f64 / 1024.0 / 1024.0
+        );
+        println!("===============================");
+    }
+
+    fn update_allocated(&self, size: usize, is_allocation: bool) {
+        if is_allocation {
+            self.allocations.fetch_add(1, Ordering::Relaxed);
+            self.total_allocated.fetch_add(size, Ordering::Relaxed);
+            let current = self.current_allocated.fetch_add(size, Ordering::Relaxed) + size;
+
+            // Update peak if current exceeds it
+            let mut peak = self.peak_allocated.load(Ordering::Relaxed);
+            while current > peak {
+                match self.peak_allocated.compare_exchange_weak(
+                    peak,
+                    current,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(new_peak) => peak = new_peak,
+                }
+            }
+        } else {
+            self.deallocations.fetch_add(1, Ordering::Relaxed);
+            // Use saturating_sub to prevent underflow
+            let current = self.current_allocated.load(Ordering::Relaxed);
+            let new_current = current.saturating_sub(size);
+            self.current_allocated.store(new_current, Ordering::Relaxed);
+        }
+    }
+}
+
+unsafe impl GlobalAlloc for TrackingAllocator {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let ptr = self.inner.alloc(layout);
+        if !ptr.is_null() {
+            self.update_allocated(layout.size(), true);
+        }
+        ptr
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        self.inner.dealloc(ptr, layout);
+        self.update_allocated(layout.size(), false);
+    }
+
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        let new_ptr = self.inner.realloc(ptr, layout, new_size);
+        if !new_ptr.is_null() {
+            self.reallocations.fetch_add(1, Ordering::Relaxed);
+
+            // Update counters: subtract old size, add new size
+            let current = self.current_allocated.load(Ordering::Relaxed);
+            let after_sub = current.saturating_sub(layout.size());
+            self.current_allocated.store(after_sub, Ordering::Relaxed);
+            self.total_allocated.fetch_add(new_size, Ordering::Relaxed);
+            let current = self
+                .current_allocated
+                .fetch_add(new_size, Ordering::Relaxed)
+                + new_size;
+
+            // Update peak if current exceeds it
+            let mut peak = self.peak_allocated.load(Ordering::Relaxed);
+            while current > peak {
+                match self.peak_allocated.compare_exchange_weak(
+                    peak,
+                    current,
+                    Ordering::Relaxed,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(new_peak) => peak = new_peak,
+                }
+            }
+        }
+        new_ptr
+    }
+}
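The peak tracking above relies on a lock-free compare-and-swap retry loop rather than a mutex. The same idiom in isolation, as a standalone sketch (not part of the commit):

use std::sync::atomic::{AtomicUsize, Ordering};

// Raise `peak` to at least `current`, retrying if another thread moved it first.
fn update_peak(peak: &AtomicUsize, current: usize) {
    let mut observed = peak.load(Ordering::Relaxed);
    while current > observed {
        match peak.compare_exchange_weak(observed, current, Ordering::Relaxed, Ordering::Relaxed) {
            // Swap succeeded: `peak` now holds `current`.
            Ok(_) => break,
            // Lost the race (or a spurious failure): re-check against the new value.
            Err(now) => observed = now,
        }
    }
}

fn main() {
    let peak = AtomicUsize::new(10);
    update_peak(&peak, 42);
    update_peak(&peak, 7); // a lower value leaves the peak untouched
    assert_eq!(peak.load(Ordering::Relaxed), 42);
}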
