Skip to content

Commit 6d3cf65

Browse files
kariy and claude committed
feat: add state pruning mechanism to sync pipeline
This commit introduces a new pruning mechanism for managing historical state in the sync pipeline. The system allows users to control state retention with flexible pruning modes.

## Changes

### Stage Trait Enhancement

- Added `Stage::prune` method to enable stages to prune historical data
- Introduced `PruneInput` and `PruneOutput` types for pruning operations
- Added `PruningMode` enum supporting three modes:
  - `Full`: Keep all historical state (no pruning)
  - `HistoricalBlocks(n)`: Keep last N blocks of historical state
  - `LatestOnly`: Keep only the latest state

### Pipeline Integration

- Added `PruningConfig` to configure pruning behavior
- Integrated pruning into the pipeline loop with configurable intervals
- Pruning runs automatically after processing a specified number of blocks
- Added `should_prune()` and `prune()` methods to Pipeline

### Stage Implementations

- **StateTrie**: Added no-op prune implementation (TODO for provider API)
- **Blocks**: Added no-op prune implementation (TODO for provider API)
- **Classes**: Added no-op prune with rationale (classes are immutable)

## Design Notes

The pruning mechanism is designed with a single, unified API that handles all three pruning scenarios. The `PruneInput::prune_before()` method calculates the cutoff block based on the pruning mode, making it easy for stages to determine what to prune.

For now, stage implementations are placeholders (no-ops) as actual pruning requires provider API extensions. The infrastructure is in place for future implementation.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 92196ed commit 6d3cf65

File tree

5 files changed

+272
-8
lines changed

5 files changed

