From c0aa91e141cc8be2f3773df870ba4b1874459bcf Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Thu, 14 Aug 2025 08:41:37 +0200 Subject: [PATCH] Benchmark suggestion --- src/common/ttl_map.rs | 68 +++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index c301306..266ac52 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -29,7 +29,8 @@ use dashmap::{DashMap, Entry}; use datafusion::error::DataFusionError; use std::collections::HashSet; use std::hash::Hash; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -53,6 +54,9 @@ pub struct TTLMap { // grandularity of the time wheel. How often a bucket is cleared. tick: Duration, + + dash_map_lock_contention_time: AtomicUsize, + mutex_lock_contention_time: AtomicUsize, } pub struct TTLMapParams { @@ -104,6 +108,8 @@ where time, _task: None, tick, + dash_map_lock_contention_time: AtomicUsize::new(0), + mutex_lock_contention_time: AtomicUsize::new(0), } } @@ -124,7 +130,11 @@ where F: FnOnce() -> V, { let mut new_entry = false; - let value = match self.data.entry(key.clone()) { + let start = std::time::Instant::now(); + let entry = self.data.entry(key.clone()); + self.dash_map_lock_contention_time + .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); + let value = match entry { Entry::Vacant(entry) => { let value = f(); entry.insert(value.clone()); @@ -139,7 +149,10 @@ where if new_entry { let time = self.time.load(std::sync::atomic::Ordering::SeqCst); { + let start = std::time::Instant::now(); let mut buckets = self.buckets.lock().await; + self.mutex_lock_contention_time + .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); let bucket_index = (time.wrapping_sub(1)) % buckets.len() as u64; buckets[bucket_index as usize].insert(key); } @@ -341,12 +354,11 @@ mod tests { assert!(final_time < 100); } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[tokio::test(flavor = "multi_thread", worker_threads = 16)] async fn bench_lock_contention() { use std::time::Instant; - let ttl_map = TTLMap::::new(TTLMapParams { + let ttl_map = TTLMap::::new(TTLMapParams { tick: Duration::from_micros(1), ttl: Duration::from_micros(2), }) @@ -355,55 +367,43 @@ mod tests { let ttl_map = Arc::new(ttl_map); - let key_count = 10; let start_time = Instant::now(); - let operations_per_task = 1_000_000; - let task_count = 100; + let task_count = 100_000; // Spawn 10 tasks that repeatedly read the same keys let mut handles = Vec::new(); for task_id in 0..task_count { let map = Arc::clone(&ttl_map); let handle = tokio::spawn(async move { - let mut local_ops = 0; - for i in 0..operations_per_task { - // All tasks fight for the same keys - maximum contention - let key = format!("key{}", i % key_count); - let _value = map.get_or_init(key, || task_id * 1000 + i).await; - local_ops += 1; - - // Small yield to allow GC to run frequently - if i % 10 == 0 { - tokio::task::yield_now().await; - } - } - local_ops + // All tasks fight for the same keys - maximum contention + let start = Instant::now(); + let _value = map.get_or_init(rand::random(), || task_id * 1000).await; + start.elapsed().as_nanos() }); handles.push(handle); } // Wait for all tasks and collect operation counts - let mut total_operations = 0; + let mut avg_time = 0; for handle in handles { - total_operations += handle.await.unwrap(); + avg_time += handle.await.unwrap(); } + avg_time /= task_count as u128; let elapsed = start_time.elapsed(); - let ops_per_second = total_operations as f64 / elapsed.as_secs_f64(); - let avg_latency_us = elapsed.as_micros() as f64 / total_operations as f64; println!("\n=== TTLMap Lock Contention Benchmark ==="); println!("Tasks: {}", task_count); - println!("Operations per task: {}", operations_per_task); - println!("Total operations: {}", total_operations); println!("Total time: {:.2?}", elapsed); - println!("Throughput: {:.0} ops/sec", ops_per_second); - println!("Average latency: {:.2} μs per operation", avg_latency_us); + println!("Average latency: {:.2} μs per operation", avg_time / 1_000); println!("Entries remaining: {}", ttl_map.data.len()); - - // The benchmark passes if it completes without deadlocks - // Performance metrics are printed for analysis - assert!(ops_per_second > 0.0); // Sanity check + println!( + "DashMap Lock contention time: {}ms", + ttl_map.dash_map_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 + ); + println!( + "Mutex Lock contention time: {}ms", + ttl_map.mutex_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 + ); } - }