Skip to content

Commit 9cc3bf2

Browse files
committed
feat: add consistency check at node bootup
1 parent a1cbb0a commit 9cc3bf2

File tree

7 files changed

+305
-5
lines changed

7 files changed

+305
-5
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
8989
reth-node-ethereum = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9090
reth-prune-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9191
reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
92+
reth-stages-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9293
reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
9394

9495
# Foundry periphery

crates/block-processor/src/v1/processor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ where
144144
// height, so we don't need compute the start from the notification.
145145
let mut start = None;
146146
let mut current = 0;
147+
let last_ru_height = self.ru_provider.last_block_number()?;
147148
let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?;
148149

149150
let mut net_outcome = ExecutionOutcome::default();
150-
let last_ru_height = self.ru_provider.last_block_number()?;
151151

152152
// There might be a case where we can get a notification that starts
153153
// "lower" than our last processed block,
@@ -183,13 +183,14 @@ where
183183
ru_height = block_extracts.ru_height,
184184
host_height = block_extracts.host_block.number(),
185185
has_ru_block = block_extracts.submitted.is_some(),
186+
height_before_notification = last_ru_height,
186187
);
187188

188-
tracing::trace!("Running EVM");
189-
let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?;
189+
let block_result =
190+
self.run_evm(&block_extracts, spec_id).instrument(span.clone()).await?;
190191
metrics::record_block_result(&block_result, &start_time);
191192

192-
tracing::trace!("Committing EVM results");
193+
let _ = span.enter();
193194
let journal =
194195
self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?;
195196

crates/db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ alloy.workspace = true
2323
reth.workspace = true
2424
reth-db.workspace = true
2525
reth-prune-types.workspace = true
26+
reth-stages-types.workspace = true
2627

2728
itertools.workspace = true
2829
serde.workspace = true