+272
-8
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ use futures::future::BoxFuture;
8080
use katana_primitives::block::BlockNumber;
8181
use katana_provider_api::stage::StageCheckpointProvider;
8282
use katana_provider_api::ProviderError;
83-
use katana_stage::{Stage, StageExecutionInput, StageExecutionOutput};
83+
use katana_stage::{PruneInput, PruneOutput, PruningMode, Stage, StageExecutionInput, StageExecutionOutput};
8484
use tokio::sync::watch;
8585
use tokio::task::yield_now;
8686
use tracing::{debug, error, info, info_span, Instrument};
@@ -99,6 +99,9 @@ pub enum Error {
9999
#[error("stage {id} execution failed: {error}")]
100100
StageExecution { id: &'static str, error: katana_stage::Error },
101101

102+
#[error("stage {id} pruning failed: {error}")]
103+
StagePruning { id: &'static str, error: katana_stage::Error },
104+
102105
#[error(transparent)]
103106
Provider(#[from] ProviderError),
104107
}
@@ -199,6 +202,35 @@ impl PipelineHandle {
199202
}
200203
}
201204

205+
/// Configuration for pruning behavior in the pipeline.
206+
#[derive(Debug, Clone)]
207+
pub struct PruningConfig {
208+
/// The pruning mode to use.
209+
pub mode: PruningMode,
210+
/// How many blocks to process between pruning runs.
211+
/// Pruning will be triggered after every `interval` blocks are synced.
212+
/// If `None`, pruning is disabled.
213+
pub interval: Option<u64>,
214+
}
215+
216+
impl Default for PruningConfig {
217+
fn default() -> Self {
218+
Self { mode: PruningMode::Full, interval: None }
219+
}
220+
}
221+
222+
impl PruningConfig {
223+
/// Creates a new pruning configuration with the specified mode and interval.
224+
pub fn new(mode: PruningMode, interval: Option<u64>) -> Self {
225+
Self { mode, interval }
226+
}
227+
228+
/// Returns whether pruning is enabled.
229+
pub fn is_enabled(&self) -> bool {
230+
!matches!(self.mode, PruningMode::Full) && self.interval.is_some()
231+
}
232+
}
233+
202234
/// Syncing pipeline.
203235
///
204236
/// The pipeline drives the execution of stages, running each stage to completion in the order they
@@ -221,6 +253,8 @@ pub struct Pipeline<P> {
221253
command_tx: watch::Sender<Option<PipelineCommand>>,
222254
block_tx: watch::Sender<Option<BlockNumber>>,
223255
tip: Option<BlockNumber>,
256+
pruning_config: PruningConfig,
257+
last_pruned_block: Option<BlockNumber>,
224258
}
225259

226260
impl<P> Pipeline<P> {
@@ -246,10 +280,24 @@ impl<P> Pipeline<P> {
246280
provider,
247281
chunk_size,
248282
tip: None,
283+
pruning_config: PruningConfig::default(),
284+
last_pruned_block: None,
249285
};
250286
(pipeline, handle)
251287
}
252288

289+
/// Sets the pruning configuration for the pipeline.
290+
///
291+
/// This controls how and when historical state is pruned during synchronization.
292+
pub fn set_pruning_config(&mut self, config: PruningConfig) {
293+
self.pruning_config = config;
294+
}
295+
296+
/// Returns the current pruning configuration.
297+
pub fn pruning_config(&self) -> &PruningConfig {
298+
&self.pruning_config
299+
}
300+
253301
/// Adds a new stage to the end of the pipeline.
254302
///
255303
/// Stages are executed in the order they are added.
@@ -418,6 +466,13 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
418466
// Notify subscribers about the newly processed block
419467
let _ = self.block_tx.send(Some(last_block_processed));
420468

469+
// Check if we should run pruning
470+
if self.should_prune(last_block_processed) {
471+
info!(target: "pipeline", block = %last_block_processed, "Starting pruning.");
472+
self.prune(last_block_processed).await?;
473+
self.last_pruned_block = Some(last_block_processed);
474+
}
475+
421476
if last_block_processed >= tip {
422477
info!(target: "pipeline", %tip, "Finished syncing until tip.");
423478
self.tip = None;
@@ -440,6 +495,52 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
440495
yield_now().await;
441496
}
442497
}
498+
499+
/// Determines if pruning should be performed based on the current block and configuration.
500+
fn should_prune(&self, current_block: BlockNumber) -> bool {
501+
if !self.pruning_config.is_enabled() {
502+
return false;
503+
}
504+
505+
let interval = match self.pruning_config.interval {
506+
Some(i) => i,
507+
None => return false,
508+
};
509+
510+
match self.last_pruned_block {
511+
Some(last_pruned) => current_block.saturating_sub(last_pruned) >= interval,
512+
None => current_block >= interval,
513+
}
514+
}
515+
516+
/// Runs pruning on all stages.
517+
async fn prune(&mut self, tip: BlockNumber) -> PipelineResult<()> {
518+
if self.stages.is_empty() {
519+
return Ok(());
520+
}
521+
522+
let prune_input = PruneInput::new(tip, self.pruning_config.mode);
523+
524+
for stage in self.stages.iter_mut() {
525+
let id = stage.id();
526+
let span = info_span!(target: "pipeline", "stage.prune", stage = %id, %tip);
527+
let enter = span.entered();
528+
529+
info!(target: "pipeline", mode = ?self.pruning_config.mode, "Pruning stage.");
530+
531+
let span_inner = enter.exit();
532+
let PruneOutput { pruned_count } = stage
533+
.prune(&prune_input)
534+
.instrument(span_inner.clone())
535+
.await
536+
.map_err(|error| Error::StagePruning { id, error })?;
537+
538+
let _enter = span_inner.enter();
539+
info!(target: "pipeline", %pruned_count, "Stage pruning completed.");
540+
}
541+
542+
Ok(())
543+
}
443544
}
444545

445546
impl<P> IntoFuture for Pipeline<P>
@@ -467,6 +568,8 @@ where
467568
.field("command", &self.command_rx)
468569
.field("provider", &self.provider)
469570
.field("chunk_size", &self.chunk_size)
571+
.field("pruning_config", &self.pruning_config)
572+
.field("last_pruned_block", &self.last_pruned_block)
470573
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())
471574
.finish()
472575
}

crates/sync/stage/src/blocks/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use num_traits::ToPrimitive;
1717
use starknet::core::types::ResourcePrice;
1818
use tracing::{error, info_span, Instrument};
1919

20-
use crate::{Stage, StageExecutionInput, StageExecutionOutput, StageResult};
20+
use crate::{PruneInput, PruneOutput, PruneResult, Stage, StageExecutionInput, StageExecutionOutput, StageResult};
2121

2222
mod downloader;
2323

