Skip to content

Commit 6569b2e

Browse files
committed
wip
1 parent fc94c45 commit 6569b2e

File tree

4 files changed

+25
-9
lines changed

4 files changed

+25
-9
lines changed

.tool-versions

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
scarb 2.8.2
2+
katana 1.7.0

crates/sync/pipeline/src/lib.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ pub struct Pipeline<P> {
256256
block_tx: watch::Sender<Option<BlockNumber>>,
257257
tip: Option<BlockNumber>,
258258
pruning_config: PruningConfig,
259+
/// The block at which the pipeline was last pruned.
259260
last_pruned_block: Option<BlockNumber>,
260261
}
261262

@@ -456,15 +457,15 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
456457
}
457458

458459
/// Runs pruning on all stages.
459-
pub async fn prune(&mut self, tip: BlockNumber) -> PipelineResult<()> {
460+
pub async fn prune(&mut self) -> PipelineResult<()> {
460461
if self.stages.is_empty() {
461462
return Ok(());
462463
}
463464

464465
for stage in self.stages.iter_mut() {
465466
let id = stage.id();
466467

467-
let span = info_span!(target: "pipeline", "stage.prune", stage = %id, %tip);
468+
let span = info_span!(target: "pipeline", "stage.prune", stage = %id);
468469
let enter = span.entered();
469470

470471
if let Some(checkpoint) = self.provider.checkpoint(id)? {
@@ -504,7 +505,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
504505
// Check if we should run pruning
505506
if self.should_prune(last_block_processed) {
506507
info!(target: "pipeline", block = %last_block_processed, "Starting pruning.");
507-
self.prune(last_block_processed).await?;
508+
self.prune().await?;
508509
self.last_pruned_block = Some(last_block_processed);
509510
}
510511

crates/sync/pipeline/tests/pipeline.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ impl Stage for TrackingStage {
7575
Ok(StageExecutionOutput { last_block_processed: input.to() })
7676
})
7777
}
78+
79+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
80+
let _ = input;
81+
Box::pin(async move { Ok(PruneOutput::default()) })
82+
}
7883
}
7984

8085
/// Mock stage that fails on execution
@@ -97,6 +102,11 @@ impl Stage for FailingStage {
97102
fn execute<'a>(&'a mut self, _: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
98103
Box::pin(async { Err(katana_stage::Error::Other(anyhow!("Stage execution failed"))) })
99104
}
105+
106+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
107+
let _ = input;
108+
Box::pin(async move { Ok(PruneOutput::default()) })
109+
}
100110
}
101111

102112
/// Mock stage that always reports a fixed `last_block_processed`.
@@ -138,6 +148,11 @@ impl Stage for FixedOutputStage {
138148
Ok(StageExecutionOutput { last_block_processed })
139149
})
140150
}
151+
152+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
153+
let _ = input;
154+
Box::pin(async move { Ok(PruneOutput::default()) })
155+
}
141156
}
142157

143158
// ============================================================================
@@ -497,6 +512,11 @@ async fn run_should_be_cancelled_if_stop_requested() {
497512
Ok(StageExecutionOutput { last_block_processed: 100 })
498513
})
499514
}
515+
516+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
517+
let _ = input;
518+
Box::pin(async move { Ok(PruneOutput::default()) })
519+
}
500520
}
501521

502522
let provider = Arc::new(test_provider());

crates/sync/stage/tests/trie.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,6 @@ impl MockProvider {
5353
self
5454
}
5555

56-
/// Configure the mock to fail on trie operations.
57-
fn with_trie_error(self) -> Self {
58-
*self.should_fail.lock().unwrap() = true;
59-
self
60-
}
61-
6256
/// Get all block numbers that had trie inserts called.
6357
fn trie_insert_blocks(&self) -> Vec<BlockNumber> {
6458
self.trie_insert_calls.lock().unwrap().clone()

0 commit comments

Comments
 (0)