diff --git a/Cargo.lock b/Cargo.lock index 84e8d02..5130d5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,7 +384,7 @@ dependencies = [ "memchr", "num", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -404,6 +404,17 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -730,6 +741,15 @@ dependencies = [ "unicode-width 0.2.1", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.15.11" @@ -792,6 +812,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1123,6 +1161,7 @@ dependencies = [ "http", "insta", "itertools", + "moka", "object_store", "prost", "rand 0.8.5", @@ -1367,7 +1406,7 @@ dependencies = [ "log", "recursive", "regex", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -1626,6 +1665,27 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -1769,6 +1829,20 @@ dependencies = [ "slab", ] +[[package]] +name = "generator" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d18470a76cb7f8ff746cf1f7470914f900252ec36bbc40b569d74b1258446827" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -2374,6 +2448,19 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "tracing", + "tracing-subscriber", +] + [[package]] name = "lz4_flex" version = "0.11.5" @@ -2394,6 +2481,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -2442,6 +2538,38 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "moka" +version = "0.12.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "event-listener", + "futures-util", + "loom", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "thiserror 1.0.69", + "uuid", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.3" @@ -2540,7 +2668,7 @@ dependencies = [ "itertools", "parking_lot", "percent-encoding", - "thiserror", + "thiserror 2.0.12", "tokio", "tracing", "url", @@ -2570,6 +2698,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.4" @@ -2928,8 +3068,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2940,9 +3089,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = 
"regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -3012,6 +3167,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -3073,6 +3234,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3266,6 +3436,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.20.0" @@ -3288,13 +3464,33 @@ dependencies = [ "unicode-width 0.1.14", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.12", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + 
"quote", + "syn 2.0.104", ] [[package]] @@ -3308,6 +3504,15 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "thrift" version = "0.17.0" @@ -3498,6 +3703,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3582,6 +3817,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vec_map" version = "0.8.2" @@ -3750,6 +3991,28 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core", + 
"windows-future", + "windows-link", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.61.2" @@ -3763,6 +4026,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.0" @@ -3791,6 +4065,16 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-result" version = "0.3.4" @@ -3868,6 +4152,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.0", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 0235c81..0431d4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ url = "2.5.4" uuid = "1.17.0" delegate = "0.13.4" dashmap = "6.1.0" +moka = { version = "0.12", features = ["future"] } prost = "0.13.5" rand = "0.8.5" object_store = "0.12.3" diff --git a/src/common/mod.rs b/src/common/mod.rs 
index 812d1ed..572b996 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1 +1,2 @@ +pub mod ttl_map; pub mod util; diff --git a/src/common/ttl_map.rs b/src/common/ttl_map.rs new file mode 100644 index 0000000..12ff7e7 --- /dev/null +++ b/src/common/ttl_map.rs @@ -0,0 +1,242 @@ +/* +TTLMap is a Moka-based cache that automatically removes entries after a specified time-to-live (TTL). + +Usage +```rust +let params = TTLMapParams { tick: Duration::from_secs(30), ttl: Duration::from_secs(300) }; +let ttl_map = TTLMap::new(params).await.unwrap(); +let value = ttl_map.get_or_init(key, || initial_value).await; +``` + */ +use datafusion::error::DataFusionError; +use moka::future::Cache; +use std::hash::Hash; +use std::time::Duration; + +// TTLMap is a key-value store that automatically removes entries after a specified time-to-live. +pub struct TTLMap { + /// The Moka cache with TTL functionality + cache: Cache, +} + +pub struct TTLMapParams { + // tick is not used by the Moka cache (kept for API compatibility); TTLMap::new still rejects tick > ttl + pub tick: Duration, + // ttl is the time-to-live for entries + pub ttl: Duration, +} + +impl Default for TTLMapParams { + fn default() -> Self { + Self { + tick: Duration::from_secs(3), + ttl: Duration::from_secs(60), + } + } +} + +impl TTLMap +where + K: Eq + Hash + Send + Sync + Clone + 'static, + V: Default + Clone + Send + Sync + 'static, +{ + // new creates a new TTLMap; it returns a Configuration error when tick > ttl. + pub async fn new(params: TTLMapParams) -> Result { + if params.tick > params.ttl { + return Err(DataFusionError::Configuration( + "tick duration must be less than or equal to ttl duration".to_string(), + )); + } + let cache = Cache::builder() + .time_to_live(params.ttl) + .build(); + Ok(Self { cache }) + } + + async fn _new(_tick: Duration, ttl: Duration) -> Self { + let cache = Cache::builder() + .time_to_live(ttl) + .build(); + Self { cache } + } + + /// get_or_init returns the value cached for the given key. 
+ /// If the key is absent or its entry has expired, the value produced by `f` is inserted and returned. + pub async fn get_or_init(&self, key: K, f: F) -> V + where + F: FnOnce() -> V, + { + self.cache.get_with(key, async move { f() }).await + } + +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use tokio::time::{sleep, Duration}; + + #[tokio::test] + async fn test_basic_insert_and_get() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + + ttl_map.get_or_init("key1".to_string(), || 42).await; + + let value = ttl_map.get_or_init("key1".to_string(), || 0).await; + assert_eq!(value, 42); + } + + #[tokio::test] + async fn test_moka_cache_created() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(100), Duration::from_secs(1)).await; + + // Verify that the cache is properly initialized + assert_eq!(ttl_map.cache.entry_count(), 0); + } + + #[tokio::test] + async fn test_gc_expiration() { + let ttl_map = + TTLMap::::_new(Duration::from_millis(50), Duration::from_millis(100)).await; + + // Initial batch of entries + ttl_map.get_or_init("key1".to_string(), || 42).await; + ttl_map.get_or_init("key2".to_string(), || 84).await; + + // Verify entries exist by checking values + let value1 = ttl_map.get_or_init("key1".to_string(), || 999).await; + assert_eq!(value1, 42); + let value2 = ttl_map.get_or_init("key2".to_string(), || 999).await; + assert_eq!(value2, 84); + + // Add more entries + ttl_map.get_or_init("key3".to_string(), || 168).await; + ttl_map.get_or_init("key4".to_string(), || 0).await; // Default value (0) + ttl_map.get_or_init("key5".to_string(), || 210).await; + + // Verify default value was set + let default_value = ttl_map.get_or_init("key4".to_string(), || 999).await; + assert_eq!(default_value, 0); + + // Wait for TTL to expire entries + sleep(Duration::from_millis(150)).await; + + // Run maintenance to clean up expired entries + ttl_map.cache.run_pending_tasks().await; + + // Verify entries are expired by 
trying to get with different default values + let expired_value1 = ttl_map.get_or_init("key1".to_string(), || 777).await; + assert_eq!(expired_value1, 777); // Should get new default value, not cached 42 + + // Add new entries after expiration + ttl_map.get_or_init("new_key1".to_string(), || 999).await; + ttl_map.get_or_init("new_key2".to_string(), || 0).await; // Default value + + // Verify values + let value1 = ttl_map.get_or_init("new_key1".to_string(), || 0).await; + assert_eq!(value1, 999); + let value2 = ttl_map.get_or_init("new_key2".to_string(), || 0).await; + assert_eq!(value2, 0); + } + + #[tokio::test] + async fn test_concurrent_gc_and_access() { + let ttl_map = TTLMap::::new(TTLMapParams { + tick: Duration::from_millis(2), + ttl: Duration::from_millis(10), + }) + .await + .unwrap(); + + let ttl_map = Arc::new(ttl_map); + + // Spawn 5 concurrent tasks + let mut handles = Vec::new(); + for task_id in 0..5 { + let map = Arc::clone(&ttl_map); + let handle = tokio::spawn(async move { + for i in 0..20 { + let key = format!("task{}_key{}", task_id, i % 4); + map.get_or_init(key, || task_id * 100 + i).await; + sleep(Duration::from_millis(1)).await; + } + }); + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + handle.await.unwrap(); + } + } + + #[tokio::test] + async fn test_basic_ttl_behavior() { + let ttl_map = TTLMap::::_new( + Duration::from_millis(10), + Duration::from_millis(50), + ) + .await; + + ttl_map.get_or_init("test_key".to_string(), || 999).await; + + // Verify entry exists + let value = ttl_map.get_or_init("test_key".to_string(), || 111).await; + assert_eq!(value, 999); + + // Wait for expiration and run maintenance + sleep(Duration::from_millis(60)).await; + ttl_map.cache.run_pending_tasks().await; + + // Entry should be expired - trying to get it should return new default + let expired_value = ttl_map.get_or_init("test_key".to_string(), || 111).await; + assert_eq!(expired_value, 111); + } + + 
#[tokio::test(flavor = "multi_thread", worker_threads = 16)] + async fn bench_lock_contention() { + use std::time::Instant; + + let ttl_map = TTLMap::::new(TTLMapParams { + tick: Duration::from_secs(1), + ttl: Duration::from_secs(60), + }) + .await + .unwrap(); + + let ttl_map = Arc::new(ttl_map); + + let start_time = Instant::now(); + let task_count = 100_000; + + // Spawn tasks that each perform one access with a random key + let mut handles = Vec::new(); + for task_id in 0..task_count { + let map = Arc::clone(&ttl_map); + let handle = tokio::spawn(async move { + let start = Instant::now(); + let _value = map.get_or_init(rand::random(), || task_id * 1000).await; + start.elapsed().as_nanos() + }); + handles.push(handle); + } + + // Wait for all tasks and sum their latencies (nanoseconds) before averaging + let mut avg_time = 0; + for handle in handles { + avg_time += handle.await.unwrap(); + } + avg_time /= task_count as u128; + + let elapsed = start_time.elapsed(); + + println!("\n=== TTLMap Moka Benchmark ==="); + println!("Tasks: {}", task_count); + println!("Total time: {:.2?}", elapsed); + println!("Average latency: {:.2} μs per operation", avg_time / 1_000); + println!("Throughput: {:.2} ops/sec", task_count as f64 / elapsed.as_secs_f64()); + } +}