crates/db/src/consistency.rs

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
use alloy::primitives::BlockNumber;
2+
use reth::{
3+
api::NodePrimitives,
4+
primitives::EthPrimitives,
5+
providers::{
6+
BlockBodyIndicesProvider, ProviderFactory, ProviderResult, StageCheckpointReader,
7+
StaticFileProviderFactory, StaticFileSegment, StaticFileWriter,
8+
},
9+
};
10+
use reth_db::{cursor::DbCursorRO, table::Table, tables, transaction::DbTx};
11+
use reth_stages_types::StageId;
12+
use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes};
13+
use tracing::{debug, info, info_span, instrument, warn};
14+
15+
/// Extension trait that provides consistency checking for the RU database
16+
/// provider. Consistency checks are MANDATORY on node startup to ensure that
17+
/// the static file segments and database are in sync.
18+
///
19+
/// in general, this should not be implemented outside this crate.
20+
pub trait ProviderConsistencyExt {
21+
/// Check the consistency of the static file segments and return the last
22+
/// known-good block number.
23+
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>>;
24+
}
25+
26+
impl<Db> ProviderConsistencyExt for ProviderFactory<SignetNodeTypes<Db>>
27+
where
28+
Db: NodeTypesDbTrait,
29+
{
30+
/// Check the consistency of the static file segments and return the last
31+
/// known good block number.
32+
#[instrument(skip(self), fields(read_only = self.static_file_provider().is_read_only()))]
33+
fn ru_check_consistency(&self) -> ProviderResult<Option<BlockNumber>> {
34+
// Based on `StaticFileProvider::check_consistency` in
35+
// `reth/crates/storage/provider/src/providers/static_file/manager.rs`
36+
// with modifications for RU-specific logic.
37+
//
38+
// Comments are largely reproduced from the original source for context.
39+
//
40+
// Last updated @ [email protected]
41+
let prune_modes = self.provider_rw()?.prune_modes_ref().clone();
42+
let sfp = self.static_file_provider();
43+
44+
debug!("Checking static file consistency.");
45+
46+
let last_good_height: Option<BlockNumber> = None;
47+
48+
let update_last_good_height = |new_height: BlockNumber| {
49+
last_good_height.map(|current| current.max(new_height)).or(Some(new_height))
50+
};
51+
52+
for segment in StaticFileSegment::iter() {
53+
let initial_highest_block = sfp.get_highest_static_file_block(segment);
54+
55+
if prune_modes.has_receipts_pruning() && segment.is_receipts() {
56+
// Pruned nodes (including full node) do not store receipts as static files.
57+
continue;
58+
}
59+
60+
let span = info_span!(
61+
"checking_segment",
62+
?segment,
63+
initial_highest_block,
64+
highest_block = tracing::field::Empty,
65+
highest_tx = tracing::field::Empty
66+
);
67+
let _guard = span.enter();
68+
69+
// File consistency is broken if:
70+
//
71+
// * appending data was interrupted before a config commit, then
72+
// data file will be truncated according to the config.
73+
//
74+
// * pruning data was interrupted before a config commit, then we
75+
// have deleted data that we are expected to still have. We need
76+
// to check the Database and unwind everything accordingly.
77+
if sfp.is_read_only() {
78+
sfp.check_segment_consistency(segment)?;
79+
} else {
80+
// Fetching the writer will attempt to heal any file level
81+
// inconsistency.
82+
sfp.latest_writer(segment)?;
83+
}
84+
85+
// Only applies to block-based static files. (Headers)
86+
//
87+
// The updated `highest_block` may have decreased if we healed from a pruning
88+
// interruption.
89+
let mut highest_block = sfp.get_highest_static_file_block(segment);
90+
span.record("highest_block", &highest_block);
91+
92+
if initial_highest_block != highest_block {
93+
update_last_good_height(highest_block.unwrap_or_default());
94+
}
95+
96+
// Only applies to transaction-based static files. (Receipts & Transactions)
97+
//
98+
// Make sure the last transaction matches the last block from its indices, since a heal
99+
// from a pruning interruption might have decreased the number of transactions without
100+
// being able to update the last block of the static file segment.
101+
let highest_tx = sfp.get_highest_static_file_tx(segment);
102+
if let Some(highest_tx) = highest_tx {
103+
span.record("highest_tx", &highest_tx);
104+
let mut last_block = highest_block.unwrap_or_default();
105+
loop {
106+
if let Some(indices) = self.block_body_indices(last_block)? {
107+
if indices.last_tx_num() <= highest_tx {
108+
break;
109+
}
110+
} else {
111+
// If the block body indices can not be found, then it means that static
112+
// files is ahead of database, and the `ensure_invariants` check will fix
113+
// it by comparing with stage checkpoints.
114+
break;
115+
}
116+
if last_block == 0 {
117+
break;
118+
}
119+
last_block -= 1;
120+
121+
highest_block = Some(last_block);
122+
update_last_good_height(last_block);
123+
}
124+
}
125+
126+
if let Some(unwind) = match segment {
127+
StaticFileSegment::Headers => {
128+
ensure_invariants::<
129+
_,
130+
tables::Headers<<EthPrimitives as NodePrimitives>::BlockHeader>,
131+
>(self, segment, highest_block, highest_block)?
132+
}
133+
StaticFileSegment::Transactions => {
134+
ensure_invariants::<
135+
_,
136+
tables::Transactions<<EthPrimitives as NodePrimitives>::SignedTx>,
137+
>(self, segment, highest_tx, highest_block)?
138+
}
139+
StaticFileSegment::Receipts => {
140+
ensure_invariants::<
141+
_,
142+
tables::Receipts<<EthPrimitives as NodePrimitives>::Receipt>,
143+
>(self, segment, highest_tx, highest_block)?
144+
}
145+
} {
146+
update_last_good_height(unwind);
147+
}
148+
}
149+
150+
Ok(last_good_height)
151+
}
152+
}
153+
154+
/// Check invariants for each corresponding table and static file segment:
155+
///
156+
/// 1. The corresponding database table should overlap or have continuity in
157+
/// their keys ([`TxNumber`] or [`BlockNumber`]).
158+
/// 2. Its highest block should match the stage checkpoint block number if it's
159+
/// equal or higher than the corresponding database table last entry.
160+
/// * If the checkpoint block is higher, then request a pipeline unwind to
161+
/// the static file block. This is expressed by returning [`Some`] with the
162+
/// requested pipeline unwind target.
163+
/// * If the checkpoint block is lower, then heal by removing rows from the
164+
/// static file. In this case, the rows will be removed and [`None`] will
165+
/// be returned.
166+
/// 3. If the database tables overlap with static files and have contiguous
167+
/// keys, or the checkpoint block matches the highest static files block,
168+
/// then [`None`] will be returned.
169+
///
170+
/// [`TxNumber`]: alloy::primitives::TxNumber
171+
#[instrument(skip(this), fields(table = T::NAME))]
172+
fn ensure_invariants<Db, T: Table<Key = u64>>(
173+
this: &ProviderFactory<SignetNodeTypes<Db>>,
174+
segment: StaticFileSegment,
175+
highest_static_file_entry: Option<u64>,
176+
highest_static_file_block: Option<BlockNumber>,
177+
) -> ProviderResult<Option<BlockNumber>>
178+
where
179+
Db: NodeTypesDbTrait,
180+
{
181+
let provider = this.provider_rw()?;
182+
let sfp = this.static_file_provider();
183+
184+
let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
185+
186+
if let Some((db_first_entry, _)) = db_cursor.first()? {
187+
if let (Some(highest_entry), Some(highest_block)) =
188+
(highest_static_file_entry, highest_static_file_block)
189+
{
190+
// If there is a gap between the entry found in static file and
191+
// database, then we have most likely lost static file data and
192+
// need to unwind so we can load it again
193+
if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
194+
info!(
195+
?db_first_entry,
196+
?highest_entry,
197+
unwind_target = highest_block,
198+
?segment,
199+
"Setting unwind target."
200+
);
201+
return Ok(Some(highest_block));
202+
}
203+
}
204+
205+
if let Some((db_last_entry, _)) = db_cursor.last()?
206+
&& highest_static_file_entry.is_none_or(|highest_entry| db_last_entry > highest_entry)
207+
{
208+
return Ok(None);
209+
}
210+
}
211+
212+
let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
213+
let highest_static_file_block = highest_static_file_block.unwrap_or_default();
214+
215+
// If static file entry is ahead of the database entries, then ensure the
216+
// checkpoint block number matches.
217+
let checkpoint_block_number = provider
218+
.get_stage_checkpoint(match segment {
219+
StaticFileSegment::Headers => StageId::Headers,
220+
StaticFileSegment::Transactions => StageId::Bodies,
221+
StaticFileSegment::Receipts => StageId::Execution,
222+
})?
223+
.unwrap_or_default()
224+
.block_number;
225+
226+
// If the checkpoint is ahead, then we lost static file data. May be data corruption.
227+
if checkpoint_block_number > highest_static_file_block {
228+
info!(
229+
checkpoint_block_number,
230+
unwind_target = highest_static_file_block,
231+
"Setting unwind target."
232+
);
233+
return Ok(Some(highest_static_file_block));
234+
}
235+
236+
// If the checkpoint is behind, then we failed to do a database commit
237+
// **but committed** to static files on executing a stage, or the reverse
238+
// on unwinding a stage.
239+
//
240+
// All we need to do is to prune the extra static file rows.
241+
if checkpoint_block_number < highest_static_file_block {
242+
info!(
243+
from = highest_static_file_block,
244+
to = checkpoint_block_number,
245+
"Unwinding static file segment."
246+
);
247+
248+
let mut writer = sfp.latest_writer(segment)?;
249+
if segment.is_headers() {
250+
// TODO(joshie): is_block_meta
251+
writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
252+
} else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
253+
// todo joshie: is querying block_body_indices a potential issue
254+
// once bbi is moved to sf as well
255+
let number = highest_static_file_entry - block.last_tx_num();
256+
if segment.is_receipts() {
257+
writer.prune_receipts(number, checkpoint_block_number)?;
258+
} else {
259+
writer.prune_transactions(number, checkpoint_block_number)?;
260+
}
261+
}
262+
writer.commit()?;
263+
}
264+
265+
Ok(None)
266+
}
267+
268+
// Some code in this file is adapted from reth. It is used under the terms of
269+
// the MIT License.
270+
//
271+
// The MIT License (MIT)
272+
//
273+
// Copyright (c) 2022-2025 Reth Contributors
274+
//
275+
// Permission is hereby granted, free of charge, to any person obtaining a copy
276+
// of this software and associated documentation files (the "Software"), to deal
277+
// in the Software without restriction, including without limitation the rights
278+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
279+
// copies of the Software, and to permit persons to whom the Software is
280+
// furnished to do so, subject to the following conditions:
281+
//
282+
// The above copyright notice and this permission notice shall be included in
283+
// all copies or substantial portions of the Software.
284+
//
285+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
286+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
287+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
288+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
289+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
290+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
291+
// THE SOFTWARE.

