Skip to content

Commit 8f2811d

Browse files
authored
fix: ensure MerkleChangeSets pruner only runs if pipeline stage has finished (#20073)
1 parent 9260f2f commit 8f2811d

File tree

7 files changed

+55
-5
lines changed

7 files changed

+55
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/prune/prune/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ reth-tokio-util.workspace = true
2121
reth-config.workspace = true
2222
reth-prune-types.workspace = true
2323
reth-primitives-traits.workspace = true
24+
reth-stages-types.workspace = true
2425
reth-static-file-types.workspace = true
2526

2627
# ethereum

crates/prune/prune/src/builder.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use reth_primitives_traits::NodePrimitives;
77
use reth_provider::{
88
providers::StaticFileProvider, BlockReader, ChainStateBlockReader, DBProvider,
99
DatabaseProviderFactory, NodePrimitivesProvider, PruneCheckpointReader, PruneCheckpointWriter,
10-
StaticFileProviderFactory, StorageSettingsCache,
10+
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
1111
};
1212
use reth_prune_types::PruneModes;
1313
use std::time::Duration;
@@ -81,6 +81,7 @@ impl PrunerBuilder {
8181
+ BlockReader<Transaction: Encodable2718>
8282
+ ChainStateBlockReader
8383
+ StorageSettingsCache
84+
+ StageCheckpointReader
8485
+ StaticFileProviderFactory<
8586
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
8687
>,
@@ -114,7 +115,8 @@ impl PrunerBuilder {
114115
+ ChainStateBlockReader
115116
+ PruneCheckpointWriter
116117
+ PruneCheckpointReader
117-
+ StorageSettingsCache,
118+
+ StorageSettingsCache
119+
+ StageCheckpointReader,
118120
{
119121
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);
120122

crates/prune/prune/src/pruner.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use alloy_primitives::BlockNumber;
88
use reth_exex_types::FinishedExExHeight;
99
use reth_provider::{
1010
DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
11+
StageCheckpointReader,
1112
};
1213
use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
14+
use reth_stages_types::StageId;
1315
use reth_tokio_util::{EventSender, EventStream};
1416
use std::time::{Duration, Instant};
1517
use tokio::sync::watch;
@@ -100,7 +102,7 @@ where
100102

101103
impl<Provider, S> Pruner<Provider, S>
102104
where
103-
Provider: PruneCheckpointReader + PruneCheckpointWriter,
105+
Provider: PruneCheckpointReader + PruneCheckpointWriter + StageCheckpointReader,
104106
{
105107
/// Listen for events on the pruner.
106108
pub fn events(&self) -> EventStream<PrunerEvent> {
@@ -200,6 +202,19 @@ where
200202
.transpose()?
201203
.flatten()
202204
{
205+
// Check if segment has a required stage that must be finished first
206+
if let Some(required_stage) = segment.required_stage() &&
207+
!is_stage_finished(provider, required_stage)?
208+
{
209+
debug!(
210+
target: "pruner",
211+
segment = ?segment.segment(),
212+
?required_stage,
213+
"Segment's required stage not finished, skipping"
214+
);
215+
continue
216+
}
217+
203218
debug!(
204219
target: "pruner",
205220
segment = ?segment.segment(),
@@ -318,7 +333,9 @@ where
318333

319334
impl<PF> Pruner<PF::ProviderRW, PF>
320335
where
321-
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + PruneCheckpointReader>,
336+
PF: DatabaseProviderFactory<
337+
ProviderRW: PruneCheckpointWriter + PruneCheckpointReader + StageCheckpointReader,
338+
>,
322339
{
323340
/// Run the pruner. This will only prune data up to the highest finished ExEx height, if there
324341
/// are no ExExes.
@@ -333,6 +350,19 @@ where
333350
}
334351
}
335352

353+
/// Checks if the given stage has caught up with the `Finish` stage.
354+
///
355+
/// Returns `true` if the stage checkpoint is >= the Finish stage checkpoint.
356+
fn is_stage_finished<Provider: StageCheckpointReader>(
357+
provider: &Provider,
358+
stage_id: StageId,
359+
) -> Result<bool, PrunerError> {
360+
let stage_checkpoint = provider.get_stage_checkpoint(stage_id)?.map(|c| c.block_number);
361+
let finish_checkpoint = provider.get_stage_checkpoint(StageId::Finish)?.map(|c| c.block_number);
362+
363+
Ok(stage_checkpoint >= finish_checkpoint)
364+
}
365+
336366
#[cfg(test)]
337367
mod tests {
338368
use crate::Pruner;

crates/prune/prune/src/segments/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use reth_prune_types::{
1111
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
1212
SegmentOutputCheckpoint,
1313
};
14+
use reth_stages_types::StageId;
1415
use reth_static_file_types::StaticFileSegment;
1516
pub use set::SegmentSet;
1617
use std::{fmt::Debug, ops::RangeInclusive};
@@ -84,6 +85,14 @@ pub trait Segment<Provider>: Debug + Send + Sync {
8485
{
8586
provider.save_prune_checkpoint(self.segment(), checkpoint)
8687
}
88+
89+
/// Returns the stage this segment depends on, if any.
90+
///
91+
/// If this returns `Some(stage_id)`, the pruner will skip this segment if the stage
92+
/// has not yet caught up with the `Finish` stage checkpoint.
93+
fn required_stage(&self) -> Option<StageId> {
94+
None
95+
}
8796
}
8897

8998
/// Segment pruning input, see [`Segment::prune`].

crates/prune/prune/src/segments/user/merkle_change_sets.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use reth_provider::{
1313
use reth_prune_types::{
1414
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
1515
};
16+
use reth_stages_types::StageId;
1617
use tracing::{instrument, trace};
1718

1819
#[derive(Debug)]
@@ -47,6 +48,10 @@ where
4748
PrunePurpose::User
4849
}
4950

51+
fn required_stage(&self) -> Option<StageId> {
52+
Some(StageId::MerkleChangeSets)
53+
}
54+
5055
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
5156
fn prune(&self, provider: &Provider, input: PruneInput) -> Result<SegmentOutput, PrunerError> {
5257
let Some(block_range) = input.get_next_block_range() else {

crates/stages/stages/src/stages/prune.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use reth_db_api::{table::Value, transaction::DbTxMut};
22
use reth_primitives_traits::NodePrimitives;
33
use reth_provider::{
44
BlockReader, ChainStateBlockReader, DBProvider, PruneCheckpointReader, PruneCheckpointWriter,
5-
StaticFileProviderFactory, StorageSettingsCache,
5+
StageCheckpointReader, StaticFileProviderFactory, StorageSettingsCache,
66
};
77
use reth_prune::{
88
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
@@ -43,6 +43,7 @@ where
4343
+ PruneCheckpointWriter
4444
+ BlockReader
4545
+ ChainStateBlockReader
46+
+ StageCheckpointReader
4647
+ StaticFileProviderFactory<
4748
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
4849
> + StorageSettingsCache,
@@ -144,6 +145,7 @@ where
144145
+ PruneCheckpointWriter
145146
+ BlockReader
146147
+ ChainStateBlockReader
148+
+ StageCheckpointReader
147149
+ StaticFileProviderFactory<
148150
Primitives: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>,
149151
> + StorageSettingsCache,

0 commit comments

Comments
 (0)