Skip to content

Commit 237ef33

Browse files
committed
error
1 parent 21f0961 commit 237ef33

File tree

1 file changed

+12
-10
lines changed
  • crates/sync/pipeline/src

1 file changed

+12
-10
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ pub enum Error {
101101

102102
#[error(transparent)]
103103
Provider(#[from] ProviderError),
104+
105+
#[error("command channel closed")]
106+
CommandChannelClosed,
104107
}
105108

106109
/// Commands that can be sent to control the pipeline.
@@ -416,18 +419,17 @@ impl<P: StageCheckpointProvider> Pipeline<P> {
416419
current_chunk_tip = (last_block_processed + self.chunk_size).min(tip);
417420
}
418421
} else {
419-
// block until a new tip is set
420422
info!(target: "pipeline", "Waiting to receive new tip.");
421-
self.cmd_rx
422-
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
423-
.await
424-
.expect("qed; channel closed");
425423
}
426-
if self.cmd_rx.has_changed().unwrap_or(false) {
427-
if let Some(PipelineCommand::SetTip(new_tip)) = *self.cmd_rx.borrow_and_update() {
428-
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
429-
self.tip = Some(new_tip);
430-
}
424+
425+
if let Some(PipelineCommand::SetTip(new_tip)) = *self
426+
.cmd_rx
427+
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
428+
.await
429+
.map_err(|_| Error::CommandChannelClosed)?
430+
{
431+
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
432+
self.tip = Some(new_tip);
431433
}
432434

433435
yield_now().await;

0 commit comments

Comments
 (0)