Skip to content

Commit ae6a415

Browse files
prestwich and dylanlott authored
feat: add consistency check at node bootup (#62)
* feat: add consistency check at node bootup
* fix: clippy
* fix: alloy breaking changes
* Apply suggestion from @dylanlott

Co-authored-by: dylan <[email protected]>

---------

Co-authored-by: dylan <[email protected]>
1 parent a1cbb0a commit ae6a415

File tree

8 files changed

+324
-20
lines changed

8 files changed

+324
-20
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,14 @@ trevm = { version = "0.31.2", features = ["full_env_cfg"] }
6464
revm-inspectors = "0.32.0" # should be 1 more than trevm version, usually
6565

6666
# Alloy periphery crates
67-
alloy = { version = "1.0.35", features = [
67+
alloy = { version = "1.4.0", features = [
6868
"full",
6969
"rpc-types-beacon",
7070
"rpc-types-mev",
7171
"genesis",
7272
"arbitrary",
7373
] }
74-
alloy-contract = { version = "1.0.35", features = ["pubsub"] }
74+
alloy-contract = { version = "1.4.0", features = ["pubsub"] }
7575

7676
# Reth
7777
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
@@ -89,6 +89,7 @@ reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
8989
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9090
reth-prune-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9191
reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
92+
reth-stages-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9293
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9394

9495
# Foundry periphery

crates/block-processor/src/v1/processor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ where
144144
// height, so we don't need compute the start from the notification.
145145
let mut start = None;
146146
let mut current = 0;
147+
let last_ru_height = self.ru_provider.last_block_number()?;
147148
let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?;
148149

149150
let mut net_outcome = ExecutionOutcome::default();
150-
let last_ru_height = self.ru_provider.last_block_number()?;
151151

152152
// There might be a case where we can get a notification that starts
153153
// "lower" than our last processed block,
@@ -183,13 +183,14 @@ where
183183
ru_height = block_extracts.ru_height,
184184
host_height = block_extracts.host_block.number(),
185185
has_ru_block = block_extracts.submitted.is_some(),
186+
height_before_notification = last_ru_height,
186187
);
187188

188-
tracing::trace!("Running EVM");
189-
let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?;
189+
let block_result =
190+
self.run_evm(&block_extracts, spec_id).instrument(span.clone()).await?;
190191
metrics::record_block_result(&block_result, &start_time);
191192

192-
tracing::trace!("Committing EVM results");
193+
let _ = span.enter();
193194
let journal =
194195
self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?;
195196

crates/db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ alloy.workspace = true
2323
reth.workspace = true
2424
reth-db.workspace = true
2525
reth-prune-types.workspace = true
26+
reth-stages-types.workspace = true
2627

2728
itertools.workspace = true
2829
serde.workspace = true

crates/db/src/consistency.rs

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
use alloy::primitives::BlockNumber;
2+
use reth::{
3+
api::NodePrimitives,
4+
primitives::EthPrimitives,
5+
providers::{
6+
BlockBodyIndicesProvider, ProviderFactory, ProviderResult, StageCheckpointReader,
7+
StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
8+
},
9+
};
10+
use reth_db::{cursor::DbCursorRO, table::Table, tables, transaction::DbTx};
11+
use reth_stages_types::StageId;
12+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
13+
use tracing::{debug, info, info_span, instrument, warn};
14+
15+
/// Extension trait that provides consistency checking for the RU database
16+
/// provider. Consistency checks are MANDATORY on node startup to ensure that
17+
/// the static file segments and database are in sync.
18+
///
19+
/// In general, this should not be implemented outside this crate.
20+
pub trait ProviderConsistencyExt {
21+
/// Check the consistency of the static file segments and return the last
22+
/// known-good block number.
///
/// In the implementation provided in this file, `Ok(None)` means that no
/// segment reported a height to fall back to, and `Ok(Some(height))` is the
/// lowest height reported by any segment.
///
/// # Errors
///
/// The implementation in this file propagates, via `?`, any error returned
/// by the provider and static file calls it makes.
23+
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>>;
24+
}
25+
26+
impl<Db> ProviderConsistencyExt for ProviderFactory<SignetNodeTypes<Db>>
27+
where
28+
Db: NodeTypesDbTrait,
29+
{
30+
/// Check the consistency of the static file segments and return the last
31+
/// known good block number.
///
/// Every [`StaticFileSegment`] is visited in turn. For each one the segment
/// is checked (read-only provider) or healed (writable provider), its
/// highest block / transaction are re-read, and `ensure_invariants` is run
/// against the matching database table. The returned value is the minimum
/// of all heights reported along the way, or `None` if nothing was
/// reported.
///
/// # Errors
///
/// Any error from the provider or static file calls is propagated with `?`.
32+
#[instrument(skip(self), fields(read_only = self.static_file_provider().is_read_only()))]
33+
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>> {
34+
// Based on `StaticFileProvider::check_consistency` in
35+
// `reth/crates/storage/provider/src/providers/static_file/manager.rs`
36+
// with modifications for RU-specific logic.
37+
//
38+
// Comments are largely reproduced from the original source for context.
39+
//
40+
// Last updated @ reth v1.9.1 (the reth tag pinned in the workspace `Cargo.toml`).
41+
let prune_modes = self.provider_rw()?.prune_modes_ref().clone();
42+
let sfp = self.static_file_provider();
43+
44+
debug!("Checking static file consistency.");
45+
46+
// `None` until some segment reports a height.
let mut last_good_height: Option<BlockNumber> = None;
47+
48+
// Records a reported height, keeping the minimum of everything reported
// so far (the first report simply initialises the value).
let mut update_last_good_height = |new_height: BlockNumber| {
49+
last_good_height =
50+
last_good_height.map(|current| current.min(new_height)).or(Some(new_height));
51+
};
52+
53+
for segment in StaticFileSegment::iter() {
54+
// Snapshot taken before any check/heal so a change can be detected below.
let initial_highest_block = sfp.get_highest_static_file_block(segment);
55+
56+
if prune_modes.has_receipts_pruning() && segment.is_receipts() {
57+
// Pruned nodes (including full node) do not store receipts as static files.
58+
continue;
59+
}
60+
61+
// `highest_block` and `highest_tx` are declared empty here and filled
// in with `span.record` once they are known.
let span = info_span!(
62+
"checking_segment",
63+
?segment,
64+
initial_highest_block,
65+
highest_block = tracing::field::Empty,
66+
highest_tx = tracing::field::Empty
67+
);
68+
// Named guard: the span stays entered until the end of this loop iteration.
let _guard = span.enter();
69+
70+
// File consistency is broken if:
71+
//
72+
// * appending data was interrupted before a config commit, then
73+
// data file will be truncated according to the config.
74+
//
75+
// * pruning data was interrupted before a config commit, then we
76+
// have deleted data that we are expected to still have. We need
77+
// to check the Database and unwind everything accordingly.
78+
if sfp.is_read_only() {
79+
sfp.check_segment_consistency(segment)?;
80+
} else {
81+
// Fetching the writer will attempt to heal any file level
82+
// inconsistency.
83+
sfp.latest_writer(segment)?;
84+
}
85+
86+
// Only applies to block-based static files. (Headers)
87+
//
88+
// The updated `highest_block` may have decreased if we healed from a pruning
89+
// interruption.
90+
let mut highest_block = sfp.get_highest_static_file_block(segment);
91+
span.record("highest_block", highest_block);
92+
93+
// The highest block differs from the snapshot taken before the
// check/heal step: report the new height (0 if the segment now has no
// highest block).
if initial_highest_block != highest_block {
94+
update_last_good_height(highest_block.unwrap_or_default());
95+
}
96+
97+
// Only applies to transaction-based static files. (Receipts & Transactions)
98+
//
99+
// Make sure the last transaction matches the last block from its indices, since a heal
100+
// from a pruning interruption might have decreased the number of transactions without
101+
// being able to update the last block of the static file segment.
102+
let highest_tx = sfp.get_highest_static_file_tx(segment);
103+
if let Some(highest_tx) = highest_tx {
104+
span.record("highest_tx", highest_tx);
105+
let mut last_block = highest_block.unwrap_or_default();
106+
// Walk backwards one block at a time until a block is found whose
// last transaction number does not exceed `highest_tx`, the block
// body indices are missing, or block 0 is reached.
loop {
107+
if let Some(indices) = self.block_body_indices(last_block)? {
108+
if indices.last_tx_num() <= highest_tx {
109+
break;
110+
}
111+
} else {
112+
// If the block body indices cannot be found, then it means that the static
113+
// files are ahead of the database, and the `ensure_invariants` check will fix
114+
// it by comparing with stage checkpoints.
115+
break;
116+
}
117+
if last_block == 0 {
118+
break;
119+
}
120+
last_block -= 1;
121+
122+
// Each step back lowers both the segment's effective highest
// block and the reported height.
highest_block = Some(last_block);
123+
update_last_good_height(last_block);
124+
}
125+
}
126+
127+
// Compare the segment against its database table. For headers the
// block height is passed as both the highest entry and the highest
// block; the other two segments pass the highest transaction number
// as the entry. A `Some` result is an unwind target.
if let Some(unwind) = match segment {
128+
StaticFileSegment::Headers => {
129+
ensure_invariants::<
130+
_,
131+
tables::Headers<<EthPrimitives as NodePrimitives>::BlockHeader>,
132+
>(self, segment, highest_block, highest_block)?
133+
}
134+
StaticFileSegment::Transactions => {
135+
ensure_invariants::<
136+
_,
137+
tables::Transactions<<EthPrimitives as NodePrimitives>::SignedTx>,
138+
>(self, segment, highest_tx, highest_block)?
139+
}
140+
StaticFileSegment::Receipts => {
141+
ensure_invariants::<
142+
_,
143+
tables::Receipts<<EthPrimitives as NodePrimitives>::Receipt>,
144+
>(self, segment, highest_tx, highest_block)?
145+
}
146+
} {
147+
update_last_good_height(unwind);
148+
}
149+
}
150+
151+
Ok(last_good_height)
152+
}
153+
}
154+
155+
/// Check invariants for each corresponding table and static file segment:
156+
///
157+
/// 1. The corresponding database table should overlap or have continuity in
158+
/// their keys ([`TxNumber`] or [`BlockNumber`]).
159+
/// 2. Its highest block should match the stage checkpoint block number if it's
160+
/// equal or higher than the corresponding database table last entry.
161+
/// * If the checkpoint block is higher, then request a pipeline unwind to
162+
/// the static file block. This is expressed by returning [`Some`] with the
163+
/// requested pipeline unwind target.
164+
/// * If the checkpoint block is lower, then heal by removing rows from the
165+
/// static file. In this case, the rows will be removed and [`None`] will
166+
/// be returned.
167+
/// 3. If the database tables overlap with static files and have contiguous
168+
/// keys, or the checkpoint block matches the highest static files block,
169+
/// then [`None`] will be returned.
170+
///
/// Parameters:
///
/// * `this` - provider factory used to open the database provider and to
/// obtain the static file provider.
/// * `segment` - static file segment being checked; it also selects which
/// stage checkpoint is compared (`Headers`, `Bodies` or `Execution`).
/// * `highest_static_file_entry` - highest key (of table `T`) present in the
/// static file segment, if any.
/// * `highest_static_file_block` - highest block present in the static file
/// segment, if any.
///
/// # Errors
///
/// Any error from the provider, cursor or static file writer calls is
/// propagated with `?`.
///
171+
/// [`TxNumber`]: alloy::primitives::TxNumber
172+
#[instrument(skip(this, segment), fields(table = T::NAME))]
173+
fn ensure_invariants<Db, T: Table<Key = u64>>(
174+
this: &ProviderFactory<SignetNodeTypes<Db>>,
175+
segment: StaticFileSegment,
176+
highest_static_file_entry: Option<u64>,
177+
highest_static_file_block: Option<BlockNumber>,
178+
) -> ProviderResult<Option<BlockNumber>>
179+
where
180+
Db: NodeTypesDbTrait,
181+
{
182+
// NOTE(review): a read-write provider is opened here, but within this
// function it appears to be used only for reads (cursor, stage checkpoint,
// block body indices); pruning goes through the static file writer.
// Confirm whether a read-only provider would suffice.
let provider = this.provider_rw()?;
183+
let sfp = this.static_file_provider();
184+
185+
let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
186+
187+
// Only relevant when the database table has at least one row.
if let Some((db_first_entry, _)) = db_cursor.first()? {
188+
if let (Some(highest_entry), Some(highest_block)) =
189+
(highest_static_file_entry, highest_static_file_block)
190+
{
191+
// If there is a gap between the entry found in static file and
192+
// database, then we have most likely lost static file data and
193+
// need to unwind so we can load it again
//
// The condition below is equivalent to
// `db_first_entry > highest_entry + 1`.
194+
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
195+
info!(unwind_target = highest_block, "Setting unwind target.");
196+
return Ok(Some(highest_block));
197+
}
198+
}
199+
200+
// The database's last key is beyond the static file's highest entry (or
// the static file has no entry at all): return without consulting the
// stage checkpoint.
if let Some((db_last_entry, _)) = db_cursor.last()?
201+
&& highest_static_file_entry.is_none_or(|highest_entry| db_last_entry > highest_entry)
202+
{
203+
return Ok(None);
204+
}
205+
}
206+
207+
// From here on, missing static file values are treated as 0.
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
208+
let highest_static_file_block = highest_static_file_block.unwrap_or_default();
209+
210+
// If static file entry is ahead of the database entries, then ensure the
211+
// checkpoint block number matches.
//
// A missing checkpoint falls back to the checkpoint type's default value.
212+
let checkpoint_block_number = provider
213+
.get_stage_checkpoint(match segment {
214+
StaticFileSegment::Headers => StageId::Headers,
215+
StaticFileSegment::Transactions => StageId::Bodies,
216+
StaticFileSegment::Receipts => StageId::Execution,
217+
})?
218+
.unwrap_or_default()
219+
.block_number;
220+
221+
// If the checkpoint is ahead, then we lost static file data. May be data corruption.
222+
if checkpoint_block_number > highest_static_file_block {
223+
info!(
224+
checkpoint_block_number,
225+
unwind_target = highest_static_file_block,
226+
"Setting unwind target."
227+
);
228+
return Ok(Some(highest_static_file_block));
229+
}
230+
231+
// If the checkpoint is behind, then we failed to do a database commit
232+
// **but committed** to static files on executing a stage, or the reverse
233+
// on unwinding a stage.
234+
//
235+
// All we need to do is to prune the extra static file rows.
236+
if checkpoint_block_number < highest_static_file_block {
237+
info!(
238+
from = highest_static_file_block,
239+
to = checkpoint_block_number,
240+
"Unwinding static file segment."
241+
);
242+
243+
let mut writer = sfp.latest_writer(segment)?;
244+
if segment.is_headers() {
245+
// TODO(joshie): is_block_meta
//
// The argument is the number of blocks between the static file's
// highest block and the checkpoint.
246+
writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
247+
} else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
248+
// todo joshie: is querying block_body_indices a potential issue
249+
// once bbi is moved to sf as well
//
// NOTE(review): unchecked subtraction; this assumes
// `highest_static_file_entry >= block.last_tx_num()` - confirm.
250+
let number = highest_static_file_entry - block.last_tx_num();
251+
if segment.is_receipts() {
252+
writer.prune_receipts(number, checkpoint_block_number)?;
253+
} else {
254+
writer.prune_transactions(number, checkpoint_block_number)?;
255+
}
256+
}
// Reached on every path above, including when no block body indices were
// found for the checkpoint block and therefore nothing was pruned.
257+
writer.commit()?;
258+
}
259+
260+
Ok(None)
261+
}
262+
263+
// Some code in this file is adapted from reth. It is used under the terms of
264+
// the MIT License.
265+
//
266+
// The MIT License (MIT)
267+
//
268+
// Copyright (c) 2022-2025 Reth Contributors
269+
//
270+
// Permission is hereby granted, free of charge, to any person obtaining a copy
271+
// of this software and associated documentation files (the "Software"), to deal
272+
// in the Software without restriction, including without limitation the rights
273+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
274+
// copies of the Software, and to permit persons to whom the Software is
275+
// furnished to do so, subject to the following conditions:
276+
//
277+
// The above copyright notice and this permission notice shall be included in
278+
// all copies or substantial portions of the Software.
279+
//
280+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
281+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
282+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
283+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
284+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
285+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
286+
// THE SOFTWARE.

crates/db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub use aliases::{RuRevmState, SignetDbRw};
1717
mod chain;
1818
pub use chain::{DbExtractionResults, RuChain};
1919

20+
mod consistency;
21+
pub use consistency::ProviderConsistencyExt;
22+
2023
mod convert;
2124
pub use convert::DataCompat;
2225

crates/db/src/provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ where
5555
self.tx_ref().get::<JournalHashes>(ru_height).map_err(Into::into)
5656
}
5757

58+
#[track_caller]
5859
fn latest_journal_hash(&self) -> ProviderResult<B256> {
5960
let latest_height = self.last_block_number()?;
6061
Ok(self

crates/node-tests/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl SignetTestContext {
142142
// after RPC booted, we can create the alloy provider
143143
let alloy_provider = ProviderBuilder::new_with_network()
144144
.disable_recommended_fillers()
145-
.filler(BlobGasFiller)
145+
.filler(BlobGasFiller::default())
146146
.with_gas_estimation()
147147
.with_nonce_management(SimpleNonceManager::default())
148148
.with_chain_id(constants.ru_chain_id())

0 commit comments

Comments
 (0)