Skip to content

Commit 1756951

Browse files
prestwichclaude
andauthored
fix(storage): keep !Send MDBX write tx out of async state machine (#41)
* fix(storage): keep !Send MDBX write tx out of async state machine Extract synchronous hot-storage unwind logic from `drain_above` into `unwind_hot_above` so the `!Send` MDBX write transaction never appears in the async generator's state machine. This makes the future returned by `drain_above` `Send`, unblocking use from `Send`-bounded executors like `reth::install_exex`. Adds compile-time `Send` canaries for `drain_above` and `cold_lag` parameterized over `DatabaseEnv` to prevent regressions. Closes ENG-2080 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore(storage): add Send compile canaries to builder and replay_to_cold Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 19695d5 commit 1756951

File tree

3 files changed

+59
-19
lines changed

3 files changed

+59
-19
lines changed

Cargo.toml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ members = ["crates/*"]
33
resolver = "2"
44

55
[workspace.package]
6-
version = "0.6.6"
6+
version = "0.6.7"
77
edition = "2024"
88
rust-version = "1.92"
99
authors = ["init4"]
@@ -35,13 +35,13 @@ incremental = false
3535

3636
[workspace.dependencies]
3737
# internal
38-
signet-hot = { version = "0.6.6", path = "./crates/hot" }
39-
signet-hot-mdbx = { version = "0.6.6", path = "./crates/hot-mdbx" }
40-
signet-cold = { version = "0.6.6", path = "./crates/cold" }
41-
signet-cold-mdbx = { version = "0.6.6", path = "./crates/cold-mdbx" }
42-
signet-cold-sql = { version = "0.6.6", path = "./crates/cold-sql" }
43-
signet-storage = { version = "0.6.6", path = "./crates/storage" }
44-
signet-storage-types = { version = "0.6.6", path = "./crates/types" }
38+
signet-hot = { version = "0.6.7", path = "./crates/hot" }
39+
signet-hot-mdbx = { version = "0.6.7", path = "./crates/hot-mdbx" }
40+
signet-cold = { version = "0.6.7", path = "./crates/cold" }
41+
signet-cold-mdbx = { version = "0.6.7", path = "./crates/cold-mdbx" }
42+
signet-cold-sql = { version = "0.6.7", path = "./crates/cold-sql" }
43+
signet-storage = { version = "0.6.7", path = "./crates/storage" }
44+
signet-storage-types = { version = "0.6.7", path = "./crates/types" }
4545

4646
# External, in-house
4747
signet-libmdbx = { version = "0.8.0" }

crates/storage/src/builder.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,13 @@ mod tests {
165165
use super::*;
166166
use serial_test::serial;
167167

168+
/// Compile-time canary: `StorageBuilder::build` must return a `Send`
169+
/// future so it can be used from `Send`-bounded executors.
170+
fn _assert_send<T: Send>(_: T) {}
171+
fn _build_is_send(b: StorageBuilder<MdbxConnector, MdbxConnector>) {
172+
_assert_send(b.build());
173+
}
174+
168175
#[test]
169176
#[serial]
170177
fn from_env_missing_hot_path() {

crates/storage/src/unified.rs

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -255,17 +255,13 @@ impl<H: HotKv> UnifiedStorage<H> {
255255
/// [`Hot`]: crate::StorageError::Hot
256256
/// [`Cold`]: crate::StorageError::Cold
257257
pub async fn drain_above(&self, block: BlockNumber) -> StorageResult<Vec<DrainedBlock>> {
258-
// 1–2. Read headers then unwind hot storage in a single write tx
259-
// to avoid TOCTOU races between reading and unwinding.
260-
let writer = self.hot.writer()?;
261-
let last = match writer.get_execution_range().map_err(|e| e.into_hot_kv_error())? {
262-
Some((_, last)) if last > block => last,
263-
_ => return Ok(Vec::new()),
264-
};
265-
let headers =
266-
writer.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?;
267-
writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
268-
writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
258+
// 1–2. Read headers and unwind hot storage synchronously.
259+
// Extracted to a sync helper so the `!Send` write transaction
260+
// does not appear in the async state machine.
261+
let headers = self.unwind_hot_above(block)?;
262+
if headers.is_empty() {
263+
return Ok(Vec::new());
264+
}
269265

270266
// 3. Atomically drain cold (best-effort — failure = normal cold lag)
271267
let cold_receipts = self.cold.drain_above(block).await.unwrap_or_default();
@@ -283,6 +279,22 @@ impl<H: HotKv> UnifiedStorage<H> {
283279
Ok(drained)
284280
}
285281

282+
/// Read headers above `block` and unwind hot storage in a single write
283+
/// transaction to avoid TOCTOU races. Returns an empty vec if there is
284+
/// nothing to unwind.
285+
fn unwind_hot_above(&self, block: BlockNumber) -> StorageResult<Vec<SealedHeader>> {
286+
let writer = self.hot.writer()?;
287+
let last = match writer.get_execution_range().map_err(|e| e.into_hot_kv_error())? {
288+
Some((_, last)) if last > block => last,
289+
_ => return Ok(Vec::new()),
290+
};
291+
let headers =
292+
writer.get_headers_range(block + 1, last).map_err(|e| e.into_hot_kv_error())?;
293+
writer.unwind_above(block).map_err(|e| e.map_db(|e| e.into_hot_kv_error()))?;
294+
writer.raw_commit().map_err(|e| e.into_hot_kv_error())?;
295+
Ok(headers)
296+
}
297+
286298
/// Unwind storage above the given block number (reorg handling).
287299
///
288300
/// This method:
@@ -350,3 +362,24 @@ impl<H: HotKv> UnifiedStorage<H> {
350362
self.cold.append_blocks(cold_data).await
351363
}
352364
}
365+
366+
#[cfg(test)]
367+
mod tests {
368+
use super::*;
369+
use signet_hot_mdbx::DatabaseEnv;
370+
371+
/// Compile-time canaries: all async methods on `UnifiedStorage<DatabaseEnv>`
372+
/// must return `Send` futures, even though MDBX write transactions are
373+
/// `!Send`. If a `!Send` type leaks into the async state machine, these
374+
/// will fail to compile.
375+
fn _assert_send<T: Send>(_: T) {}
376+
fn _drain_above_is_send(s: &UnifiedStorage<DatabaseEnv>) {
377+
_assert_send(s.drain_above(0));
378+
}
379+
fn _cold_lag_is_send(s: &UnifiedStorage<DatabaseEnv>) {
380+
_assert_send(s.cold_lag());
381+
}
382+
fn _replay_to_cold_is_send(s: &UnifiedStorage<DatabaseEnv>) {
383+
_assert_send(s.replay_to_cold(Vec::new()));
384+
}
385+
}

0 commit comments

Comments
 (0)