Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ trevm = { version = "0.31.2", features = ["full_env_cfg"] }
revm-inspectors = "0.32.0" # should be 1 more than trevm version, usually

# Alloy periphery crates
alloy = { version = "1.0.35", features = [
alloy = { version = "1.4.0", features = [
"full",
"rpc-types-beacon",
"rpc-types-mev",
"genesis",
"arbitrary",
] }
alloy-contract = { version = "1.0.35", features = ["pubsub"] }
alloy-contract = { version = "1.4.0", features = ["pubsub"] }

# Reth
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
Expand All @@ -89,6 +89,7 @@ reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-prune-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-stages-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }

# Foundry periphery
Expand Down
9 changes: 5 additions & 4 deletions crates/block-processor/src/v1/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ where
// height, so we don't need compute the start from the notification.
let mut start = None;
let mut current = 0;
let last_ru_height = self.ru_provider.last_block_number()?;
let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?;

let mut net_outcome = ExecutionOutcome::default();
let last_ru_height = self.ru_provider.last_block_number()?;

// There might be a case where we can get a notification that starts
// "lower" than our last processed block,
Expand Down Expand Up @@ -183,13 +183,14 @@ where
ru_height = block_extracts.ru_height,
host_height = block_extracts.host_block.number(),
has_ru_block = block_extracts.submitted.is_some(),
height_before_notification = last_ru_height,
);

tracing::trace!("Running EVM");
let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?;
let block_result =
self.run_evm(&block_extracts, spec_id).instrument(span.clone()).await?;
metrics::record_block_result(&block_result, &start_time);

tracing::trace!("Committing EVM results");
let _ = span.enter();
let journal =
self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?;

Expand Down
1 change: 1 addition & 0 deletions crates/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ alloy.workspace = true
reth.workspace = true
reth-db.workspace = true
reth-prune-types.workspace = true
reth-stages-types.workspace = true

itertools.workspace = true
serde.workspace = true
Expand Down
286 changes: 286 additions & 0 deletions crates/db/src/consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
use alloy::primitives::BlockNumber;
use reth::{
api::NodePrimitives,
primitives::EthPrimitives,
providers::{
BlockBodyIndicesProvider, ProviderFactory, ProviderResult, StageCheckpointReader,
StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
},
};
use reth_db::{cursor::DbCursorRO, table::Table, tables, transaction::DbTx};
use reth_stages_types::StageId;
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
use tracing::{debug, info, info_span, instrument, warn};

/// Extension trait that provides consistency checking for the RU database
/// provider. Consistency checks are MANDATORY on node startup to ensure that
/// the static file segments and database are in sync.
///
/// In general, this should not be implemented outside this crate.
pub trait ProviderConsistencyExt {
/// Check the consistency of the static file segments and return the last
/// known-good block number.
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>>;
}

impl<Db> ProviderConsistencyExt for ProviderFactory<SignetNodeTypes<Db>>
where
Db: NodeTypesDbTrait,
{
/// Check the consistency of the static file segments and return the last
/// known good block number.
#[instrument(skip(self), fields(read_only = self.static_file_provider().is_read_only()))]
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>> {
// Based on `StaticFileProvider::check_consistency` in
// `reth/crates/storage/provider/src/providers/static_file/manager.rs`
// with modifications for RU-specific logic.
//
// Comments are largely reproduced from the original source for context.
//
// Last updated @ [email protected]
let prune_modes = self.provider_rw()?.prune_modes_ref().clone();
let sfp = self.static_file_provider();

debug!("Checking static file consistency.");

let mut last_good_height: Option<BlockNumber> = None;

let mut update_last_good_height = |new_height: BlockNumber| {
last_good_height =
last_good_height.map(|current| current.min(new_height)).or(Some(new_height));
};

for segment in StaticFileSegment::iter() {
let initial_highest_block = sfp.get_highest_static_file_block(segment);

if prune_modes.has_receipts_pruning() && segment.is_receipts() {
// Pruned nodes (including full node) do not store receipts as static files.
continue;
}

let span = info_span!(
"checking_segment",
?segment,
initial_highest_block,
highest_block = tracing::field::Empty,
highest_tx = tracing::field::Empty
);
let _guard = span.enter();

// File consistency is broken if:
//
// * appending data was interrupted before a config commit, then
// data file will be truncated according to the config.
//
// * pruning data was interrupted before a config commit, then we
// have deleted data that we are expected to still have. We need
// to check the Database and unwind everything accordingly.
if sfp.is_read_only() {
sfp.check_segment_consistency(segment)?;
} else {
// Fetching the writer will attempt to heal any file level
// inconsistency.
sfp.latest_writer(segment)?;
}

// Only applies to block-based static files. (Headers)
//
// The updated `highest_block` may have decreased if we healed from a pruning
// interruption.
let mut highest_block = sfp.get_highest_static_file_block(segment);
span.record("highest_block", highest_block);

if initial_highest_block != highest_block {
update_last_good_height(highest_block.unwrap_or_default());
}

// Only applies to transaction-based static files. (Receipts & Transactions)
//
// Make sure the last transaction matches the last block from its indices, since a heal
// from a pruning interruption might have decreased the number of transactions without
// being able to update the last block of the static file segment.
let highest_tx = sfp.get_highest_static_file_tx(segment);
if let Some(highest_tx) = highest_tx {
span.record("highest_tx", highest_tx);
let mut last_block = highest_block.unwrap_or_default();
loop {
if let Some(indices) = self.block_body_indices(last_block)? {
if indices.last_tx_num() <= highest_tx {
break;
}
} else {
// If the block body indices can not be found, then it means that static
// files is ahead of database, and the `ensure_invariants` check will fix
// it by comparing with stage checkpoints.
break;
}
if last_block == 0 {
break;
}
last_block -= 1;

highest_block = Some(last_block);
update_last_good_height(last_block);
}
}

if let Some(unwind) = match segment {
StaticFileSegment::Headers => {
ensure_invariants::<
_,
tables::Headers<<EthPrimitives as NodePrimitives>::BlockHeader>,
>(self, segment, highest_block, highest_block)?
}
StaticFileSegment::Transactions => {
ensure_invariants::<
_,
tables::Transactions<<EthPrimitives as NodePrimitives>::SignedTx>,
>(self, segment, highest_tx, highest_block)?
}
StaticFileSegment::Receipts => {
ensure_invariants::<
_,
tables::Receipts<<EthPrimitives as NodePrimitives>::Receipt>,
>(self, segment, highest_tx, highest_block)?
}
} {
update_last_good_height(unwind);
}
}

Ok(last_good_height)
}
}

