Skip to content

Conversation

@jayshrivastava
Copy link
Collaborator

@jayshrivastava jayshrivastava commented Aug 13, 2025

This change adds a DashMap-like struct which has a background tasks to clean
up entries that have outlived a configurable TTL.

This data structure is added so it can be used in the ArrowFlightEndpoint, where it's
possible for ExecutionStages to be orphaned due to errors. This change adds the ability to
clean up tasks async.

The implementation is simliar to https://github.com/moka-rs/moka, which also uses
time wheels. Benchmarks show that this implementation has less overhead per operation
than moka (see results #99).

=== TTLMap Moka Benchmark ===
Tasks: 100000
Total time: 162.53ms
Average latency: 45 μs per operation
Throughput: 615257.30 ops/sec

=== TTLMap Lock Contention Benchmark ===
Tasks: 100000
Total time: 137.07ms
Average latency: 985 ns per operation
Entries remaining: 0
DashMap Lock contention time: 21ms
Accounting time: 47ms

There's also an implementation in #92, which
has the worst performance by far.

Tasks: 100000
Total time: 105.65ms
Average latency: 20453 μs per operation
Entries remaining: 0
DashMap Lock contention time: 23ms
Mutex Lock contention time: 2045251ms

Informs: #90

This was referenced Aug 15, 2025
@jayshrivastava jayshrivastava force-pushed the js/experimental-no-global-lock branch 2 times, most recently from de359e3 to 9cc8d5d Compare August 17, 2025 17:08
@jayshrivastava jayshrivastava changed the title Js/experimental no global lock Create TTL map with time wheel architecture Aug 17, 2025
@jayshrivastava jayshrivastava marked this pull request as ready for review August 17, 2025 17:09
Copy link
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks really good! left some comments, but the approach LGTM

Comment on lines 24 to 25
// TODO: If an existing entry is accessed, we don't extend its TTL. It's unclear if this is
// necessary for any use cases. This functionality could be added if needed.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can imagine this being useful, but I think can be done in a follow up if we find it necessary.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed #101. Can look into this in a follow up PR!

Comment on lines 100 to 101
// TODO: it may be worth exploring if we can group keys by shard and do a batched
// remove.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be nice, but I don't see dashmap having any API for batch deleting a list of keys. I think it should be fine though.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed comment

// necessary for any use cases. This functionality could be added if needed.
use dashmap::{DashMap, Entry};
use datafusion::error::DataFusionError;
use futures::SinkExt;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this use futures::SinkExt; import seems unused, maybe remove it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

fn gc(time: Arc<AtomicU64>, buckets: &Vec<Bucket<K>>) {
let index = time.load(std::sync::atomic::Ordering::SeqCst) % buckets.len() as u64;
// TODO: error
buckets[index as usize].clear().unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance to remove this .unwrap() in favor of something else? ideally we should not panic intentionally.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I think we can ignore the error since the we never close the rx. If the rx is dropped, it means the tx is being dropped too.

@jayshrivastava jayshrivastava force-pushed the js/experimental-no-global-lock branch from 9cc8d5d to be4da5f Compare August 18, 2025 18:51
@jayshrivastava
Copy link
Collaborator Author

@gabotechs Thanks for the review. I appreciate your tips! I've just pushed an update to address the comments

@jayshrivastava jayshrivastava force-pushed the js/experimental-no-global-lock branch from be4da5f to 39ab9cc Compare August 18, 2025 18:56
//!
//! Usage
//!
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_mins(5) };
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the ``` here because it was causing compile errors in CI

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Rust, doctests are expected to be compilable and runnable, I can think of two more elegant ways exposing this doc comment:

  1. marking the common module visible to the outside only if cfg(doctest) or something like that (I actually don't know how to do that), and make the code in the doc test actually compile:
//! ```rust
//! # use std::time::Duration;
//! # use datafusion_distributed::common::ttl_map::{TTLMapConfig, TTLMap};
//! # async fn main() {
//!
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_secs(5 * 60) };
//! let ttl_map = TTLMap::new(config).await.unwrap();
//! let value = ttl_map.get_or_init("foo", || "bar").await;
//! # }
//! ```
  1. explicitly tell the Rust compiler to ignore it:
//! ```ignore
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_mins(5) };
//! let ttl_map = TTLMap::new(config).await.unwrap();
//! let value = ttl_map.get_or_init(key, || initial_value).await;
//! ```

Probably the second solution is good enough, and it will still display nicely in doc generators

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried a few iterations of this, but couldn't quite get it working. I went with ignore instead.

#[cfg(doc)]
pub mod common;
#[cfg(not(doc))]
mod common;

Copy link
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting really close! My main suggestions is about making the API synchronous, otherwise it's looking good

//!
//! Usage
//!
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_mins(5) };
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Rust, doctests are expected to be compilable and runnable, I can think of two more elegant ways exposing this doc comment:

  1. marking the common module visible to the outside only if cfg(doctest) or something like that (I actually don't know how to do that), and make the code in the doc test actually compile:
//! ```rust
//! # use std::time::Duration;
//! # use datafusion_distributed::common::ttl_map::{TTLMapConfig, TTLMap};
//! # async fn main() {
//!
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_secs(5 * 60) };
//! let ttl_map = TTLMap::new(config).await.unwrap();
//! let value = ttl_map.get_or_init("foo", || "bar").await;
//! # }
//! ```
  1. explicitly tell the Rust compiler to ignore it:
//! ```ignore
//! let config = TTLMapConfig { tick: Duration::from_secs(30), ttl: Duration::from_mins(5) };
//! let ttl_map = TTLMap::new(config).await.unwrap();
//! let value = ttl_map.get_or_init(key, || initial_value).await;
//! ```

Probably the second solution is good enough, and it will still display nicely in doc generators

This change adds a DashMap-like struct which has a background tasks to clean
up entries that have outlived a configurable TTL.

This data structure is added so it can be used in the `ArrowFlightEndpoint`, where it's
possible for `ExecutionStages` to be orphaned due to errors. This change adds the ability to
clean up tasks async.

The implementation is simliar to https://github.com/moka-rs/moka, which also uses
time wheels. Benchmarks show that this implementation has less overhead per operation
than moka (see results #99).
```
=== TTLMap Moka Benchmark ===
Tasks: 100000
Total time: 162.53ms
Average latency: 45 μs per operation
Throughput: 615257.30 ops/sec

=== TTLMap Lock Contention Benchmark ===
Tasks: 100000
Total time: 137.07ms
Average latency: 985 ns per operation
Entries remaining: 0
DashMap Lock contention time: 21ms
Accounting time: 47ms
```

There's also an implementation in #92, which
has the worst performance by far.
```
Tasks: 100000
Total time: 105.65ms
Average latency: 20453 μs per operation
Entries remaining: 0
DashMap Lock contention time: 23ms
Mutex Lock contention time: 2045251ms
```

Informs: #90
@jayshrivastava jayshrivastava force-pushed the js/experimental-no-global-lock branch from 39ab9cc to 6a0a0fd Compare August 19, 2025 14:25
Copy link
Collaborator

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

@gabotechs gabotechs merged commit daa6843 into main Aug 19, 2025
3 checks passed
@gabotechs gabotechs deleted the js/experimental-no-global-lock branch August 19, 2025 15:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants