Conversation

@jayshrivastava
Collaborator

@jayshrivastava jayshrivastava commented Aug 12, 2025

This change adds a DashMap-like struct which has a background task to clean
up entries that have outlived a configurable TTL.

This struct is similar to https://github.com/moka-rs/moka, which also uses
time wheels. Having our own module avoids introducing a large dependency, which
keeps this project closer to vanilla datafusion.

This change is meant to be useful for #89, where it's
possible for ExecutionStages to be orphaned in ArrowFlightEndpoint. We need an async task to clean up old entries.

Informs: #90

impl<K, V> TTLMap<K, V>
where
K: Eq + Hash + Send + Sync + Clone + 'static,
V: Default + Clone + Send + Sync + 'static,
Collaborator Author

I can explain why V is Clone below

value
}
Entry::Occupied(entry) => entry.get().clone(),
};
Collaborator Author

@jayshrivastava jayshrivastava Aug 13, 2025

This is tricky.

When you hold an Entry, you are locking the map, and the DashMap docs specifically say: **Locking behaviour:** May deadlock if called when holding any sort of reference into the map.

I believe holding the lock over any .await point may deadlock because another task may be scheduled and try to access the same entry (tbh I think this may happen even if the other task accesses a different entry on the same shard).

Alternatives considered:

  • we return a reference to V. This is risky because the caller might hold it across an .await and deadlock
  • we pass a closure which operates on V while we hold the lock. This is not ideal because it increases lock contention

Thus, I went with Clone. If the caller wants a shared reference, they can wrap their type in an Arc.
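For illustration, here is a minimal sketch (not the PR's exact code) of the Clone-based get_or_init: the value is cloned while the DashMap Entry guard is alive, and the guard is dropped before the function returns, so no map lock is ever held across an .await point.

```rust
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;

/// Sketch of a Clone-based get_or_init: no reference tied to the map's
/// internal lock ever escapes this function.
fn get_or_init<K, V>(map: &DashMap<K, V>, key: K) -> V
where
    K: Eq + std::hash::Hash,
    V: Default + Clone,
{
    match map.entry(key) {
        // Vacant: build the default value, store a clone, return the original.
        Entry::Vacant(entry) => {
            let value = V::default();
            entry.insert(value.clone());
            value
        }
        // Occupied: clone the existing value out of the map.
        Entry::Occupied(entry) => entry.get().clone(),
    }
}

fn main() {
    let map: DashMap<&str, u64> = DashMap::new();
    assert_eq!(get_or_init(&map, "stage-1"), 0);
}
```

A caller that wants shared, mutable state can make V an Arc (e.g. Arc<Mutex<T>>) so the clone stays cheap.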

Collaborator

I think requiring Clone for the values is a fair ask, and it's completely fine for our use case 👍

@jayshrivastava jayshrivastava marked this pull request as ready for review August 13, 2025 02:27
@jayshrivastava jayshrivastava changed the title Create TTL map with time wheel architecture and tests Create TTL map with time wheel architecture Aug 13, 2025
Comment on lines +1 to +6
/*
TTLMap is a DashMap that automatically removes entries after a specified time-to-live (TTL).

How the Time Wheel Works

Time Buckets: [0] [1] [2] [3] [4] [5] [6] [7] ...
Collaborator

In Rust, module-level docs are defined with //!. That way, documentation compilers and IDEs can interpret them as pieces of documentation and display them appropriately.
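For example, the block comment at the top of the file could become module docs like this (a sketch reusing the PR's own wording):

```rust
//! TTLMap is a DashMap that automatically removes entries after a specified
//! time-to-live (TTL).
//!
//! # How the Time Wheel Works
//!
//! Time Buckets: [0] [1] [2] [3] [4] [5] [6] [7] ...
```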

use std::time::Duration;
use tokio::sync::Mutex;

// TTLMap is a key-value store that automatically removes entries after a specified time-to-live.
Collaborator

This should probably be a doc comment with a triple ///, otherwise IDEs will ignore it when hovering over the struct.
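Concretely, something like this (the struct body is a placeholder so the sketch compiles; the real fields are in the PR):

```rust
/// TTLMap is a key-value store that automatically removes entries after a
/// specified time-to-live. rustdoc and IDE hovers will now surface this text.
pub struct TTLMap<K, V> {
    // Placeholder field; the real layout is in the PR.
    _marker: std::marker::PhantomData<(K, V)>,
}
```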

Comment on lines 141 to 142
{
let mut buckets = self.buckets.lock().await;
Collaborator

@gabotechs gabotechs Aug 13, 2025

🤔 this looks like it's introducing a global lock shared by all new requests. This means that all new requests will be competing for the same lock, which is the main thing the dashmap tried to address in the first place.

I think we should be able to find a solution that does not imply a global lock. A hint I know is: spawning tokio tasks is almost free. What to do with that hint? No idea yet.

Collaborator Author

Thanks for the review!

I've attempted an alternative implementation here, using your hint: https://github.com/datafusion-contrib/datafusion-distributed/pull/96/files#diff-835dd4f31132680b017b402166e24979a012bb5f1975c95c34de467706b964d2

  • Each shard in the time wheel has its own task and KeySet and a clone of the Arc<DashMap>. Each task is responsible for managing some subset of keys in the map. This avoids the global mutex over all the KeySets.
  • Now, when we insert a key into the TTLMap, we send the key on a channel to the task which is responsible for its shard.
  • There's one task responsible for issuing Clear commands (also over the channel) to the time wheel tasks (a rough sketch of this layout is below).
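
Roughly, the layout looks like this (a minimal sketch under assumed names, not the code in #96: the Cmd enum, the channel size, and the shard selection are illustrative only):

```rust
use std::{collections::HashSet, hash::Hash, sync::Arc, time::Duration};

use dashmap::DashMap;
use tokio::sync::mpsc;

/// Hypothetical commands a shard task receives (names are illustrative).
enum Cmd<K> {
    /// Track this key; it will be removed on the next Clear.
    Track(K),
    /// Remove every tracked key from the shared map.
    Clear,
}

/// One task per time-wheel shard. Each task owns its own key set (no global
/// mutex) plus a clone of the Arc<DashMap>, and removes its keys when told to.
fn spawn_shards<K, V>(map: Arc<DashMap<K, V>>, shards: usize) -> Vec<mpsc::Sender<Cmd<K>>>
where
    K: Eq + Hash + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    (0..shards)
        .map(|_| {
            let map = Arc::clone(&map);
            let (tx, mut rx) = mpsc::channel::<Cmd<K>>(1024);
            tokio::spawn(async move {
                let mut keys: HashSet<K> = HashSet::new();
                while let Some(cmd) = rx.recv().await {
                    match cmd {
                        Cmd::Track(k) => {
                            keys.insert(k);
                        }
                        Cmd::Clear => {
                            for k in keys.drain() {
                                map.remove(&k);
                            }
                        }
                    }
                }
            });
            tx
        })
        .collect()
}

#[tokio::main]
async fn main() {
    let map: Arc<DashMap<String, u64>> = Arc::new(DashMap::new());
    let shards = spawn_shards(Arc::clone(&map), 4);

    // Insert a key and tell a shard task to track it (a real implementation
    // would pick the shard from the key's hash, and a separate ticker task
    // would send Clear once per TTL interval).
    map.insert("stage-1".to_string(), 42);
    shards[0].send(Cmd::Track("stage-1".to_string())).await.unwrap();
    shards[0].send(Cmd::Clear).await.unwrap();

    tokio::time::sleep(Duration::from_millis(10)).await;
    assert!(map.get("stage-1").is_none());
}
```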

I pushed a benchmark commit to this branch (and the same one to the other branch above) to see if I improved lock contention. Unfortunately, the results look similar regardless of what I do.

There's a lot of variance, but generally I see 17531721 - 20000000 ops/sec.

This PR

Tasks: 100
Operations per task: 1000000
Total operations: 100000000
Total time: 5.70s
Throughput: 17531721 ops/sec
Average latency: 0.06 μs per operation
Entries remaining: 10
test common::ttl_map::tests::bench_lock_contention ... ok

Other PR with no mutex

=== TTLMap Lock Contention Benchmark ===
Tasks: 100
Operations per task: 1000000
Total operations: 100000000
Total time: 6.03s
Throughput: 16582541 ops/sec
Average latency: 0.06 μs per operation
Entries remaining: 10
test common::ttl_map::tests::bench_lock_contention ... ok

If I comment all locking and the GC task, the baseline performance is at most 20M ops / sec as well.

=== TTLMap Lock Contention Benchmark ===
Tasks: 100
Operations per task: 1000000
Total operations: 100000000
Total time: 4.93s
Throughput: 20291279 ops/sec
Average latency: 0.05 μs per operation
Entries remaining: 10
test common::ttl_map::tests::bench_lock_contention ... ok

I'm not sure how to proceed. Either the benchmark needs to be better or locking the time wheel is not the bottleneck. The latter is possible because:

  • The critical sections under self.buckets.lock().await are very small (a constant number of instructions)
  • The dashmap mutex (which we hold while initializing entries) is probably a bigger source of contention because we hold an Entry's lock while initializing it.
    • This is okay because the first request initializes the key, so the other ones have to wait anyway and can read the cached Value once it's done.
    • This might be bad if the shards in the DashMap are large, introducing cross-key contention. Generally, this is an okay tradeoff and we defer this decision to the DashMap.

Please let me know your thoughts!

Collaborator

@gabotechs gabotechs Aug 14, 2025

It's going to be a bit hard to create an accurate benchmark for this, but I think it's worth getting that right in order to compare approaches. I think there are some improvement points that would make the benchmark more realistic (a rough sketch applying them follows after the list):

  • The worker_threads in the test should be as high as your machine allows; with just the current worker_threads = 2, the chances of two physical threads competing concurrently for the same lock are slim.
  • key_count should probably be much higher, as in reality almost all keys are going to be different; even making all the keys random could work.
  • task_count should be several orders of magnitude higher, something like (number of concurrent queries) * (number of stages per query) * (number of tasks per stage) / (number of available workers); in practice, this can easily go into the millions.
  • In practice, each task is handled in its own do_get call, and tonic will spawn a tokio task for it, so it would be more realistic for each task in the bench to perform only 1 get_or_init.
  • I'd remove any source of noise like string allocations (format!) and try to benchmark a pure get_or_init() access.
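
A rough sketch of a benchmark shaped along those lines (the TTLMap path, its Default constructor, and an async get_or_init method are assumptions here, as is the rand dependency; none of these names come from the PR itself):

```rust
use std::{sync::Arc, time::Instant};

// Hypothetical import for the map under test:
// use datafusion_distributed::common::ttl_map::TTLMap;

#[tokio::test(flavor = "multi_thread")] // worker_threads defaults to all cores
async fn bench_lock_contention() {
    // Several orders of magnitude more tasks, one get_or_init per task,
    // mirroring one tonic-spawned do_get call per request.
    let tasks: u64 = 1_000_000;

    let map = Arc::new(TTLMap::<u64, u64>::default());
    let start = Instant::now();

    let handles: Vec<_> = (0..tasks)
        .map(|_| {
            let map = Arc::clone(&map);
            // Random keys instead of format!()-built strings: no allocation
            // noise, and almost every key is distinct.
            let key: u64 = rand::random();
            tokio::spawn(async move { map.get_or_init(key).await })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Tasks: {tasks}, total time: {:?}", start.elapsed());
}
```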

Collaborator

here's a draft approach #97

The result of running that is:

=== TTLMap Lock Contention Benchmark ===
Tasks: 100000
Total time: 88.52ms
Average latency: 425 μs per operation
Entries remaining: 0
DashMap Lock contention time: 12ms
Mutex Lock contention time: 42503ms

@robtandy
Collaborator

@gabotechs comments are on point and +1 to them. I will add that, in addition to handling orphaned stages, we also need to handle removing the stage when we know that it is done being used. This way we do not have to wait for a timeout during normal operation. I suggest we do that in a separate PR, but wanted to note it here.

@jayshrivastava
Collaborator Author

@robtandy Is there a way for the endpoint to know how many times it may be called? E.g. how many actors will read from it. I'm thinking we can evict a stage once it's called N times; however, this won't account for retries.

@robtandy
Collaborator

@jayshrivastava, the ExecutionStage added to the map in the ArrowFlightEndpoint is scoped per task. The task has a partition group. The stage is used up when every partition from that group has been successfully served. That is the happy path anyway, and I think it's fine to start with that. I don't yet have a perspective on retry logic, whether we want to fail the query or support it, and how that would interact with the expiring of stages.
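A sketch of the happy-path bookkeeping this describes, under hypothetical names (TrackedStage, partition_served, and the key format are illustrative, not from the PR): keep a per-stage countdown of unserved partitions and evict the entry when it reaches zero.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use dashmap::DashMap;

/// Hypothetical wrapper: a stage plus a countdown of the partitions in its
/// partition group that still have to be served.
struct TrackedStage {
    remaining_partitions: AtomicUsize,
    // stage: ExecutionStage, // elided
}

/// Decrement the counter after serving a partition; evict the stage once every
/// partition in its group has been served (happy path only, no retry handling).
fn partition_served(map: &DashMap<String, TrackedStage>, stage_key: &str) {
    if let Some(entry) = map.get(stage_key) {
        if entry.remaining_partitions.fetch_sub(1, Ordering::AcqRel) == 1 {
            drop(entry); // release the shard lock before removing
            map.remove(stage_key);
        }
    }
}

fn main() {
    let map = DashMap::new();
    map.insert(
        "query-1/stage-3/task-0".to_string(),
        TrackedStage { remaining_partitions: AtomicUsize::new(2) },
    );
    partition_served(&map, "query-1/stage-3/task-0");
    partition_served(&map, "query-1/stage-3/task-0");
    assert!(map.get("query-1/stage-3/task-0").is_none());
}
```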

@gabotechs
Collaborator

This struct is similar to https://github.com/moka-rs/moka, which also uses
time wheels. Having our own module avoids introducing a large dependency, which
keeps this project closer to vanilla datafusion.

If this were a JS lib that needs to be bundled in a browser, I think that would make a lot of sense, but being a Rust library where everything is going to get compiled eventually... I don't think it makes a difference unless there's a significant impact on compilation time or something perceptible by a user.

Introducing dependencies should not make the project closer or farther as long as it's not perceptible from the outside. I think it could be worth exploring https://github.com/moka-rs/moka at least for getting a baseline for our benchmarks.

@jayshrivastava
Collaborator Author

From the new bench, it looks like the method in #96 is significantly faster.

Next steps

Mutex (extremely high variance btw, sometimes it's 15000 μs, sometimes it's 30000 μs)

Tasks: 100000
Total time: 105.65ms
Average latency: 20453 μs per operation
Entries remaining: 0
DashMap Lock contention time: 23ms
Mutex Lock contention time: 2045251ms

Non-Mutex (#96)

Tasks: 100000
Total time: 156.35ms
Average latency: 842 ns per operation
Entries remaining: 0
DashMap Lock contention time: 19ms
Mutex Lock contention time: 33ms

@jayshrivastava
Collaborator Author

jayshrivastava commented Aug 15, 2025

Got Claude to use moka (#99), but it's not as fast as #96 👀

Tasks: 100000
Total time: 162.53ms
Average latency: 45 μs per operation
Throughput: 615257.30 ops/sec

I think this is expected because moka has to do more accounting, like using LRU vs LFU. Using moka means we have less code to maintain, if that's a concern. We probably don't even need a wrapper.
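
For reference, using moka directly without a wrapper could look roughly like this (a sketch assuming moka's future::Cache with the future feature enabled; the key/value types and the TTL are placeholders):

```rust
use std::time::Duration;

use moka::future::Cache;

#[tokio::main]
async fn main() {
    // TTL eviction is handled by moka itself, so no background GC task or
    // wrapper type is needed on our side.
    let cache: Cache<String, u64> = Cache::builder()
        .time_to_live(Duration::from_secs(60))
        .build();

    // get_with initializes the entry once and hands clones to everyone else,
    // roughly the same contract as the TTLMap's get_or_init.
    let value = cache
        .get_with("query-1/stage-3".to_string(), async { 42 })
        .await;
    assert_eq!(value, 42);
}
```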

jayshrivastava added a commit that referenced this pull request Aug 17, 2025
This change adds a DashMap-like struct which has a background task to clean
up entries that have outlived a configurable TTL.

This data structure is added so it can be used in the `ArrowFlightEndpoint`, where it's
possible for `ExecutionStages` to be orphaned due to errors. This change adds the ability to
clean up these entries asynchronously.

The implementation is similar to https://github.com/moka-rs/moka, which also uses
time wheels. Benchmarks show that this implementation has less overhead per operation
than moka (see results in #99).
```
=== TTLMap Moka Benchmark ===
Tasks: 100000
Total time: 162.53ms
Average latency: 45 μs per operation
Throughput: 615257.30 ops/sec

=== TTLMap Lock Contention Benchmark ===
Tasks: 100000
Total time: 137.07ms
Average latency: 985 ns per operation
Entries remaining: 0
DashMap Lock contention time: 21ms
Accounting time: 47ms
```

There's also an implementation in #92, which
has the worst performance by far.
```
Tasks: 100000
Total time: 105.65ms
Average latency: 20453 μs per operation
Entries remaining: 0
DashMap Lock contention time: 23ms
Mutex Lock contention time: 2045251ms
```

Informs: #90
@jayshrivastava
Collaborator Author

jayshrivastava commented Aug 17, 2025

I've cleaned up #96 to make it production quality. If it makes sense, we can close this PR in favor of that one.

@jayshrivastava, the ExecutionStage added to the map in the ArrowFlightEndpoint is scoped per task. The task has a partition group. The stage is used up when every partition from that group has been successfully served. That is the happy path anyway, and I think it's fine to start with that. I don't yet have a perspective on retry logic, whether we want to fail the query or support it, and how that would interact with the expiring of stages.

I'll address this in a separate PR and leave #90 open until that is completed.

jayshrivastava added a commit that referenced this pull request Aug 18, 2025
jayshrivastava added a commit that referenced this pull request Aug 18, 2025
@jayshrivastava
Collaborator Author

Closing in favor of #96

jayshrivastava added a commit that referenced this pull request Aug 19, 2025
gabotechs pushed a commit that referenced this pull request Aug 19, 2025
@jayshrivastava jayshrivastava deleted the js/ttl-map branch October 17, 2025 13:33