@@ -644,7 +644,10 @@ cfg_sync! {
644
644
645
645
let mut bg_abort: Option <std:: sync:: Arc <crate :: sync:: DropAbort >> = None ;
646
646
647
+
647
648
if let Some ( sync_interval) = sync_interval {
649
+ let ( cancel_tx, mut cancel_rx) = tokio:: sync:: oneshot:: channel:: <( ) >( ) ;
650
+
648
651
let sync_span = tracing:: debug_span!( "sync_interval" ) ;
649
652
let _enter = sync_span. enter( ) ;
650
653
@@ -659,31 +662,36 @@ cfg_sync! {
659
662
// `bootstrap_db` (for synced dbs) before calling connect. Otherwise, the sync
660
663
// protocol skips calling `export` endpoint causing slowdown in initial bootstrap.
661
664
let conn = db. connect( ) ?;
662
- let jh = tokio:: spawn(
665
+
666
+ tokio:: spawn(
663
667
async move {
664
668
let mut interval = tokio:: time:: interval( sync_interval) ;
665
669
666
670
loop {
667
- tracing:: info!( "trying to sync" ) ;
668
-
669
- interval. tick( ) . await ;
670
-
671
- let mut ctx = sync_ctx. lock( ) . await ;
672
- if remote_writes {
673
- if let Err ( e) = crate :: sync:: try_pull( & mut ctx, & conn) . await {
674
- tracing:: error!( "sync error: {}" , e) ;
675
- }
676
- } else {
677
- if let Err ( e) = crate :: sync:: sync_offline( & mut ctx, & conn) . await {
678
- tracing:: error!( "sync error: {}" , e) ;
671
+ tokio:: select! {
672
+ _ = & mut cancel_rx => break ,
673
+ _ = interval. tick( ) => {
674
+ tracing:: debug!( "trying to sync" ) ;
675
+
676
+ let mut ctx = sync_ctx. lock( ) . await ;
677
+
678
+ let result = if remote_writes {
679
+ crate :: sync:: try_pull( & mut ctx, & conn) . await
680
+ } else {
681
+ crate :: sync:: sync_offline( & mut ctx, & conn) . await
682
+ } ;
683
+
684
+ if let Err ( e) = result {
685
+ tracing:: error!( "Error syncing database: {}" , e) ;
686
+ }
679
687
}
680
688
}
681
689
}
682
690
}
683
691
. instrument( tracing:: debug_span!( "sync interval thread" ) ) ,
684
692
) ;
685
693
686
- bg_abort. replace( std:: sync:: Arc :: new( crate :: sync:: DropAbort ( jh . abort_handle ( ) ) ) ) ;
694
+ bg_abort. replace( std:: sync:: Arc :: new( crate :: sync:: DropAbort ( Some ( cancel_tx ) ) ) ) ;
687
695
}
688
696
689
697
Ok ( Database {
0 commit comments