crates/db/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ pub use aliases::{RuRevmState, SignetDbRw};
1717
mod chain;
1818
pub use chain::{DbExtractionResults, RuChain};
1919

20+
mod consistency;
21+
pub use consistency::ProviderConsistencyExt;
22+
2023
mod convert;
2124
pub use convert::DataCompat;
2225

crates/db/src/provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ where
5555
self.tx_ref().get::<JournalHashes>(ru_height).map_err(Into::into)
5656
}
5757

58+
#[track_caller]
5859
fn latest_journal_hash(&self) -> ProviderResult<B256> {
5960
let latest_height = self.last_block_number()?;
6061
Ok(self

crates/node/src/node.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use reth_exex::{ExExContext, ExExEvent, ExExHead, ExExNotificationsStream};
2020
use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeTypes};
2121
use signet_blobber::BlobFetcher;
2222
use signet_block_processor::{AliasOracleFactory, SignetBlockProcessorV1};
23-
use signet_db::{DbProviderExt, RuChain, RuWriter};
23+
use signet_db::{DbProviderExt, ProviderConsistencyExt, RuChain, RuWriter};
2424
use signet_node_config::SignetNodeConfig;
2525
use signet_node_types::{NodeStatus, NodeTypesDbTrait, SignetNodeTypes};
2626
use signet_rpc::RpcServerGuard;
@@ -179,6 +179,8 @@ where
179179
/// errors.
180180
#[instrument(skip(self), fields(host = ?self.host.config.chain.chain()))]
181181
pub async fn start(mut self) -> eyre::Result<()> {
182+
self.ru_provider.ru_check_consistency()?;
183+
182184
// This exists only to bypass the `tracing::instrument(err)` macro to
183185
// ensure that full sources get reported.
184186
self.start_inner().await.inspect_err(|err| {

0 commit comments

Comments
 (0)