Skip to content

Commit a5a1cd4

Browse files
schgoogeeknoid
andauthored
feat: Add uniflight crate for duplicate request coalescing (#118)
Adds `uniflight`, a crate for coalescing duplicate async tasks into a single execution. When multiple tasks request the same work (identified by a key), only the leader performs the work while followers wait and receive a clone of the result. * Handles cancellation and panic gracefully (followers become the new leader) * Includes comprehensive module documentation * Includes example demonstrating cache thundering herd prevention --------- Co-authored-by: Martin Taillefer <[email protected]>
1 parent a93aa25 commit a5a1cd4

File tree

15 files changed

+1481
-2
lines changed

15 files changed

+1481
-2
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ _manifest
3535
ARROW
3636

3737
# Agent files
38-
.claude
38+
.claude
39+
CLAUDE.md

.spelling

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
304
12
23
0.X.Y
34
100k
@@ -82,6 +83,8 @@ C-SERDE
8283
C-SMART-PTR
8384
deallocate
8485
Debuggability
86+
Deduplicate
87+
deduplicating
8588
deduplication
8689
deque
8790
Deque
@@ -279,6 +282,8 @@ unconfigured
279282
uncontended
280283
unhandleable
281284
unicode
285+
uniflight
286+
uniflight's
282287
Uninit
283288
unordered
284289
unredacted

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ Please see each crate's change log below:
2020
- [`thread_aware_macros`](./crates/thread_aware_macros/CHANGELOG.md)
2121
- [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/CHANGELOG.md)
2222
- [`tick`](./crates/tick/CHANGELOG.md)
23+
- [`uniflight`](./crates/uniflight/CHANGELOG.md)

Cargo.lock

Lines changed: 61 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,18 @@ thread_aware = { path = "crates/thread_aware", default-features = false, version
4343
thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.6.1" }
4444
thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.6.1" }
4545
tick = { path = "crates/tick", default-features = false, version = "0.1.2" }
46+
uniflight = { path = "crates/uniflight", default-features = false, version = "0.1.0" }
4647

4748
# external dependencies
49+
ahash = { version = "0.8", default-features = false }
4850
alloc_tracker = { version = "0.5.9", default-features = false }
51+
anyhow = { version = "1.0.100", default-features = false }
52+
async-once-cell = { version = "0.5", default-features = false }
4953
bytes = { version = "1.11.0", default-features = false }
5054
chrono = { version = "0.4.40", default-features = false }
5155
chrono-tz = { version = "0.10.4", default-features = false }
5256
criterion = { version = "0.8.1", default-features = false }
57+
dashmap = { version = "6.1", default-features = false }
5358
derive_more = { version = "2.0.1", default-features = false }
5459
duct = { version = "1.1.1", default-features = false }
5560
dynosaur = { version = "0.3.0", default-features = false }
@@ -73,6 +78,7 @@ once_cell = { version = "1.21.3", default-features = false }
7378
opentelemetry = { version = "0.31.0", default-features = false }
7479
opentelemetry-stdout = { version = "0.31.0", default-features = false }
7580
opentelemetry_sdk = { version = "0.31.0", default-features = false }
81+
parking_lot = { version = "0.12.5", default-features = false }
7682
pin-project-lite = { version = "0.2.13", default-features = false }
7783
pretty_assertions = { version = "1.4.1", default-features = false }
7884
prettyplease = { version = "0.2.37", default-features = false }
@@ -101,6 +107,7 @@ trait-variant = { version = "0.1.2", default-features = false }
101107
trybuild = { version = "1.0.114", default-features = false }
102108
typeid = { version = "1.0.3", default-features = false }
103109
windows-sys = { version = "0.61.2", default-features = false }
110+
xutex = { version = "0.2.0", default-features = false }
104111
xxhash-rust = { version = "0.8.15", default-features = false }
105112

106113
[workspace.lints.rust]

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ These are the primary crates built out of this repo:
3636
- [`seatbelt`](./crates/seatbelt/README.md) - Resilience and recovery mechanisms for fallible operations.
3737
- [`thread_aware`](./crates/thread_aware/README.md) - Facilities to support thread-isolated state.
3838
- [`tick`](./crates/tick/README.md) - Provides primitives to interact with and manipulate machine time.
39+
- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution.
3940

4041
## About this Repo
4142

crates/uniflight/CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Changelog
2+
3+
## [0.1.0] - 2025-12-10
4+
5+
- 🧩 Miscellaneous
6+
7+
- Initial commit of uniflight
8+

crates/uniflight/Cargo.toml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
[package]
5+
name = "uniflight"
6+
description = "Coalesces duplicate async tasks into a single execution."
7+
version = "0.1.0"
8+
readme = "README.md"
9+
keywords = ["oxidizer", "coalescing", "stempede", "singleflight", "deduplication"]
10+
categories = ["concurrency"]
11+
12+
edition.workspace = true
13+
rust-version.workspace = true
14+
authors.workspace = true
15+
license.workspace = true
16+
homepage.workspace = true
17+
repository.workspace = true
18+
19+
[package.metadata.cargo_check_external_types]
20+
allowed_external_types = [
21+
"thread_aware::cell::builtin::PerProcess",
22+
"thread_aware::cell::storage::Strategy",
23+
"thread_aware::core::ThreadAware",
24+
]
25+
26+
[dependencies]
27+
ahash = { workspace = true, default-features = false, features = ["std"] }
28+
async-once-cell.workspace = true
29+
dashmap.workspace = true
30+
futures-util = { workspace = true, default-features = false, features = ["std", "alloc"] }
31+
thread_aware.workspace = true
32+
33+
[dev-dependencies]
34+
criterion = { workspace = true, features = ["async_tokio"] }
35+
futures-util = { workspace = true, features = ["alloc", "std"] }
36+
mutants.workspace = true
37+
tick = { workspace = true, features = ["tokio"] }
38+
tokio = { workspace = true, features = [
39+
"macros",
40+
"rt",
41+
"time",
42+
"rt-multi-thread",
43+
] }
44+
45+
[lints]
46+
workspace = true
47+
48+
[[bench]]
49+
name = "performance"
50+
harness = false
51+
52+
[[example]]
53+
name = "cache_population"

crates/uniflight/README.md

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
<div align="center">
2+
<img src="./logo.png" alt="Uniflight Logo" width="96">
3+
4+
# Uniflight
5+
6+
[![crate.io](https://img.shields.io/crates/v/uniflight.svg)](https://crates.io/crates/uniflight)
7+
[![docs.rs](https://docs.rs/uniflight/badge.svg)](https://docs.rs/uniflight)
8+
[![MSRV](https://img.shields.io/crates/msrv/uniflight)](https://crates.io/crates/uniflight)
9+
[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml)
10+
[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer)
11+
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE)
12+
<a href="../.."><img src="../../logo.svg" alt="This crate was developed as part of the Oxidizer project" width="20"></a>
13+
14+
</div>
15+
16+
Coalesces duplicate async tasks into a single execution.
17+
18+
This crate provides [`Merger`][__link0], a mechanism for deduplicating concurrent async operations.
19+
When multiple tasks request the same work (identified by a key), only the first task (the
20+
“leader”) performs the actual work while subsequent tasks (the “followers”) wait and receive
21+
a clone of the result.
22+
23+
## When to Use
24+
25+
Use `Merger` when you have expensive or rate-limited operations that may be requested
26+
concurrently with the same parameters:
27+
28+
* **Cache population**: Prevent thundering herd when a cache entry expires
29+
* **API calls**: Deduplicate concurrent requests to the same endpoint
30+
* **Database queries**: Coalesce identical queries issued simultaneously
31+
* **File I/O**: Avoid reading the same file multiple times concurrently
32+
33+
## Example
34+
35+
```rust
36+
use uniflight::Merger;
37+
38+
let group: Merger<String, String> = Merger::new();
39+
40+
// Multiple concurrent calls with the same key will share a single execution.
41+
// Note: you can pass &str directly when the key type is String.
42+
let result = group.execute("user:123", || async {
43+
// This expensive operation runs only once, even if called concurrently
44+
"expensive_result".to_string()
45+
}).await.expect("leader should not panic");
46+
```
47+
48+
## Flexible Key Types
49+
50+
The [`Merger::execute`][__link1] method accepts keys using [`Borrow`][__link2] semantics, allowing you to pass
51+
borrowed forms of the key type. For example, with `Merger<String, T>`, you can pass `&str`
52+
directly without allocating:
53+
54+
```rust
55+
let merger: Merger<String, i32> = Merger::new();
56+
57+
// Pass &str directly - no need to call .to_string()
58+
let result = merger.execute("my-key", || async { 42 }).await;
59+
assert_eq!(result, Ok(42));
60+
```
61+
62+
## Thread-Aware Scoping
63+
64+
`Merger` supports thread-aware scoping via a [`Strategy`][__link3]
65+
type parameter. This controls how the internal state is partitioned across threads/NUMA nodes:
66+
67+
* [`PerProcess`][__link4] (default): Single global state, maximum deduplication
68+
* [`PerNuma`][__link5]: Separate state per NUMA node, NUMA-local memory access
69+
* [`PerCore`][__link6]: Separate state per core, no deduplication (useful for already-partitioned work)
70+
71+
```rust
72+
use uniflight::Merger;
73+
use thread_aware::PerNuma;
74+
75+
// NUMA-aware merger - each NUMA node gets its own deduplication scope
76+
let merger: Merger<String, String, PerNuma> = Merger::new_per_numa();
77+
```
78+
79+
## Cancellation and Panic Handling
80+
81+
`Merger` handles task cancellation and panics explicitly:
82+
83+
* If the leader task is cancelled or dropped, a follower becomes the new leader
84+
* If the leader task panics, followers receive [`LeaderPanicked`][__link7] error with the panic message
85+
* Followers that join before the leader completes receive the value the leader returns
86+
87+
When a panic occurs, followers are notified via the error type rather than silently
88+
retrying. The panic message is captured and available via [`LeaderPanicked::message`][__link8]:
89+
90+
```rust
91+
let merger: Merger<String, String> = Merger::new();
92+
match merger.execute("key", || async { "result".to_string() }).await {
93+
Ok(value) => println!("got {value}"),
94+
Err(err) => {
95+
println!("leader panicked: {}", err.message());
96+
// Decide whether to retry
97+
}
98+
}
99+
```
100+
101+
## Memory Management
102+
103+
Completed entries are automatically removed from the internal map when the last caller
104+
finishes. This ensures no stale entries accumulate over time.
105+
106+
## Type Requirements
107+
108+
The value type `T` must implement [`Clone`][__link9] because followers receive a clone of the
109+
leader’s result. The key type `K` must implement [`Hash`][__link10] and [`Eq`][__link11].
110+
111+
## Thread Safety
112+
113+
[`Merger`][__link12] is `Send` and `Sync`, and can be shared across threads. The returned futures
114+
are `Send` when the closure, future, key, and value types are `Send`.
115+
116+
## Performance
117+
118+
Run benchmarks with `cargo bench -p uniflight`. The suite covers:
119+
120+
* `single_call`: Baseline latency with no contention
121+
* `high_contention_100`: 100 concurrent tasks on the same key
122+
* `distributed_10x10`: 10 keys with 10 tasks each
123+
124+
Use `--save-baseline` and `--baseline` flags to track regressions over time.
125+
126+
127+
<hr/>
128+
<sub>
129+
This crate was developed as part of <a href="../..">The Oxidizer Project</a>. Browse this crate's <a href="https://github.com/microsoft/oxidizer/tree/main/crates/uniflight">source code</a>.
130+
</sub>
131+
132+
[__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEGxgwNFq9VUtfG5xaBNm6U4VGG97W2YkyKkPjG4KVgSbTgdOrYWSCgmx0aHJlYWRfYXdhcmVlMC42LjGCaXVuaWZsaWdodGUwLjEuMA
133+
[__link0]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html
134+
[__link1]: https://docs.rs/uniflight/0.1.0/uniflight/?search=Merger::execute
135+
[__link10]: https://doc.rust-lang.org/stable/std/?search=hash::Hash
136+
[__link11]: https://doc.rust-lang.org/stable/std/cmp/trait.Eq.html
137+
[__link12]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html
138+
[__link2]: https://doc.rust-lang.org/stable/std/?search=borrow::Borrow
139+
[__link3]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=storage::Strategy
140+
[__link4]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerProcess
141+
[__link5]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerNuma
142+
[__link6]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerCore
143+
[__link7]: https://docs.rs/uniflight/0.1.0/uniflight/struct.LeaderPanicked.html
144+
[__link8]: https://docs.rs/uniflight/0.1.0/uniflight/?search=LeaderPanicked::message
145+
[__link9]: https://doc.rust-lang.org/stable/std/clone/trait.Clone.html

0 commit comments

Comments
 (0)