/// Check invariants for each corresponding table and static file segment:
///
/// 1. The corresponding database table should overlap or have continuity in
/// their keys ([`TxNumber`] or [`BlockNumber`]).
/// 2. Its highest block should match the stage checkpoint block number if it's
/// equal or higher than the corresponding database table last entry.
/// * If the checkpoint block is higher, then request a pipeline unwind to
/// the static file block. This is expressed by returning [`Some`] with the
/// requested pipeline unwind target.
/// * If the checkpoint block is lower, then heal by removing rows from the
/// static file. In this case, the rows will be removed and [`None`] will
/// be returned.
/// 3. If the database tables overlap with static files and have contiguous
/// keys, or the checkpoint block matches the highest static files block,
/// then [`None`] will be returned.
///
/// [`TxNumber`]: alloy::primitives::TxNumber
#[instrument(skip(this, segment), fields(table = T::NAME))]
fn ensure_invariants<Db, T: Table<Key = u64>>(
this: &ProviderFactory<SignetNodeTypes<Db>>,
segment: StaticFileSegment,
highest_static_file_entry: Option<u64>,
highest_static_file_block: Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
Db: NodeTypesDbTrait,
{
let provider = this.provider_rw()?;
let sfp = this.static_file_provider();

let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

if let Some((db_first_entry, _)) = db_cursor.first()? {
if let (Some(highest_entry), Some(highest_block)) =
(highest_static_file_entry, highest_static_file_block)
{
// If there is a gap between the entry found in static file and
// database, then we have most likely lost static file data and
// need to unwind so we can load it again
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
info!(unwind_target = highest_block, "Setting unwind target.");
return Ok(Some(highest_block));
}
}

if let Some((db_last_entry, _)) = db_cursor.last()?
&& highest_static_file_entry.is_none_or(|highest_entry| db_last_entry > highest_entry)
{
return Ok(None);
}
}

let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
let highest_static_file_block = highest_static_file_block.unwrap_or_default();

// If static file entry is ahead of the database entries, then ensure the
// checkpoint block number matches.
let checkpoint_block_number = provider
.get_stage_checkpoint(match segment {
StaticFileSegment::Headers => StageId::Headers,
StaticFileSegment::Transactions => StageId::Bodies,
StaticFileSegment::Receipts => StageId::Execution,
})?
.unwrap_or_default()
.block_number;

// If the checkpoint is ahead, then we lost static file data. May be data corruption.
if checkpoint_block_number > highest_static_file_block {
info!(
checkpoint_block_number,
unwind_target = highest_static_file_block,
"Setting unwind target."
);
return Ok(Some(highest_static_file_block));
}

// If the checkpoint is behind, then we failed to do a database commit
// **but committed** to static files on executing a stage, or the reverse
// on unwinding a stage.
//
// All we need to do is to prune the extra static file rows.
if checkpoint_block_number < highest_static_file_block {
info!(
from = highest_static_file_block,
to = checkpoint_block_number,
"Unwinding static file segment."
);

let mut writer = sfp.latest_writer(segment)?;
if segment.is_headers() {
// TODO(joshie): is_block_meta
writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
} else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
// todo joshie: is querying block_body_indices a potential issue
// once bbi is moved to sf as well
let number = highest_static_file_entry - block.last_tx_num();
if segment.is_receipts() {
writer.prune_receipts(number, checkpoint_block_number)?;
} else {
writer.prune_transactions(number, checkpoint_block_number)?;
}
}
writer.commit()?;
}

Ok(None)
}

// Some code in this file is adapted from reth. It is used under the terms of
// the MIT License.
//
// The MIT License (MIT)
//
// Copyright (c) 2022-2025 Reth Contributors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
3 changes: 3 additions & 0 deletions crates/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub use aliases::{RuRevmState, SignetDbRw};
mod chain;
pub use chain::{DbExtractionResults, RuChain};

mod consistency;
pub use consistency::ProviderConsistencyExt;

mod convert;
pub use convert::DataCompat;

Expand Down
1 change: 1 addition & 0 deletions crates/db/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ where
self.tx_ref().get::<JournalHashes>(ru_height).map_err(Into::into)
}

#[track_caller]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL about the track_caller attribute 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's nice. helps with diagnosing panics

fn latest_journal_hash(&self) -> ProviderResult<B256> {
let latest_height = self.last_block_number()?;
Ok(self
Expand Down
2 changes: 1 addition & 1 deletion crates/node-tests/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl SignetTestContext {
// after RPC booted, we can create the alloy provider
let alloy_provider = ProviderBuilder::new_with_network()
.disable_recommended_fillers()
.filler(BlobGasFiller)
.filler(BlobGasFiller::default())
.with_gas_estimation()
.with_nonce_management(SimpleNonceManager::default())
.with_chain_id(constants.ru_chain_id())
Expand Down
Loading