Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/common/ttl_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use dashmap::{DashMap, Entry};
use datafusion::error::DataFusionError;
use std::collections::HashSet;
use std::hash::Hash;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
Expand All @@ -53,6 +54,9 @@ pub struct TTLMap<K, V> {

// grandularity of the time wheel. How often a bucket is cleared.
tick: Duration,

dash_map_lock_contention_time: AtomicUsize,
mutex_lock_contention_time: AtomicUsize,
}

pub struct TTLMapParams {
Expand Down Expand Up @@ -104,6 +108,8 @@ where
time,
_task: None,
tick,
dash_map_lock_contention_time: AtomicUsize::new(0),
mutex_lock_contention_time: AtomicUsize::new(0),
}
}

Expand All @@ -124,7 +130,11 @@ where
F: FnOnce() -> V,
{
let mut new_entry = false;
let value = match self.data.entry(key.clone()) {
let start = std::time::Instant::now();
let entry = self.data.entry(key.clone());
self.dash_map_lock_contention_time
.fetch_add(start.elapsed().as_nanos() as usize, Relaxed);
let value = match entry {
Entry::Vacant(entry) => {
let value = f();
entry.insert(value.clone());
Expand All @@ -139,7 +149,10 @@ where
if new_entry {
let time = self.time.load(std::sync::atomic::Ordering::SeqCst);
{
let start = std::time::Instant::now();
let mut buckets = self.buckets.lock().await;
self.mutex_lock_contention_time
.fetch_add(start.elapsed().as_nanos() as usize, Relaxed);
let bucket_index = (time.wrapping_sub(1)) % buckets.len() as u64;
buckets[bucket_index as usize].insert(key);
}
Expand Down Expand Up @@ -341,12 +354,11 @@ mod tests {
assert!(final_time < 100);
}


#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn bench_lock_contention() {
use std::time::Instant;

let ttl_map = TTLMap::<String, i32>::new(TTLMapParams {
let ttl_map = TTLMap::<i32, i32>::new(TTLMapParams {
tick: Duration::from_micros(1),
ttl: Duration::from_micros(2),
})
Expand All @@ -355,55 +367,43 @@ mod tests {

let ttl_map = Arc::new(ttl_map);

let key_count = 10;
let start_time = Instant::now();
let operations_per_task = 1_000_000;
let task_count = 100;
let task_count = 100_000;

// Spawn 10 tasks that repeatedly read the same keys
let mut handles = Vec::new();
for task_id in 0..task_count {
let map = Arc::clone(&ttl_map);
let handle = tokio::spawn(async move {
let mut local_ops = 0;
for i in 0..operations_per_task {
// All tasks fight for the same keys - maximum contention
let key = format!("key{}", i % key_count);
let _value = map.get_or_init(key, || task_id * 1000 + i).await;
local_ops += 1;

// Small yield to allow GC to run frequently
if i % 10 == 0 {
tokio::task::yield_now().await;
}
}
local_ops
// All tasks fight for the same keys - maximum contention
let start = Instant::now();
let _value = map.get_or_init(rand::random(), || task_id * 1000).await;
start.elapsed().as_nanos()
});
handles.push(handle);
}

// Wait for all tasks and collect operation counts
let mut total_operations = 0;
let mut avg_time = 0;
for handle in handles {
total_operations += handle.await.unwrap();
avg_time += handle.await.unwrap();
}
avg_time /= task_count as u128;

let elapsed = start_time.elapsed();
let ops_per_second = total_operations as f64 / elapsed.as_secs_f64();
let avg_latency_us = elapsed.as_micros() as f64 / total_operations as f64;

println!("\n=== TTLMap Lock Contention Benchmark ===");
println!("Tasks: {}", task_count);
println!("Operations per task: {}", operations_per_task);
println!("Total operations: {}", total_operations);
println!("Total time: {:.2?}", elapsed);
println!("Throughput: {:.0} ops/sec", ops_per_second);
println!("Average latency: {:.2} μs per operation", avg_latency_us);
println!("Average latency: {:.2} μs per operation", avg_time / 1_000);
println!("Entries remaining: {}", ttl_map.data.len());

// The benchmark passes if it completes without deadlocks
// Performance metrics are printed for analysis
assert!(ops_per_second > 0.0); // Sanity check
println!(
"DashMap Lock contention time: {}ms",
ttl_map.dash_map_lock_contention_time.load(Ordering::SeqCst) / 1_000_000
);
println!(
"Mutex Lock contention time: {}ms",
ttl_map.mutex_lock_contention_time.load(Ordering::SeqCst) / 1_000_000
);
}

}