File tree Expand file tree Collapse file tree 1 file changed +12
-7
lines changed
Expand file tree Collapse file tree 1 file changed +12
-7
lines changed Original file line number Diff line number Diff line change @@ -353,7 +353,8 @@ impl Pipeline {
353353
354354 result = self . run_loop( ) => {
355355 if let Err ( error) = result {
356- error!( target: "pipeline" , %error, "Pipeline returned an error." ) ;
356+ error!( target: "pipeline" , %error, "Pipeline finished due to error." ) ;
357+ break ;
357358 }
358359 }
359360 }
@@ -543,13 +544,17 @@ impl Pipeline {
543544 }
544545 } else {
545546 info ! ( target: "pipeline" , "Waiting to receive new tip." ) ;
546- self . cmd_rx . changed ( ) . await . map_err ( |_| Error :: CommandChannelClosed ) ? ;
547+ }
547548
548- if let Some ( PipelineCommand :: SetTip ( new_tip) ) = * self . cmd_rx . borrow_and_update ( ) {
549- info ! ( target: "pipeline" , tip = %new_tip, "A new tip has been set." ) ;
550- self . tip = Some ( new_tip) ;
551- self . metrics . set_sync_target ( new_tip) ;
552- }
549+ if let Some ( PipelineCommand :: SetTip ( new_tip) ) = * self
550+ . cmd_rx
551+ . wait_for ( |c| matches ! ( c, & Some ( PipelineCommand :: SetTip ( _) ) ) )
552+ . await
553+ . map_err ( |_| Error :: CommandChannelClosed ) ?
554+ {
555+ info ! ( target: "pipeline" , tip = %new_tip, "A new tip has been set." ) ;
556+ self . tip = Some ( new_tip) ;
557+ self . metrics . set_sync_target ( new_tip) ;
553558 }
554559
555560 yield_now ( ) . await ;
You can’t perform that action at this time.
0 commit comments