@@ -29,7 +29,8 @@ use dashmap::{DashMap, Entry};
2929use datafusion:: error:: DataFusionError ;
3030use std:: collections:: HashSet ;
3131use std:: hash:: Hash ;
32- use std:: sync:: atomic:: AtomicU64 ;
32+ use std:: sync:: atomic:: Ordering :: Relaxed ;
33+ use std:: sync:: atomic:: { AtomicU64 , AtomicUsize } ;
3334use std:: sync:: Arc ;
3435use std:: time:: Duration ;
3536use tokio:: sync:: Mutex ;
@@ -53,6 +54,9 @@ pub struct TTLMap<K, V> {
5354
5455 // grandularity of the time wheel. How often a bucket is cleared.
5556 tick : Duration ,
57+
58+ dash_map_lock_contention_time : AtomicUsize ,
59+ mutex_lock_contention_time : AtomicUsize ,
5660}
5761
5862pub struct TTLMapParams {
@@ -104,6 +108,8 @@ where
104108 time,
105109 _task : None ,
106110 tick,
111+ dash_map_lock_contention_time : AtomicUsize :: new ( 0 ) ,
112+ mutex_lock_contention_time : AtomicUsize :: new ( 0 ) ,
107113 }
108114 }
109115
@@ -124,7 +130,11 @@ where
124130 F : FnOnce ( ) -> V ,
125131 {
126132 let mut new_entry = false ;
127- let value = match self . data . entry ( key. clone ( ) ) {
133+ let start = std:: time:: Instant :: now ( ) ;
134+ let entry = self . data . entry ( key. clone ( ) ) ;
135+ self . dash_map_lock_contention_time
136+ . fetch_add ( start. elapsed ( ) . as_nanos ( ) as usize , Relaxed ) ;
137+ let value = match entry {
128138 Entry :: Vacant ( entry) => {
129139 let value = f ( ) ;
130140 entry. insert ( value. clone ( ) ) ;
@@ -139,7 +149,10 @@ where
139149 if new_entry {
140150 let time = self . time . load ( std:: sync:: atomic:: Ordering :: SeqCst ) ;
141151 {
152+ let start = std:: time:: Instant :: now ( ) ;
142153 let mut buckets = self . buckets . lock ( ) . await ;
154+ self . mutex_lock_contention_time
155+ . fetch_add ( start. elapsed ( ) . as_nanos ( ) as usize , Relaxed ) ;
143156 let bucket_index = ( time. wrapping_sub ( 1 ) ) % buckets. len ( ) as u64 ;
144157 buckets[ bucket_index as usize ] . insert ( key) ;
145158 }
@@ -341,12 +354,11 @@ mod tests {
341354 assert ! ( final_time < 100 ) ;
342355 }
343356
344-
345- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
357+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 16 ) ]
346358 async fn bench_lock_contention ( ) {
347359 use std:: time:: Instant ;
348360
349- let ttl_map = TTLMap :: < String , i32 > :: new ( TTLMapParams {
361+ let ttl_map = TTLMap :: < i32 , i32 > :: new ( TTLMapParams {
350362 tick : Duration :: from_micros ( 1 ) ,
351363 ttl : Duration :: from_micros ( 2 ) ,
352364 } )
@@ -355,55 +367,43 @@ mod tests {
355367
356368 let ttl_map = Arc :: new ( ttl_map) ;
357369
358- let key_count = 10 ;
359370 let start_time = Instant :: now ( ) ;
360- let operations_per_task = 1_000_000 ;
361- let task_count = 100 ;
371+ let task_count = 100_000 ;
362372
363373 // Spawn 10 tasks that repeatedly read the same keys
364374 let mut handles = Vec :: new ( ) ;
365375 for task_id in 0 ..task_count {
366376 let map = Arc :: clone ( & ttl_map) ;
367377 let handle = tokio:: spawn ( async move {
368- let mut local_ops = 0 ;
369- for i in 0 ..operations_per_task {
370- // All tasks fight for the same keys - maximum contention
371- let key = format ! ( "key{}" , i % key_count) ;
372- let _value = map. get_or_init ( key, || task_id * 1000 + i) . await ;
373- local_ops += 1 ;
374-
375- // Small yield to allow GC to run frequently
376- if i % 10 == 0 {
377- tokio:: task:: yield_now ( ) . await ;
378- }
379- }
380- local_ops
378+ // All tasks fight for the same keys - maximum contention
379+ let start = Instant :: now ( ) ;
380+ let _value = map. get_or_init ( rand:: random ( ) , || task_id * 1000 ) . await ;
381+ start. elapsed ( ) . as_nanos ( )
381382 } ) ;
382383 handles. push ( handle) ;
383384 }
384385
385386 // Wait for all tasks and collect operation counts
386- let mut total_operations = 0 ;
387+ let mut avg_time = 0 ;
387388 for handle in handles {
388- total_operations += handle. await . unwrap ( ) ;
389+ avg_time += handle. await . unwrap ( ) ;
389390 }
391+ avg_time /= task_count as u128 ;
390392
391393 let elapsed = start_time. elapsed ( ) ;
392- let ops_per_second = total_operations as f64 / elapsed. as_secs_f64 ( ) ;
393- let avg_latency_us = elapsed. as_micros ( ) as f64 / total_operations as f64 ;
394394
395395 println ! ( "\n === TTLMap Lock Contention Benchmark ===" ) ;
396396 println ! ( "Tasks: {}" , task_count) ;
397- println ! ( "Operations per task: {}" , operations_per_task) ;
398- println ! ( "Total operations: {}" , total_operations) ;
399397 println ! ( "Total time: {:.2?}" , elapsed) ;
400- println ! ( "Throughput: {:.0} ops/sec" , ops_per_second) ;
401- println ! ( "Average latency: {:.2} μs per operation" , avg_latency_us) ;
398+ println ! ( "Average latency: {:.2} μs per operation" , avg_time / 1_000 ) ;
402399 println ! ( "Entries remaining: {}" , ttl_map. data. len( ) ) ;
403-
404- // The benchmark passes if it completes without deadlocks
405- // Performance metrics are printed for analysis
406- assert ! ( ops_per_second > 0.0 ) ; // Sanity check
400+ println ! (
401+ "DashMap Lock contention time: {}ms" ,
402+ ttl_map. dash_map_lock_contention_time. load( Ordering :: SeqCst ) / 1_000_000
403+ ) ;
404+ println ! (
405+ "Mutex Lock contention time: {}ms" ,
406+ ttl_map. mutex_lock_contention_time. load( Ordering :: SeqCst ) / 1_000_000
407+ ) ;
407408 }
408-
409409}
0 commit comments