Skip to content

Commit 21f0961

Browse files
committed
refactor(pipeline): only finish on stop command
1 parent 7120272 commit 21f0961

File tree

1 file changed

+18
-36
lines changed
  • crates/sync/pipeline/src

1 file changed

+18
-36
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,12 @@
7575
//! [Erigon]: https://github.com/erigontech/erigon
7676
7777
use core::future::IntoFuture;
78-
use std::future::Future;
79-
use std::pin::Pin;
80-
use std::task::Poll;
8178

8279
use futures::future::BoxFuture;
83-
use futures::FutureExt;
8480
use katana_primitives::block::BlockNumber;
8581
use katana_provider_api::stage::StageCheckpointProvider;
8682
use katana_provider_api::ProviderError;
8783
use katana_stage::{Stage, StageExecutionInput, StageExecutionOutput};
88-
use tokio::sync::watch::error::RecvError;
8984
use tokio::sync::watch::{self};
9085
use tokio::task::yield_now;
9186
use tracing::{debug, error, info, info_span, Instrument};
@@ -296,23 +291,13 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
296291
tokio::select! {
297292
biased;
298293

299-
changed = command_rx.changed() => {
294+
changed = command_rx.wait_for(|c| matches!(c, &Some(PipelineCommand::Stop))) => {
300295
if changed.is_err() {
301296
break;
302297
}
303298

304-
// Check if the handle has sent a signal
305-
match *self.cmd_rx.borrow_and_update() {
306-
Some(PipelineCommand::Stop) => {
307-
debug!(target: "pipeline", "Received stop command.");
308-
break;
309-
}
310-
Some(PipelineCommand::SetTip(new_tip)) => {
311-
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
312-
self.tip = Some(new_tip);
313-
}
314-
None => {}
315-
}
299+
debug!(target: "pipeline", "Received stop command.");
300+
break;
316301
}
317302

318303
result = self.run_loop() => {
@@ -430,27 +415,27 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
430415
} else {
431416
current_chunk_tip = (last_block_processed + self.chunk_size).min(tip);
432417
}
433-
434-
continue;
418+
} else {
419+
// block until a new tip is set
420+
info!(target: "pipeline", "Waiting to receive new tip.");
421+
self.cmd_rx
422+
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
423+
.await
424+
.expect("qed; channel closed");
425+
}
426+
if self.cmd_rx.has_changed().unwrap_or(false) {
427+
if let Some(PipelineCommand::SetTip(new_tip)) = *self.cmd_rx.borrow_and_update() {
428+
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
429+
self.tip = Some(new_tip);
430+
}
435431
}
436-
437-
info!(target: "pipeline", "Waiting to receive new tip.");
438-
439-
// block until a new tip is set
440-
self.cmd_rx
441-
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
442-
.await
443-
.expect("qed; channel closed");
444432

445433
yield_now().await;
446434
}
447435
}
448436
}
449437

450-
impl<P> IntoFuture for Pipeline<P>
451-
where
452-
P: StageCheckpointProvider + 'static,
453-
{
438+
impl<P: StageCheckpointProvider + 'static> IntoFuture for Pipeline<P> {
454439
type Output = PipelineResult<()>;
455440
type IntoFuture = PipelineFut;
456441

@@ -463,10 +448,7 @@ where
463448
}
464449
}
465450

466-
impl<P> core::fmt::Debug for Pipeline<P>
467-
where
468-
P: core::fmt::Debug,
469-
{
451+
impl<P: core::fmt::Debug> core::fmt::Debug for Pipeline<P> {
470452
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
471453
f.debug_struct("Pipeline")
472454
.field("command", &self.cmd_rx)

0 commit comments

Comments
 (0)