Skip to content

Commit bbce8c4

Browse files
authored
fix(pipeline): pipeline waits on stale tip (#380)
1 parent c106d85 commit bbce8c4

File tree

1 file changed

+14
-11
lines changed
  • crates/sync/pipeline/src

1 file changed

+14
-11
lines changed

crates/sync/pipeline/src/lib.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ impl Pipeline {
354354
result = self.run_loop() => {
355355
if let Err(error) = result {
356356
error!(target: "pipeline", %error, "Pipeline finished due to error.");
357-
break;
358357
}
359358
}
360359
}
@@ -544,21 +543,25 @@ impl Pipeline {
544543
}
545544
} else {
546545
info!(target: "pipeline", "Waiting to receive new tip.");
547-
}
546+
self.cmd_rx.changed().await.map_err(|_| Error::CommandChannelClosed)?;
548547

549-
if let Some(PipelineCommand::SetTip(new_tip)) = *self
550-
.cmd_rx
551-
.wait_for(|c| matches!(c, &Some(PipelineCommand::SetTip(_))))
552-
.await
553-
.map_err(|_| Error::CommandChannelClosed)?
554-
{
555-
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
556-
self.tip = Some(new_tip);
557-
self.metrics.set_sync_target(new_tip);
548+
match *self.cmd_rx.borrow_and_update() {
549+
Some(PipelineCommand::SetTip(new_tip)) => {
550+
info!(target: "pipeline", tip = %new_tip, "A new tip has been set.");
551+
self.tip = Some(new_tip);
552+
self.metrics.set_sync_target(new_tip);
553+
}
554+
555+
Some(PipelineCommand::Stop) => break,
556+
557+
_ => {}
558+
}
558559
}
559560

560561
yield_now().await;
561562
}
563+
564+
Ok(())
562565
}
563566
}
564567

0 commit comments

Comments
 (0)