Skip to content

Commit fc94c45

Browse files
committed
update
1 parent 432a505 commit fc94c45

File tree

3 files changed

+62
-49
lines changed

3 files changed

+62
-49
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
393393
///
394394
/// Returns an error if any stage execution fails or if the pipeline fails to read the
395395
/// checkpoint.
396-
pub async fn run_once(&mut self, to: BlockNumber) -> PipelineResult<BlockNumber> {
396+
pub async fn execute(&mut self, to: BlockNumber) -> PipelineResult<BlockNumber> {
397397
if self.stages.is_empty() {
398398
return Ok(to);
399399
}
@@ -455,6 +455,39 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
455455
Ok(last_block_processed_list.into_iter().min().unwrap_or(to))
456456
}
457457

458+
/// Runs pruning on all stages.
459+
pub async fn prune(&mut self, tip: BlockNumber) -> PipelineResult<()> {
460+
if self.stages.is_empty() {
461+
return Ok(());
462+
}
463+
464+
for stage in self.stages.iter_mut() {
465+
let id = stage.id();
466+
467+
let span = info_span!(target: "pipeline", "stage.prune", stage = %id, %tip);
468+
let enter = span.entered();
469+
470+
if let Some(checkpoint) = self.provider.checkpoint(id)? {
471+
let prune_input = PruneInput::new(checkpoint, self.pruning_config.mode);
472+
info!(target: "pipeline", mode = ?self.pruning_config.mode, "Pruning stage.");
473+
474+
let span_inner = enter.exit();
475+
let PruneOutput { pruned_count } = stage
476+
.prune(&prune_input)
477+
.instrument(span_inner.clone())
478+
.await
479+
.map_err(|error| Error::StagePruning { id, error })?;
480+
481+
let _enter = span_inner.enter();
482+
info!(target: "pipeline", %pruned_count, "Stage pruning completed.");
483+
} else {
484+
info!(target: "pipeline", "Skipping stage - no data to prune (no checkpoint).");
485+
}
486+
}
487+
488+
Ok(())
489+
}
490+
458491
/// Run the pipeline loop.
459492
async fn run_loop(&mut self) -> PipelineResult<()> {
460493
let mut current_chunk_tip = self.chunk_size;
@@ -463,7 +496,7 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
463496
// Process blocks if we have a tip
464497
if let Some(tip) = self.tip {
465498
let to = current_chunk_tip.min(tip);
466-
let last_block_processed = self.run_once(to).await?;
499+
let last_block_processed = self.execute(to).await?;
467500

468501
// Notify subscribers about the newly processed block
469502
let _ = self.block_tx.send(Some(last_block_processed));
@@ -514,35 +547,6 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
514547
None => current_block >= interval,
515548
}
516549
}
517-
518-
/// Runs pruning on all stages.
519-
async fn prune(&mut self, tip: BlockNumber) -> PipelineResult<()> {
520-
if self.stages.is_empty() {
521-
return Ok(());
522-
}
523-
524-
let prune_input = PruneInput::new(tip, self.pruning_config.mode);
525-
526-
for stage in self.stages.iter_mut() {
527-
let id = stage.id();
528-
let span = info_span!(target: "pipeline", "stage.prune", stage = %id, %tip);
529-
let enter = span.entered();
530-
531-
info!(target: "pipeline", mode = ?self.pruning_config.mode, "Pruning stage.");
532-
533-
let span_inner = enter.exit();
534-
let PruneOutput { pruned_count } = stage
535-
.prune(&prune_input)
536-
.instrument(span_inner.clone())
537-
.await
538-
.map_err(|error| Error::StagePruning { id, error })?;
539-
540-
let _enter = span_inner.enter();
541-
info!(target: "pipeline", %pruned_count, "Stage pruning completed.");
542-
}
543-
544-
Ok(())
545-
}
546550
}
547551

548552
impl<P> IntoFuture for Pipeline<P>

