Skip to content

Commit 28565d7

Browse files
authored
fix(pipeline): don't restart run_loop future on tip change (#368)
Currently the `select!` is configured in `Pipeline::run` to completes on the command waiting regardless of what command is received. This means the `self.run_loop()` future will be cancelled even if it's a command for changing the pipeline's tip - the pipeline has to process the same batch again.
1 parent 692587a commit 28565d7

File tree

1 file changed

+25
-36
lines changed
  • crates/sync/pipeline/src

1 file changed

+25
-36
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 25 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use katana_primitives::block::BlockNumber;
8181
use katana_provider_api::stage::StageCheckpointProvider;
8282
use katana_provider_api::ProviderError;
8383
use katana_stage::{Stage, StageExecutionInput, StageExecutionOutput};
84-
use tokio::sync::watch;
84+
use tokio::sync::watch::{self};
8585
use tokio::task::yield_now;
8686
use tracing::{debug, error, info, info_span, Instrument};
8787

@@ -101,6 +101,9 @@ pub enum Error {
101101

102102
#[error(transparent)]
103103
Provider(#[from] ProviderError),
104+
105+
#[error("command channel closed")]
106+
CommandChannelClosed,
104107
}
105108

106109
/// Commands that can be sent to control the pipeline.
@@ -217,8 +220,8 @@ pub struct Pipeline<P> {
217220
chunk_size: u64,
218221
provider: P,
219222
stages: Vec<Box<dyn Stage>>,
220-
command_rx: watch::Receiver<Option<PipelineCommand>>,
221-
command_tx: watch::Sender<Option<PipelineCommand>>,
223+
cmd_rx: watch::Receiver<Option<PipelineCommand>>,
224+
cmd_tx: watch::Sender<Option<PipelineCommand>>,
222225
block_tx: watch::Sender<Option<BlockNumber>>,
223226
tip: Option<BlockNumber>,
224227
}
@@ -240,8 +243,8 @@ impl<P> Pipeline<P> {
240243
let handle = PipelineHandle { tx: tx.clone(), block_tx: block_tx.clone() };
241244
let pipeline = Self {
242245
stages: Vec::new(),
243-
command_rx: rx,
244-
command_tx: tx,
246+
cmd_rx: rx,
247+
cmd_tx: tx,
245248
block_tx,
246249
provider,
247250
chunk_size,
@@ -269,7 +272,7 @@ impl<P> Pipeline<P> {
269272
/// The handle can be used to set the target tip block for the pipeline to sync to or to
270273
/// stop the pipeline.
271274
pub fn handle(&self) -> PipelineHandle {
272-
PipelineHandle { tx: self.command_tx.clone(), block_tx: self.block_tx.clone() }
275+
PipelineHandle { tx: self.cmd_tx.clone(), block_tx: self.block_tx.clone() }
273276
}
274277
}
275278

@@ -285,29 +288,19 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
285288
/// Returns an error if any stage execution fails or it an error occurs while reading the
286289
/// checkpoint.
287290
pub async fn run(&mut self) -> PipelineResult<()> {
288-
let mut command_rx = self.command_rx.clone();
291+
let mut command_rx = self.cmd_rx.clone();
289292

290293
loop {
291294
tokio::select! {
292295
biased;
293296

294-
changed = command_rx.changed() => {
297+
changed = command_rx.wait_for(|c| matches!(c, &Some(PipelineCommand::Stop))) => {
295298
if changed.is_err() {
296299
break;
297300
}
298301

299-
// Check if the handle has sent a signal
300-
match *self.command_rx.borrow_and_update() {
301-
Some(PipelineCommand::Stop) => {
302-
debug!(target: "pipeline", "Received stop command.");
303-
break;
304-
}
305-
Some(PipelineCommand::SetTip(new_tip)) => {
306-
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
307-
self.tip = Some(new_tip);
308-
}
309-
None => {}
310-
}
302+
debug!(target: "pipeline", "Received stop command.");
303+
break;
311304
}
312305

313306
result = self.run_loop() => {
@@ -425,27 +418,26 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
425418
} else {
426419
current_chunk_tip = (last_block_processed + self.chunk_size).min(tip);
427420
}
428-
429-
continue;
421+
} else {
422+
info!(target: "pipeline", "Waiting to receive new tip.");
430423
}
431424

432-
info!(target: "pipeline", "Waiting to receive new tip.");
433-
434-
// block until a new tip is set
435-
self.command_rx
425+
if let Some(PipelineCommand::SetTip(new_tip)) = *self
426+
.cmd_rx
436427
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
437428
.await
438-
.expect("qed; channel closed");
429+
.map_err(|_| Error::CommandChannelClosed)?
430+
{
431+
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
432+
self.tip = Some(new_tip);
433+
}
439434

440435
yield_now().await;
441436
}
442437
}
443438
}
444439

445-
impl<P> IntoFuture for Pipeline<P>
446-
where
447-
P: StageCheckpointProvider + 'static,
448-
{
440+
impl<P: StageCheckpointProvider + 'static> IntoFuture for Pipeline<P> {
449441
type Output = PipelineResult<()>;
450442
type IntoFuture = PipelineFut;
451443

@@ -458,13 +450,10 @@ where
458450
}
459451
}
460452

461-
impl<P> core::fmt::Debug for Pipeline<P>
462-
where
463-
P: core::fmt::Debug,
464-
{
453+
impl<P: core::fmt::Debug> core::fmt::Debug for Pipeline<P> {
465454
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
466455
f.debug_struct("Pipeline")
467-
.field("command", &self.command_rx)
456+
.field("command", &self.cmd_rx)
468457
.field("provider", &self.provider)
469458
.field("chunk_size", &self.chunk_size)
470459
.field("stages", &self.stages.iter().map(|s| s.id()).collect::<Vec<_>>())

0 commit comments

Comments
 (0)