@@ -133,6 +133,17 @@ where
133133
Ok(StageExecutionOutput { last_block_processed: input.to() })
134134
})
135135
}
136+
137+
fn prune<'a>(&'a mut self, _input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
138+
Box::pin(async move {
139+
// TODO: Implement block pruning once the BlockWriter provider API supports it.
140+
// For now, this is a no-op. Block pruning would involve:
141+
// 1. Determining which blocks to prune based on input.prune_before()
142+
// 2. Removing block headers, bodies, receipts, and state updates for those blocks
143+
// 3. Ensuring the chain tip and recent blocks remain intact
144+
Ok(PruneOutput::default())
145+
})
146+
}
136147
}
137148

138149
#[derive(Debug, thiserror::Error)]

crates/sync/stage/src/classes.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use katana_rpc_types::class::ConversionError;
1414
use rayon::prelude::*;
1515
use tracing::{debug, error, info_span, Instrument};
1616

17-
use super::{Stage, StageExecutionInput, StageExecutionOutput, StageResult};
17+
use super::{PruneInput, PruneOutput, PruneResult, Stage, StageExecutionInput, StageExecutionOutput, StageResult};
1818
use crate::downloader::{BatchDownloader, Downloader, DownloaderResult};
1919

2020
/// A stage for downloading and storing contract classes.
@@ -158,6 +158,15 @@ where
158158
Ok(StageExecutionOutput { last_block_processed: input.to() })
159159
})
160160
}
161+
162+
fn prune<'a>(&'a mut self, _input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
163+
Box::pin(async move {
164+
// Classes are immutable once declared and don't need pruning.
165+
// A class declared at block N can still be used to deploy contracts at block N+1000.
166+
// Therefore, we cannot safely prune classes based on block age alone.
167+
Ok(PruneOutput::default())
168+
})
169+
}
161170
}
162171

163172
#[derive(Debug, thiserror::Error)]

crates/sync/stage/src/lib.rs

Lines changed: 134 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ pub use trie::StateTrie;
1818
/// The result type of a stage execution. See [Stage::execute].
1919
pub type StageResult = Result<StageExecutionOutput, Error>;
2020

21+
/// The result type of a stage pruning. See [Stage::prune].
22+
pub type PruneResult = Result<PruneOutput, Error>;
23+
2124
/// Input parameters for stage execution.
2225
///
2326
/// # Invariant
@@ -62,6 +65,76 @@ pub struct StageExecutionOutput {
6265
pub last_block_processed: BlockNumber,
6366
}
6467