crates/sync/pipeline/tests/pipeline.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use katana_pipeline::Pipeline;
88
use katana_primitives::block::BlockNumber;
99
use katana_provider::api::stage::StageCheckpointProvider;
1010
use katana_provider::test_utils::test_provider;
11-
use katana_stage::{Stage, StageExecutionInput, StageExecutionOutput, StageResult};
11+
use katana_stage::{
12+
PruneInput, PruneOutput, PruneResult, Stage, StageExecutionInput, StageExecutionOutput,
13+
StageResult,
14+
};
1215

1316
/// Simple mock stage that does nothing
1417
struct MockStage;
@@ -21,6 +24,11 @@ impl Stage for MockStage {
2124
fn execute<'a>(&'a mut self, input: &'a StageExecutionInput) -> BoxFuture<'a, StageResult> {
2225
Box::pin(async move { Ok(StageExecutionOutput { last_block_processed: input.to() }) })
2326
}
27+
28+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
29+
let _ = input;
30+
Box::pin(async move { Ok(PruneOutput::default()) })
31+
}
2432
}
2533

2634
/// Tracks execution calls with their inputs
@@ -146,7 +154,7 @@ async fn run_to_executes_stage_to_target() {
146154

147155
pipeline.add_stage(stage);
148156
handle.set_tip(5);
149-
let result = pipeline.run_once(5).await.unwrap();
157+
let result = pipeline.execute(5).await.unwrap();
150158

151159
assert_eq!(result, 5);
152160
assert_eq!(provider.checkpoint(stage_clone.id()).unwrap(), Some(5));
@@ -170,7 +178,7 @@ async fn run_to_skips_stage_when_checkpoint_equals_target() {
170178
pipeline.add_stage(stage);
171179

172180
handle.set_tip(5);
173-
let result = pipeline.run_once(5).await.unwrap();
181+
let result = pipeline.execute(5).await.unwrap();
174182

175183
assert_eq!(result, 5);
176184
assert_eq!(stage_clone.executions().len(), 0); // Not executed
@@ -189,7 +197,7 @@ async fn run_to_skips_stage_when_checkpoint_exceeds_target() {
189197
pipeline.add_stage(stage);
190198

191199
handle.set_tip(10);
192-
let result = pipeline.run_once(5).await.unwrap();
200+
let result = pipeline.execute(5).await.unwrap();
193201

194202
assert_eq!(result, 10); // Returns the checkpoint
195203
assert_eq!(stage_clone.executions().len(), 0); // Not executed
@@ -207,7 +215,7 @@ async fn run_to_uses_checkpoint_plus_one_as_from() {
207215
provider.set_checkpoint(stage.id(), 3).unwrap();
208216
pipeline.add_stage(stage);
209217
handle.set_tip(10);
210-
pipeline.run_once(10).await.unwrap();
218+
pipeline.execute(10).await.unwrap();
211219

212220
let execs = stage_clone.executions();
213221
assert_eq!(execs.len(), 1);
@@ -241,7 +249,7 @@ async fn run_to_executes_all_stages_in_order() {
241249
]);
242250

243251
handle.set_tip(5);
244-
pipeline.run_once(5).await.unwrap();
252+
pipeline.execute(5).await.unwrap();
245253

246254
// All stages should be executed once because the tip is 5 and the chunk size is 10
247255
assert_eq!(stage1_clone.execution_count(), 1);
@@ -279,7 +287,7 @@ async fn run_to_with_mixed_checkpoints() {
279287
provider.set_checkpoint(stage2_clone.id(), 3).unwrap();
280288

281289
handle.set_tip(10);
282-
pipeline.run_once(10).await.unwrap();
290+
pipeline.execute(10).await.unwrap();
283291

284292
// Stage1 should be skipped because its checkpoint (10) >= than the tip (10)
285293
assert_eq!(stage1_clone.execution_count(), 0);
@@ -317,7 +325,7 @@ async fn run_to_returns_minimum_last_block_processed() {
317325
]);
318326

319327
handle.set_tip(20);
320-
let result = pipeline.run_once(20).await.unwrap();
328+
let result = pipeline.execute(20).await.unwrap();
321329

322330
// make sure that all the stages were executed once
323331
assert_eq!(stage1_clone.execution_count(), 1);
@@ -353,7 +361,7 @@ async fn run_to_middle_stage_skip_continues() {
353361
provider.set_checkpoint(stage2_clone.id(), 10).unwrap();
354362

355363
handle.set_tip(10);
356-
pipeline.run_once(10).await.unwrap();
364+
pipeline.execute(10).await.unwrap();
357365

358366
// Stage1 and Stage3 should execute
359367
assert_eq!(stage1_clone.execution_count(), 1);
@@ -526,7 +534,7 @@ async fn stage_execution_error_stops_pipeline() {
526534
pipeline.add_stage(stage);
527535

528536
handle.set_tip(10);
529-
let result = pipeline.run_once(10).await;
537+
let result = pipeline.execute(10).await;
530538
assert!(result.is_err());
531539

532540
// Checkpoint should not be set after failure
@@ -549,7 +557,7 @@ async fn stage_error_doesnt_affect_subsequent_runs() {
549557
pipeline.add_stage(stage2);
550558

551559
handle.set_tip(10);
552-
let error = pipeline.run_once(10).await.unwrap_err();
560+
let error = pipeline.execute(10).await.unwrap_err();
553561

554562
let katana_pipeline::Error::StageExecution { id, error } = error else {
555563
panic!("Unexpected error type");
@@ -573,7 +581,7 @@ async fn empty_pipeline_returns_target() {
573581

574582
// No stages added
575583
handle.set_tip(10);
576-
let result = pipeline.run_once(10).await.unwrap();
584+
let result = pipeline.execute(10).await.unwrap();
577585

578586
assert_eq!(result, 10);
579587
}
@@ -591,7 +599,7 @@ async fn tip_equals_checkpoint_no_execution() {
591599
pipeline.add_stage(stage);
592600

593601
handle.set_tip(10);
594-
pipeline.run_once(10).await.unwrap();
602+
pipeline.execute(10).await.unwrap();
595603

596604
assert_eq!(executions.lock().unwrap().len(), 0, "Stage1 should not be executed");
597605
}
@@ -612,7 +620,7 @@ async fn tip_less_than_checkpoint_skip_all() {
612620
pipeline.add_stage(stage);
613621

614622
handle.set_tip(20);
615-
let result = pipeline.run_once(10).await.unwrap();
623+
let result = pipeline.execute(10).await.unwrap();
616624

617625
assert_eq!(result, checkpoint);
618626
assert_eq!(executions.lock().unwrap().len(), 0, "Stage1 should not be executed");
@@ -660,20 +668,20 @@ async fn stage_checkpoint() {
660668
assert_eq!(initial_checkpoint, None);
661669

662670
handle.set_tip(5);
663-
pipeline.run_once(5).await.expect("failed to run the pipeline once");
671+
pipeline.execute(5).await.expect("failed to run the pipeline once");
664672

665673
// check that the checkpoint was set
666674
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
667675
assert_eq!(actual_checkpoint, Some(5));
668676

669677
handle.set_tip(10);
670-
pipeline.run_once(10).await.expect("failed to run the pipeline once");
678+
pipeline.execute(10).await.expect("failed to run the pipeline once");
671679

672680
// check that the checkpoint was set
673681
let actual_checkpoint = provider.checkpoint("Mock").unwrap();
674682
assert_eq!(actual_checkpoint, Some(10));
675683

676-
pipeline.run_once(10).await.expect("failed to run the pipeline once");
684+
pipeline.execute(10).await.expect("failed to run the pipeline once");
677685

678686
// check that the checkpoint doesn't change
679687
let actual_checkpoint = provider.checkpoint("Mock").unwrap();

crates/sync/stage/src/blocks/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ where
137137
})
138138
}
139139

140-
fn prune<'a>(&'a mut self, _input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
140+
fn prune<'a>(&'a mut self, input: &'a PruneInput) -> BoxFuture<'a, PruneResult> {
141+
let _ = input;
141142
Box::pin(async move {
142143
// TODO: Implement block pruning once the BlockWriter provider API supports it.
143144
// For now, this is a no-op. Block pruning would involve:

0 commit comments

Comments
 (0)