From 4346b69fc528a3834a68ae66411d35c7efb243ba Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 12 Aug 2025 19:01:51 -0400 Subject: [PATCH 1/6] Create TTL map with time wheel architecture This change adds a DashMap-like struct which has a background task to clean up entries that have outlived a configurable TTL. This struct is similar to https://github.com/moka-rs/moka, which also uses time wheels. Having our own module avoids introducing a large dependency, which keeps this project closer to vanilla datafusion. This change is meant to be useful for https://github.com/datafusion-contrib/datafusion-distributed/pull/89, where it's possible for `ExecutionStages` to be orphaned in `ArrowFlightEndpoint`. We need an async task to clean up old entries. Informs: https://github.com/datafusion-contrib/datafusion-distributed/issues/90 --- src/common/mod.rs | 1 + src/common/ttl_map.rs | 343 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 344 insertions(+) create mode 100644 src/common/ttl_map.rs diff --git a/src/common/mod.rs b/src/common/mod.rs index 812d1ed..572b996 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1 +1,2 @@ +pub mod ttl_map; pub mod util; diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs new file mode 100644 index 0000000..6930177 --- /dev/null +++ b/src/common/ttl_map.rs @@ -0,0 +1,343 @@ +/* +TTLMap is a DashMap that automatically removes entries after a specified time-to-live (TTL). + +How the Time Wheel Works + +Time Buckets: [0] [1] [2] [3] [4] [5] [6] [7] ... +Current Time: ^ + | + time % buckets.len() + +When inserting key "A" at time=2: +- Key "A" goes into bucket[(2-1) % 8] = bucket[1] +- Key "A" will be expired when time advances to bucket[1] again + +Generally, keys in a bucket expire when the wheel makes a full rotation, making +the total TTL equal to the tick duration * buckets.len(). 
+ +Usage +```rust +let params = TTLMapParams { tick: Duration::from_secs(30), ttl: Duration::from_mins(5) }; +let ttl_map = TTLMap::new(params).await.unwrap(); +let value = ttl_map.get_or_init(key, || initial_value).await; +``` + +TODO: If an existing entry is accessed, we don't extend its TTL. It's unclear if this is +necessary for any use cases. This functionality could be added if needed. + */ +use dashmap::{DashMap, Entry}; +use datafusion::error::DataFusionError; +use std::collections::HashSet; +use std::hash::Hash; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; + +// TTLMap is a key-value store that automatically removes entries after a specified time-to-live. +pub struct TTLMap { + /// Time wheel buckets containing keys to be expired. Each bucket epresents + /// a time slot. Keys in bucket[i] will be expired when time % buckets.len() == i + buckets: Arc>>>, + + /// The actual key-value storage using DashMap for concurrent access + data: Arc>, + + /// Atomic counter tracking the current time slot for the time wheel. + /// Incremented by the background GC task every `tick` duration. + time: Arc, + + /// Background task handle for the garbage collection process. + /// When dropped, the GC task is automatically aborted. + _task: Option>, + + // grandularity of the time wheel. How often a bucket is cleared. + tick: Duration, +} + +pub struct TTLMapParams { + // tick is how often the map is checks for expired entries + // must be less than ttl + pub tick: Duration, + // ttl is the time-to-live for entries + pub ttl: Duration, +} + +impl Default for TTLMapParams { + fn default() -> Self { + Self { + tick: Duration::from_secs(3), + ttl: Duration::from_secs(60), + } + } +} + +impl TTLMap +where + K: Eq + Hash + Send + Sync + Clone + 'static, + V: Default + Clone + Send + Sync + 'static, +{ + // new creates a new TTLMap. 
+ pub async fn new(params: TTLMapParams) -> Result { + if params.tick > params.ttl { + return Err(DataFusionError::Configuration( + "tick duration must be less than or equal to ttl duration".to_string(), + )); + } + let mut map = Self::_new(params.tick, params.ttl).await; + map._start_gc(); + Ok(map) + } + + async fn _new(tick: Duration, ttl: Duration) -> Self { + let bucket_count = (ttl.as_millis() / tick.as_millis()) as usize; + let mut buckets = Vec::with_capacity(bucket_count); + for _ in 0..bucket_count { + buckets.push(HashSet::new()); + } + let stage_targets = Arc::new(DashMap::new()); + let time_wheel = Arc::new(Mutex::new(buckets)); + let time = Arc::new(AtomicU64::new(0)); + Self { + buckets: time_wheel, + data: stage_targets, + time, + _task: None, + tick, + } + } + + // Start and set the background GC task. + fn _start_gc(&mut self) { + self._task = Some(tokio::spawn(Self::run_gc_loop( + self.data.clone(), + self.buckets.clone(), + self.time.clone(), + self.tick, + ))) + } + + /// get_or_default executes the provided closure with a reference to the map entry for the given key. + /// If the key does not exist, it inserts a new entry with the default value. + pub async fn get_or_init(&self, key: K, f: F) -> V + where + F: FnOnce() -> V, + { + let mut new_entry = false; + let value = match self.data.entry(key.clone()) { + Entry::Vacant(entry) => { + let value = f(); + entry.insert(value.clone()); + new_entry = true; + value + } + Entry::Occupied(entry) => entry.get().clone(), + }; + + // Insert the key into the previous bucket, meaning the key will be evicted + // when the wheel completes a full rotation. 
+ if new_entry { + let time = self.time.load(std::sync::atomic::Ordering::SeqCst); + { + let mut buckets = self.buckets.lock().await; + let bucket_index = (time.wrapping_sub(1)) % buckets.len() as u64; + buckets[bucket_index as usize].insert(key); + } + } + + value + } + + /// run_gc_loop will continuously clear expired entries from the map, checking every `period`. The + /// function terminates if `shutdown` is signalled. + async fn run_gc_loop( + map: Arc>, + time_wheel: Arc>>>, + time: Arc, + period: Duration, + ) { + loop { + Self::gc(map.clone(), time_wheel.clone(), time.clone()).await; + tokio::time::sleep(period).await; + } + } + + /// gc clears expired entries from the map and advances time by 1. + async fn gc( + map: Arc>, + time_wheel: Arc>>>, + time: Arc, + ) { + let keys = { + let mut guard = time_wheel.lock().await; + let len = guard.len(); + let index = time.load(std::sync::atomic::Ordering::SeqCst) % len as u64; + // Replace the HashSet at the index with an empty one and return the original + std::mem::replace(&mut guard[index as usize], HashSet::new()) + }; + + // Remove expired keys from the map. + // TODO: it may be worth exploring if we can group keys by shard and do a batched + // remove. + for key in keys { + map.remove(&key); + } + // May wrap. 
+ time.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::Ordering; + use tokio::time::{sleep, Duration}; + + #[tokio::test] + async fn test_basic_insert_and_get() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + + ttl_map.get_or_init("key1".to_string(), || 42).await; + + let value = ttl_map.get_or_init("key1".to_string(), || 0).await; + assert_eq!(value, 42); + } + + #[tokio::test] + async fn test_time_wheel_bucket_calculation() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + + // With 1s TTL and 100ms tick, we should have 10 buckets + assert_eq!(ttl_map.buckets.lock().await.len(), 10); + } + + #[tokio::test] + async fn test_gc_expiration() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + + // Initial batch of entries + ttl_map.get_or_init("key1".to_string(), || 42).await; + ttl_map.get_or_init("key2".to_string(), || 84).await; + assert_eq!(ttl_map.data.len(), 2); + + // Run partial GC cycles (should not expire yet) + for _ in 0..5 { + TTLMap::gc( + ttl_map.data.clone(), + ttl_map.buckets.clone(), + ttl_map.time.clone(), + ) + .await; + } + assert_eq!(ttl_map.data.len(), 2); // Still there + + // Add more entries mid-cycle + ttl_map.get_or_init("key3".to_string(), || 168).await; + ttl_map.get_or_init("key4".to_string(), || 0).await; // Default value (0) + ttl_map.get_or_init("key5".to_string(), || 210).await; + assert_eq!(ttl_map.data.len(), 5); + + // Verify default value was set + let default_value = ttl_map.get_or_init("key4".to_string(), || 0).await; + assert_eq!(default_value, 0); + + // Complete the first rotation to expire initial entries + for _ in 5..10 { + TTLMap::gc( + ttl_map.data.clone(), + ttl_map.buckets.clone(), + ttl_map.time.clone(), + ) + .await; + } + assert_eq!(ttl_map.data.len(), 3); // Initial entries expired, new entries 
still alive + + // Add entries after expiration + ttl_map.get_or_init("new_key1".to_string(), || 999).await; + ttl_map.get_or_init("new_key2".to_string(), || 0).await; // Default value + assert_eq!(ttl_map.data.len(), 5); // 3 from mid-cycle + 2 new ones + + // Verify values + let value1 = ttl_map.get_or_init("new_key1".to_string(), || 0).await; + assert_eq!(value1, 999); + let value2 = ttl_map.get_or_init("new_key2".to_string(), || 0).await; + assert_eq!(value2, 0); + + // Run additional GC cycles to expire remaining entries + // Mid-cycle entries (bucket 4) expire at time=14, late entries (bucket 9) expire at time=19 + for _ in 10..20 { + TTLMap::gc( + ttl_map.data.clone(), + ttl_map.buckets.clone(), + ttl_map.time.clone(), + ) + .await; + } + assert_eq!(ttl_map.data.len(), 0); // All entries expired + } + + #[tokio::test] + async fn test_concurrent_gc_and_access() { + let ttl_map = TTLMap::::new(TTLMapParams { + tick: Duration::from_millis(2), + ttl: Duration::from_millis(10), + }) + .await + .unwrap(); + + assert!(ttl_map._task.is_some()); + + let ttl_map = Arc::new(ttl_map); + + // Spawn 5 concurrent tasks + let mut handles = Vec::new(); + for task_id in 0..5 { + let map = Arc::clone(&ttl_map); + let handle = tokio::spawn(async move { + for i in 0..20 { + let key = format!("task{}_key{}", task_id, i % 4); + map.get_or_init(key, || task_id * 100 + i).await; + sleep(Duration::from_millis(1)).await; + } + }); + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + handle.await.unwrap(); + } + } + + #[tokio::test] + async fn test_wraparound_time() { + let ttl_map = TTLMap::::_new( + Duration::from_millis(10), + Duration::from_millis(20), // 2 buckets + ) + .await; + + // Manually set time near overflow + ttl_map.time.store(u64::MAX - 2, Ordering::SeqCst); + + ttl_map.get_or_init("test_key".to_string(), || 999).await; + + // Run GC to cause time wraparound + for _ in 0..5 { + TTLMap::gc( + ttl_map.data.clone(), + 
ttl_map.buckets.clone(), + ttl_map.time.clone(), + ) + .await; + } + + // Entry should be expired and time should have wrapped + assert_eq!(ttl_map.data.len(), 0); + let final_time = ttl_map.time.load(Ordering::SeqCst); + assert!(final_time < 100); + } +} From b557c6317d46fae2ee777689548f384beab9ef59 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 13 Aug 2025 14:37:56 -0400 Subject: [PATCH 2/6] add bench --- src/common/ttl_map.rs | 68 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index 6930177..c301306 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -90,7 +90,7 @@ where } async fn _new(tick: Duration, ttl: Duration) -> Self { - let bucket_count = (ttl.as_millis() / tick.as_millis()) as usize; + let bucket_count = (ttl.as_nanos() / tick.as_nanos()) as usize; let mut buckets = Vec::with_capacity(bucket_count); for _ in 0..bucket_count { buckets.push(HashSet::new()); @@ -340,4 +340,70 @@ mod tests { let final_time = ttl_map.time.load(Ordering::SeqCst); assert!(final_time < 100); } + + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn bench_lock_contention() { + use std::time::Instant; + + let ttl_map = TTLMap::::new(TTLMapParams { + tick: Duration::from_micros(1), + ttl: Duration::from_micros(2), + }) + .await + .unwrap(); + + let ttl_map = Arc::new(ttl_map); + + let key_count = 10; + let start_time = Instant::now(); + let operations_per_task = 1_000_000; + let task_count = 100; + + // Spawn 10 tasks that repeatedly read the same keys + let mut handles = Vec::new(); + for task_id in 0..task_count { + let map = Arc::clone(&ttl_map); + let handle = tokio::spawn(async move { + let mut local_ops = 0; + for i in 0..operations_per_task { + // All tasks fight for the same keys - maximum contention + let key = format!("key{}", i % key_count); + let _value = map.get_or_init(key, || task_id * 1000 + i).await; + local_ops 
+= 1; + + // Small yield to allow GC to run frequently + if i % 10 == 0 { + tokio::task::yield_now().await; + } + } + local_ops + }); + handles.push(handle); + } + + // Wait for all tasks and collect operation counts + let mut total_operations = 0; + for handle in handles { + total_operations += handle.await.unwrap(); + } + + let elapsed = start_time.elapsed(); + let ops_per_second = total_operations as f64 / elapsed.as_secs_f64(); + let avg_latency_us = elapsed.as_micros() as f64 / total_operations as f64; + + println!("\n=== TTLMap Lock Contention Benchmark ==="); + println!("Tasks: {}", task_count); + println!("Operations per task: {}", operations_per_task); + println!("Total operations: {}", total_operations); + println!("Total time: {:.2?}", elapsed); + println!("Throughput: {:.0} ops/sec", ops_per_second); + println!("Average latency: {:.2} μs per operation", avg_latency_us); + println!("Entries remaining: {}", ttl_map.data.len()); + + // The benchmark passes if it completes without deadlocks + // Performance metrics are printed for analysis + assert!(ops_per_second > 0.0); // Sanity check + } + } From e99e38084e83cb07e0ff83f75ea2e5451b533d6c Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Thu, 14 Aug 2025 08:41:37 +0200 Subject: [PATCH 3/6] Benchmark suggestion --- src/common/ttl_map.rs | 68 +++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index c301306..266ac52 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -29,7 +29,8 @@ use dashmap::{DashMap, Entry}; use datafusion::error::DataFusionError; use std::collections::HashSet; use std::hash::Hash; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -53,6 +54,9 @@ pub struct TTLMap { // grandularity of the time wheel. 
How often a bucket is cleared. tick: Duration, + + dash_map_lock_contention_time: AtomicUsize, + mutex_lock_contention_time: AtomicUsize, } pub struct TTLMapParams { @@ -104,6 +108,8 @@ where time, _task: None, tick, + dash_map_lock_contention_time: AtomicUsize::new(0), + mutex_lock_contention_time: AtomicUsize::new(0), } } @@ -124,7 +130,11 @@ where F: FnOnce() -> V, { let mut new_entry = false; - let value = match self.data.entry(key.clone()) { + let start = std::time::Instant::now(); + let entry = self.data.entry(key.clone()); + self.dash_map_lock_contention_time + .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); + let value = match entry { Entry::Vacant(entry) => { let value = f(); entry.insert(value.clone()); @@ -139,7 +149,10 @@ where if new_entry { let time = self.time.load(std::sync::atomic::Ordering::SeqCst); { + let start = std::time::Instant::now(); let mut buckets = self.buckets.lock().await; + self.mutex_lock_contention_time + .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); let bucket_index = (time.wrapping_sub(1)) % buckets.len() as u64; buckets[bucket_index as usize].insert(key); } @@ -341,12 +354,11 @@ mod tests { assert!(final_time < 100); } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[tokio::test(flavor = "multi_thread", worker_threads = 16)] async fn bench_lock_contention() { use std::time::Instant; - let ttl_map = TTLMap::::new(TTLMapParams { + let ttl_map = TTLMap::::new(TTLMapParams { tick: Duration::from_micros(1), ttl: Duration::from_micros(2), }) @@ -355,55 +367,43 @@ mod tests { let ttl_map = Arc::new(ttl_map); - let key_count = 10; let start_time = Instant::now(); - let operations_per_task = 1_000_000; - let task_count = 100; + let task_count = 100_000; // Spawn 10 tasks that repeatedly read the same keys let mut handles = Vec::new(); for task_id in 0..task_count { let map = Arc::clone(&ttl_map); let handle = tokio::spawn(async move { - let mut local_ops = 0; - for i in 0..operations_per_task { - 
// All tasks fight for the same keys - maximum contention - let key = format!("key{}", i % key_count); - let _value = map.get_or_init(key, || task_id * 1000 + i).await; - local_ops += 1; - - // Small yield to allow GC to run frequently - if i % 10 == 0 { - tokio::task::yield_now().await; - } - } - local_ops + // All tasks fight for the same keys - maximum contention + let start = Instant::now(); + let _value = map.get_or_init(rand::random(), || task_id * 1000).await; + start.elapsed().as_nanos() }); handles.push(handle); } // Wait for all tasks and collect operation counts - let mut total_operations = 0; + let mut avg_time = 0; for handle in handles { - total_operations += handle.await.unwrap(); + avg_time += handle.await.unwrap(); } + avg_time /= task_count as u128; let elapsed = start_time.elapsed(); - let ops_per_second = total_operations as f64 / elapsed.as_secs_f64(); - let avg_latency_us = elapsed.as_micros() as f64 / total_operations as f64; println!("\n=== TTLMap Lock Contention Benchmark ==="); println!("Tasks: {}", task_count); - println!("Operations per task: {}", operations_per_task); - println!("Total operations: {}", total_operations); println!("Total time: {:.2?}", elapsed); - println!("Throughput: {:.0} ops/sec", ops_per_second); - println!("Average latency: {:.2} μs per operation", avg_latency_us); + println!("Average latency: {:.2} μs per operation", avg_time / 1_000); println!("Entries remaining: {}", ttl_map.data.len()); - - // The benchmark passes if it completes without deadlocks - // Performance metrics are printed for analysis - assert!(ops_per_second > 0.0); // Sanity check + println!( + "DashMap Lock contention time: {}ms", + ttl_map.dash_map_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 + ); + println!( + "Mutex Lock contention time: {}ms", + ttl_map.mutex_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 + ); } - } From 248a7b7e4189bee1fa35930806c3dbdec28ccab9 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 
14 Aug 2025 22:35:32 -0400 Subject: [PATCH 4/6] add contention factor --- src/common/ttl_map.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index 266ac52..4926d1b 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -131,10 +131,7 @@ where { let mut new_entry = false; let start = std::time::Instant::now(); - let entry = self.data.entry(key.clone()); - self.dash_map_lock_contention_time - .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); - let value = match entry { + let value = match self.data.entry(key.clone()) { Entry::Vacant(entry) => { let value = f(); entry.insert(value.clone()); @@ -143,6 +140,8 @@ where } Entry::Occupied(entry) => entry.get().clone(), }; + self.dash_map_lock_contention_time + .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); // Insert the key into the previous bucket, meaning the key will be evicted // when the wheel completes a full rotation. @@ -369,6 +368,7 @@ mod tests { let start_time = Instant::now(); let task_count = 100_000; + let contention_factor = 10; // Spawn 10 tasks that repeatedly read the same keys let mut handles = Vec::new(); @@ -377,7 +377,7 @@ mod tests { let handle = tokio::spawn(async move { // All tasks fight for the same keys - maximum contention let start = Instant::now(); - let _value = map.get_or_init(rand::random(), || task_id * 1000).await; + let _value = map.get_or_init(task_id % (task_count / contention_factor), || task_id * 1000).await; start.elapsed().as_nanos() }); handles.push(handle); From 25ae1c571a836aa869567ca37629fcad340297c6 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 14 Aug 2025 22:49:35 -0400 Subject: [PATCH 5/6] random --- src/common/ttl_map.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index 4926d1b..d11777b 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -368,7 +368,6 @@ mod tests { let 
start_time = Instant::now(); let task_count = 100_000; - let contention_factor = 10; // Spawn 10 tasks that repeatedly read the same keys let mut handles = Vec::new(); @@ -377,7 +376,7 @@ mod tests { let handle = tokio::spawn(async move { // All tasks fight for the same keys - maximum contention let start = Instant::now(); - let _value = map.get_or_init(task_id % (task_count / contention_factor), || task_id * 1000).await; + let _value = map.get_or_init(rand::random(), || task_id * 1000).await; start.elapsed().as_nanos() }); handles.push(handle); From 7b3f6dc82145c3b253908361d8d3e93d0285f08e Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 15 Aug 2025 14:46:29 -0400 Subject: [PATCH 6/6] use moka --- Cargo.lock | 307 +++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 1 + src/common/ttl_map.rs | 282 ++++++++------------------------------ 3 files changed, 359 insertions(+), 231 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84e8d02..5130d5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,7 +384,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -404,6 +404,17 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -730,6 +741,15 @@ dependencies = [ "unicode-width 0.2.1", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -792,6 +812,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = 
"0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1123,6 +1161,7 @@ dependencies = [ "http", "insta", "itertools", + "moka", "object_store", "prost", "rand 0.8.5", @@ -1367,7 +1406,7 @@ dependencies = [ "log", "recursive", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -1626,6 +1665,27 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1769,6 +1829,20 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2374,6 +2448,19 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" 
+[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lz4_flex" version = "0.11.5" @@ -2394,6 +2481,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -2442,6 +2538,38 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "moka" +version = "0.12.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "loom", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.69", + "uuid", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -2540,7 +2668,7 @@ dependencies = [ "itertools", "parking_lot", "percent-encoding", - "thiserror", + "thiserror 2.0.12", "tokio", "tracing", "url", @@ -2570,6 +2698,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "parking" +version = "2.2.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.4" @@ -2928,8 +3068,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2940,9 +3089,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -3012,6 +3167,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -3073,6 +3234,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3266,6 +3436,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.20.0" @@ -3288,13 +3464,33 @@ dependencies = [ "unicode-width 0.1.14", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.12", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", ] [[package]] @@ -3308,6 +3504,15 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "thrift" version = "0.17.0" @@ -3498,6 +3703,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" 
+dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3582,6 +3817,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vec_map" version = "0.8.2" @@ -3750,6 +3991,28 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-link", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.61.2" @@ -3763,6 +4026,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.0" @@ -3791,6 +4065,16 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-result" version = "0.3.4" @@ -3868,6 +4152,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.0", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 0235c81..0431d4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ url = "2.5.4" uuid = "1.17.0" delegate = "0.13.4" dashmap = "6.1.0" +moka = { version = "0.12", features = ["future"] } prost = "0.13.5" rand = "0.8.5" object_store = "0.12.3" diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs index d11777b..12ff7e7 100644 --- a/src/common/ttl_map.rs +++ b/src/common/ttl_map.rs @@ -1,19 +1,5 @@ /* -TTLMap is a DashMap that automatically removes entries after a specified time-to-live (TTL). - -How the Time Wheel Works - -Time Buckets: [0] [1] [2] [3] [4] [5] [6] [7] ... -Current Time: ^ - | - time % buckets.len() - -When inserting key "A" at time=2: -- Key "A" goes into bucket[(2-1) % 8] = bucket[1] -- Key "A" will be expired when time advances to bucket[1] again - -Generally, keys in a bucket expire when the wheel makes a full rotation, making -the total TTL equal to the tick duration * buckets.len(). +TTLMap is a Moka-based cache that automatically removes entries after a specified time-to-live (TTL). 
Usage ```rust @@ -21,47 +7,20 @@ let params = TTLMapParams { tick: Duration::from_secs(30), ttl: Duration::from_m let ttl_map = TTLMap::new(params).await.unwrap(); let value = ttl_map.get_or_init(key, || initial_value).await; ``` - -TODO: If an existing entry is accessed, we don't extend its TTL. It's unclear if this is -necessary for any use cases. This functionality could be added if needed. */ -use dashmap::{DashMap, Entry}; use datafusion::error::DataFusionError; -use std::collections::HashSet; +use moka::future::Cache; use std::hash::Hash; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::atomic::{AtomicU64, AtomicUsize}; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; // TTLMap is a key-value store that automatically removes entries after a specified time-to-live. pub struct TTLMap { - /// Time wheel buckets containing keys to be expired. Each bucket epresents - /// a time slot. Keys in bucket[i] will be expired when time % buckets.len() == i - buckets: Arc>>>, - - /// The actual key-value storage using DashMap for concurrent access - data: Arc>, - - /// Atomic counter tracking the current time slot for the time wheel. - /// Incremented by the background GC task every `tick` duration. - time: Arc, - - /// Background task handle for the garbage collection process. - /// When dropped, the GC task is automatically aborted. - _task: Option>, - - // grandularity of the time wheel. How often a bucket is cleared. 
- tick: Duration, - - dash_map_lock_contention_time: AtomicUsize, - mutex_lock_contention_time: AtomicUsize, + /// The Moka cache with TTL functionality + cache: Cache, } pub struct TTLMapParams { - // tick is how often the map is checks for expired entries - // must be less than ttl + // tick is ignored when using Moka (kept for API compatibility) pub tick: Duration, // ttl is the time-to-live for entries pub ttl: Duration, @@ -88,39 +47,17 @@ where "tick duration must be less than or equal to ttl duration".to_string(), )); } - let mut map = Self::_new(params.tick, params.ttl).await; - map._start_gc(); - Ok(map) + let cache = Cache::builder() + .time_to_live(params.ttl) + .build(); + Ok(Self { cache }) } - async fn _new(tick: Duration, ttl: Duration) -> Self { - let bucket_count = (ttl.as_nanos() / tick.as_nanos()) as usize; - let mut buckets = Vec::with_capacity(bucket_count); - for _ in 0..bucket_count { - buckets.push(HashSet::new()); - } - let stage_targets = Arc::new(DashMap::new()); - let time_wheel = Arc::new(Mutex::new(buckets)); - let time = Arc::new(AtomicU64::new(0)); - Self { - buckets: time_wheel, - data: stage_targets, - time, - _task: None, - tick, - dash_map_lock_contention_time: AtomicUsize::new(0), - mutex_lock_contention_time: AtomicUsize::new(0), - } - } - - // Start and set the background GC task. - fn _start_gc(&mut self) { - self._task = Some(tokio::spawn(Self::run_gc_loop( - self.data.clone(), - self.buckets.clone(), - self.time.clone(), - self.tick, - ))) + async fn _new(_tick: Duration, ttl: Duration) -> Self { + let cache = Cache::builder() + .time_to_live(ttl) + .build(); + Self { cache } } /// get_or_default executes the provided closure with a reference to the map entry for the given key. 
@@ -129,80 +66,15 @@ where where F: FnOnce() -> V, { - let mut new_entry = false; - let start = std::time::Instant::now(); - let value = match self.data.entry(key.clone()) { - Entry::Vacant(entry) => { - let value = f(); - entry.insert(value.clone()); - new_entry = true; - value - } - Entry::Occupied(entry) => entry.get().clone(), - }; - self.dash_map_lock_contention_time - .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); - - // Insert the key into the previous bucket, meaning the key will be evicted - // when the wheel completes a full rotation. - if new_entry { - let time = self.time.load(std::sync::atomic::Ordering::SeqCst); - { - let start = std::time::Instant::now(); - let mut buckets = self.buckets.lock().await; - self.mutex_lock_contention_time - .fetch_add(start.elapsed().as_nanos() as usize, Relaxed); - let bucket_index = (time.wrapping_sub(1)) % buckets.len() as u64; - buckets[bucket_index as usize].insert(key); - } - } - - value - } - - /// run_gc_loop will continuously clear expired entries from the map, checking every `period`. The - /// function terminates if `shutdown` is signalled. - async fn run_gc_loop( - map: Arc>, - time_wheel: Arc>>>, - time: Arc, - period: Duration, - ) { - loop { - Self::gc(map.clone(), time_wheel.clone(), time.clone()).await; - tokio::time::sleep(period).await; - } + self.cache.get_with(key, async move { f() }).await } - /// gc clears expired entries from the map and advances time by 1. - async fn gc( - map: Arc>, - time_wheel: Arc>>>, - time: Arc, - ) { - let keys = { - let mut guard = time_wheel.lock().await; - let len = guard.len(); - let index = time.load(std::sync::atomic::Ordering::SeqCst) % len as u64; - // Replace the HashSet at the index with an empty one and return the original - std::mem::replace(&mut guard[index as usize], HashSet::new()) - }; - - // Remove expired keys from the map. - // TODO: it may be worth exploring if we can group keys by shard and do a batched - // remove. 
- for key in keys { - map.remove(&key); - } - // May wrap. - time.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } } #[cfg(test)] mod tests { use super::*; - use std::sync::atomic::Ordering; + use std::sync::Arc; use tokio::time::{sleep, Duration}; #[tokio::test] @@ -217,78 +89,57 @@ mod tests { } #[tokio::test] - async fn test_time_wheel_bucket_calculation() { + async fn test_moka_cache_created() { let ttl_map = TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; - // With 1s TTL and 100ms tick, we should have 10 buckets - assert_eq!(ttl_map.buckets.lock().await.len(), 10); + // Verify that the cache is properly initialized + assert_eq!(ttl_map.cache.entry_count(), 0); } #[tokio::test] async fn test_gc_expiration() { let ttl_map = - TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + TTLMap::::_new(Duration::from_millis(50), Duration::from_millis(100)).await; // Initial batch of entries ttl_map.get_or_init("key1".to_string(), || 42).await; ttl_map.get_or_init("key2".to_string(), || 84).await; - assert_eq!(ttl_map.data.len(), 2); - - // Run partial GC cycles (should not expire yet) - for _ in 0..5 { - TTLMap::gc( - ttl_map.data.clone(), - ttl_map.buckets.clone(), - ttl_map.time.clone(), - ) - .await; - } - assert_eq!(ttl_map.data.len(), 2); // Still there - - // Add more entries mid-cycle + + // Verify entries exist by checking values + let value1 = ttl_map.get_or_init("key1".to_string(), || 999).await; + assert_eq!(value1, 42); + let value2 = ttl_map.get_or_init("key2".to_string(), || 999).await; + assert_eq!(value2, 84); + + // Add more entries ttl_map.get_or_init("key3".to_string(), || 168).await; ttl_map.get_or_init("key4".to_string(), || 0).await; // Default value (0) ttl_map.get_or_init("key5".to_string(), || 210).await; - assert_eq!(ttl_map.data.len(), 5); // Verify default value was set - let default_value = ttl_map.get_or_init("key4".to_string(), || 0).await; + let default_value = 
ttl_map.get_or_init("key4".to_string(), || 999).await; assert_eq!(default_value, 0); - // Complete the first rotation to expire initial entries - for _ in 5..10 { - TTLMap::gc( - ttl_map.data.clone(), - ttl_map.buckets.clone(), - ttl_map.time.clone(), - ) - .await; - } - assert_eq!(ttl_map.data.len(), 3); // Initial entries expired, new entries still alive + // Wait for TTL to expire entries + sleep(Duration::from_millis(150)).await; + + // Run maintenance to clean up expired entries + ttl_map.cache.run_pending_tasks().await; - // Add entries after expiration + // Verify entries are expired by trying to get with different default values + let expired_value1 = ttl_map.get_or_init("key1".to_string(), || 777).await; + assert_eq!(expired_value1, 777); // Should get new default value, not cached 42 + + // Add new entries after expiration ttl_map.get_or_init("new_key1".to_string(), || 999).await; ttl_map.get_or_init("new_key2".to_string(), || 0).await; // Default value - assert_eq!(ttl_map.data.len(), 5); // 3 from mid-cycle + 2 new ones // Verify values let value1 = ttl_map.get_or_init("new_key1".to_string(), || 0).await; assert_eq!(value1, 999); let value2 = ttl_map.get_or_init("new_key2".to_string(), || 0).await; assert_eq!(value2, 0); - - // Run additional GC cycles to expire remaining entries - // Mid-cycle entries (bucket 4) expire at time=14, late entries (bucket 9) expire at time=19 - for _ in 10..20 { - TTLMap::gc( - ttl_map.data.clone(), - ttl_map.buckets.clone(), - ttl_map.time.clone(), - ) - .await; - } - assert_eq!(ttl_map.data.len(), 0); // All entries expired } #[tokio::test] @@ -300,8 +151,6 @@ mod tests { .await .unwrap(); - assert!(ttl_map._task.is_some()); - let ttl_map = Arc::new(ttl_map); // Spawn 5 concurrent tasks @@ -325,32 +174,26 @@ mod tests { } #[tokio::test] - async fn test_wraparound_time() { + async fn test_basic_ttl_behavior() { let ttl_map = TTLMap::::_new( Duration::from_millis(10), - Duration::from_millis(20), // 2 buckets + 
Duration::from_millis(50), ) .await; - // Manually set time near overflow - ttl_map.time.store(u64::MAX - 2, Ordering::SeqCst); - ttl_map.get_or_init("test_key".to_string(), || 999).await; - - // Run GC to cause time wraparound - for _ in 0..5 { - TTLMap::gc( - ttl_map.data.clone(), - ttl_map.buckets.clone(), - ttl_map.time.clone(), - ) - .await; - } - - // Entry should be expired and time should have wrapped - assert_eq!(ttl_map.data.len(), 0); - let final_time = ttl_map.time.load(Ordering::SeqCst); - assert!(final_time < 100); + + // Verify entry exists + let value = ttl_map.get_or_init("test_key".to_string(), || 111).await; + assert_eq!(value, 999); + + // Wait for expiration and run maintenance + sleep(Duration::from_millis(60)).await; + ttl_map.cache.run_pending_tasks().await; + + // Entry should be expired - trying to get it should return new default + let expired_value = ttl_map.get_or_init("test_key".to_string(), || 111).await; + assert_eq!(expired_value, 111); } #[tokio::test(flavor = "multi_thread", worker_threads = 16)] @@ -358,8 +201,8 @@ mod tests { use std::time::Instant; let ttl_map = TTLMap::::new(TTLMapParams { - tick: Duration::from_micros(1), - ttl: Duration::from_micros(2), + tick: Duration::from_secs(1), + ttl: Duration::from_secs(60), }) .await .unwrap(); @@ -369,12 +212,11 @@ mod tests { let start_time = Instant::now(); let task_count = 100_000; - // Spawn 10 tasks that repeatedly read the same keys + // Spawn tasks that repeatedly access random keys let mut handles = Vec::new(); for task_id in 0..task_count { let map = Arc::clone(&ttl_map); let handle = tokio::spawn(async move { - // All tasks fight for the same keys - maximum contention let start = Instant::now(); let _value = map.get_or_init(rand::random(), || task_id * 1000).await; start.elapsed().as_nanos() @@ -391,18 +233,10 @@ mod tests { let elapsed = start_time.elapsed(); - println!("\n=== TTLMap Lock Contention Benchmark ==="); + println!("\n=== TTLMap Moka Benchmark ==="); 
println!("Tasks: {}", task_count); println!("Total time: {:.2?}", elapsed); println!("Average latency: {:.2} μs per operation", avg_time / 1_000); - println!("Entries remaining: {}", ttl_map.data.len()); - println!( - "DashMap Lock contention time: {}ms", - ttl_map.dash_map_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 - ); - println!( - "Mutex Lock contention time: {}ms", - ttl_map.mutex_lock_contention_time.load(Ordering::SeqCst) / 1_000_000 - ); + println!("Throughput: {:.2} ops/sec", task_count as f64 / elapsed.as_secs_f64()); } }