68+
/// Pruning mode configuration that determines how much historical state to retain.
///
/// The default mode is [`PruningMode::Full`], i.e. nothing is pruned.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PruningMode {
    /// Keep all historical state since genesis (no pruning).
    #[default]
    Full,
    /// Keep only the last N blocks of historical state.
    HistoricalBlocks(u64),
    /// Keep only the latest state, pruning all historical data.
    LatestOnly,
}
84+
85+
/// Input parameters for stage pruning.
86+
#[derive(Debug, Clone)]
87+
pub struct PruneInput {
88+
/// The current tip of the chain (highest synced block).
89+
tip: BlockNumber,
90+
/// The pruning mode to apply.
91+
mode: PruningMode,
92+
}
93+
94+
impl PruneInput {
95+
/// Creates a new [`PruneInput`] with the given tip and pruning mode.
96+
pub fn new(tip: BlockNumber, mode: PruningMode) -> Self {
97+
Self { tip, mode }
98+
}
99+
100+
/// Returns the current chain tip.
101+
#[inline]
102+
pub fn tip(&self) -> BlockNumber {
103+
self.tip
104+
}
105+
106+
/// Returns the pruning mode.
107+
#[inline]
108+
pub fn mode(&self) -> PruningMode {
109+
self.mode
110+
}
111+
112+
/// Calculates the block number before which all state should be pruned.
113+
///
114+
/// Returns `None` if no pruning should occur (e.g., in `Full` mode).
115+
/// Returns `Some(block_number)` indicating that all state before this block can be pruned.
116+
pub fn prune_before(&self) -> Option<BlockNumber> {
117+
match self.mode {
118+
PruningMode::Full => None,
119+
PruningMode::LatestOnly => Some(self.tip.saturating_sub(1)),
120+
PruningMode::HistoricalBlocks(n) => {
121+
if self.tip > n {
122+
Some(self.tip - n)
123+
} else {
124+
None
125+
}
126+
}
127+
}
128+
}
129+
}
130+
131+
/// Output from a stage pruning operation.
///
/// The default value reports that nothing was pruned.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct PruneOutput {
    /// The number of items (blocks, state entries, etc.) that were pruned.
    pub pruned_count: u64,
}
137+
65138
#[derive(Debug, thiserror::Error)]
66139
pub enum Error {
67140
#[error(transparent)]
@@ -96,9 +169,10 @@ pub enum Error {
96169
///
97170
/// # Implementation Note
98171
///
99-
/// The [`execute`](Stage::execute) method returns a [`BoxFuture`] instead of `impl Future` to
100-
/// maintain dyn-compatibility. This allows the pipeline to store different stage implementations
101-
/// in a `Vec<Box<dyn Stage>>`, enabling dynamic composition of sync stages at runtime.
172+
/// The [`execute`](Stage::execute) and [`prune`](Stage::prune) methods return a [`BoxFuture`]
173+
/// instead of `impl Future` to maintain dyn-compatibility. This allows the pipeline to store
174+
/// different stage implementations in a `Vec<Box<dyn Stage>>`, enabling dynamic composition of
175+
/// sync stages at runtime.
102176
///
103177
/// While this introduces a small heap allocation for the future, it's negligible compared to
104178
/// the actual async work performed by stages (network I/O, database operations, etc.).
@@ -126,16 +200,72 @@ pub trait Stage: Send + Sync {
126200
/// Implementors are expected to perform any necessary processings on all blocks in the range
127201
/// `[input.from, input.to]`.
128202
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult>;
203+
204+
/// Prunes historical data for this stage according to the pruning configuration.
205+
///
206+
/// This method is called by the pipeline to remove old historical state that is no longer
207+
/// needed according to the pruning mode. The pruning operation is non-blocking and runs
208+
/// asynchronously.
209+
///
210+
/// # Arguments
211+
///
212+
/// * `input` - The pruning input containing the current chain tip and pruning mode
213+
///
214+
/// # Returns
215+
///
216+
/// A future that resolves to a [`PruneResult`] containing [`PruneOutput`] with the
217+
/// number of items that were pruned.
218+
///
219+
/// # Implementation Notes
220+
///
221+
/// - Stages that don't store historical state (e.g., Classes) can provide a no-op
222+
/// implementation that returns `Ok(PruneOutput::default())`.
223+
/// - Stages that store state (e.g., Blocks, StateTrie) should implement pruning logic
224+
/// appropriate to their data model.
225+
/// - The pruning operation must be non-blocking, just like [`execute`](Stage::execute).
226+
/// - Implementors should use [`PruneInput::prune_before`] to determine which blocks to prune.
227+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult>;
129228
}
130229

131230
#[cfg(test)]
132231
mod tests {
133-
use crate::StageExecutionInput;
232+
use crate::{PruneInput, PruningMode, StageExecutionInput};
134233

135234
// Nothing in this test is awaited, so a plain synchronous test is enough; no async runtime
// needs to be started just to observe the constructor panic.
#[test]
#[should_panic(expected = "Invalid block range")]
fn invalid_range_panics() {
    // When from > to, the range is invalid and should panic at construction time
    let _ = StageExecutionInput::new(100, 99);
}
240+
241+
#[test]
fn prune_before_full_mode() {
    // Full mode retains everything, so there is never a cutoff block.
    let cutoff = PruneInput::new(1000, PruningMode::Full).prune_before();
    assert_eq!(cutoff, None);
}
246+
247+
#[test]
fn prune_before_latest_only() {
    // (tip, expected cutoff); the second row is the edge case of a tip at block 0.
    let cases = [(1000, Some(999)), (0, Some(0))];

    for (tip, expected) in cases {
        let input = PruneInput::new(tip, PruningMode::LatestOnly);
        assert_eq!(input.prune_before(), expected);
    }
}
256+
257+
#[test]
fn prune_before_historical_blocks() {
    // Every case keeps the last 100 blocks; rows are (tip, expected cutoff).
    let cases = [
        // Tip at 1000
        (1000, Some(900)),
        // Tip at 50 (not enough blocks yet)
        (50, None),
        // Tip at exactly 100
        (100, Some(0)),
    ];

    for (tip, expected) in cases {
        let input = PruneInput::new(tip, PruningMode::HistoricalBlocks(100));
        assert_eq!(input.prune_before(), expected);
    }
}
141271
}

0 commit comments

Comments
 (0)