@@ -60,6 +60,10 @@ pub enum SyncError {
60
60
RedirectHeader ( http:: header:: ToStrError ) ,
61
61
#[ error( "redirect response with no location header" ) ]
62
62
NoRedirectLocationHeader ,
63
+ #[ error( "failed to pull db export: status={0}, error={1}" ) ]
64
+ PullDb ( StatusCode , String ) ,
65
+ #[ error( "server returned a lower generation than local: local={0}, remote={1}" ) ]
66
+ InvalidLocalGeneration ( u32 , u32 ) ,
63
67
}
64
68
65
69
impl SyncError {
@@ -86,6 +90,11 @@ pub enum PullResult {
86
90
EndOfGeneration { max_generation : u32 } ,
87
91
}
88
92
93
+ #[ derive( serde:: Deserialize ) ]
94
+ struct InfoResult {
95
+ current_generation : u32 ,
96
+ }
97
+
89
98
pub struct SyncContext {
90
99
db_path : String ,
91
100
client : hyper:: Client < ConnectorService , Body > ,
@@ -97,6 +106,9 @@ pub struct SyncContext {
97
106
durable_generation : u32 ,
98
107
/// Represents the max_frame_no from the server.
99
108
durable_frame_num : u32 ,
109
+ /// whenever sync is called very first time, we will call the remote server
110
+ /// to get the generation information and sync the db file if needed
111
+ initial_server_sync : bool ,
100
112
}
101
113
102
114
impl SyncContext {
@@ -123,8 +135,9 @@ impl SyncContext {
123
135
max_retries : DEFAULT_MAX_RETRIES ,
124
136
push_batch_size : DEFAULT_PUSH_BATCH_SIZE ,
125
137
client,
126
- durable_generation : 1 ,
138
+ durable_generation : 0 ,
127
139
durable_frame_num : 0 ,
140
+ initial_server_sync : false ,
128
141
} ;
129
142
130
143
if let Err ( e) = me. read_metadata ( ) . await {
@@ -173,7 +186,7 @@ impl SyncContext {
173
186
frame_no,
174
187
frame_no + frames_count
175
188
) ;
176
- tracing:: debug!( "pushing frame" ) ;
189
+ tracing:: debug!( "pushing frame(frame_no={}, count={}, generation={})" , frame_no , frames_count , generation ) ;
177
190
178
191
let result = self . push_with_retry ( uri, frames, self . max_retries ) . await ?;
179
192
@@ -458,6 +471,105 @@ impl SyncContext {
458
471
459
472
Ok ( ( ) )
460
473
}
474
+
475
+ /// get_remote_info calls the remote server to get the current generation information.
476
+ async fn get_remote_info ( & self ) -> Result < InfoResult > {
477
+ let uri = format ! ( "{}/info" , self . sync_url) ;
478
+ let mut req = http:: Request :: builder ( ) . method ( "GET" ) . uri ( & uri) ;
479
+
480
+ if let Some ( auth_token) = & self . auth_token {
481
+ req = req. header ( "Authorization" , auth_token) ;
482
+ }
483
+
484
+ let req = req. body ( Body :: empty ( ) ) . expect ( "valid request" ) ;
485
+
486
+ let res = self
487
+ . client
488
+ . request ( req)
489
+ . await
490
+ . map_err ( SyncError :: HttpDispatch ) ?;
491
+
492
+ if !res. status ( ) . is_success ( ) {
493
+ let status = res. status ( ) ;
494
+ let body = hyper:: body:: to_bytes ( res. into_body ( ) )
495
+ . await
496
+ . map_err ( SyncError :: HttpBody ) ?;
497
+ return Err (
498
+ SyncError :: PullDb ( status, String :: from_utf8_lossy ( & body) . to_string ( ) ) . into ( ) ,
499
+ ) ;
500
+ }
501
+
502
+ let body = hyper:: body:: to_bytes ( res. into_body ( ) )
503
+ . await
504
+ . map_err ( SyncError :: HttpBody ) ?;
505
+
506
+ let info = serde_json:: from_slice ( & body) . map_err ( SyncError :: JsonDecode ) ?;
507
+
508
+ Ok ( info)
509
+ }
510
+
511
+ async fn sync_db_if_needed ( & mut self , generation : u32 ) -> Result < ( ) > {
512
+ // we will get the export file only if the remote generation is different from the one we have
513
+ if generation == self . durable_generation {
514
+ return Ok ( ( ) ) ;
515
+ }
516
+ // somehow we are ahead of the remote in generations. following should not happen because
517
+ // we checkpoint only if the remote server tells us to do so.
518
+ if self . durable_generation > generation {
519
+ tracing:: error!(
520
+ "server returned a lower generation than what we have: sent={}, got={}" ,
521
+ self . durable_generation,
522
+ generation
523
+ ) ;
524
+ return Err (
525
+ SyncError :: InvalidLocalGeneration ( self . durable_generation , generation) . into ( ) ,
526
+ ) ;
527
+ }
528
+ tracing:: debug!(
529
+ "syncing db file from remote server, generation={}" ,
530
+ generation
531
+ ) ;
532
+ self . sync_db ( generation) . await
533
+ }
534
+
535
+ /// sync_db will download the db file from the remote server and replace the local file.
536
+ async fn sync_db ( & mut self , generation : u32 ) -> Result < ( ) > {
537
+ let uri = format ! ( "{}/export/{}" , self . sync_url, generation) ;
538
+ let mut req = http:: Request :: builder ( ) . method ( "GET" ) . uri ( & uri) ;
539
+
540
+ if let Some ( auth_token) = & self . auth_token {
541
+ req = req. header ( "Authorization" , auth_token) ;
542
+ }
543
+
544
+ let req = req. body ( Body :: empty ( ) ) . expect ( "valid request" ) ;
545
+
546
+ let res = self
547
+ . client
548
+ . request ( req)
549
+ . await
550
+ . map_err ( SyncError :: HttpDispatch ) ?;
551
+
552
+ if !res. status ( ) . is_success ( ) {
553
+ let status = res. status ( ) ;
554
+ let body = hyper:: body:: to_bytes ( res. into_body ( ) )
555
+ . await
556
+ . map_err ( SyncError :: HttpBody ) ?;
557
+ return Err (
558
+ SyncError :: PullFrame ( status, String :: from_utf8_lossy ( & body) . to_string ( ) ) . into ( ) ,
559
+ ) ;
560
+ }
561
+
562
+ // todo: do streaming write to the disk
563
+ let bytes = hyper:: body:: to_bytes ( res. into_body ( ) )
564
+ . await
565
+ . map_err ( SyncError :: HttpBody ) ?;
566
+
567
+ atomic_write ( & self . db_path , & bytes) . await ?;
568
+ self . durable_generation = generation;
569
+ self . durable_frame_num = 0 ;
570
+ self . write_metadata ( ) . await ?;
571
+ Ok ( ( ) )
572
+ }
461
573
}
462
574
463
575
#[ derive( serde:: Serialize , serde:: Deserialize , Debug ) ]
@@ -555,6 +667,22 @@ pub async fn sync_offline(
555
667
Err ( e) => Err ( e) ,
556
668
}
557
669
} else {
670
+ // todo: we are checking with the remote server only during initialisation. ideally,
671
+ // we should check everytime we try to sync with the remote server. However, we need to close
672
+ // all the ongoing connections since we replace `.db` file and remove the `.db-wal` file
673
+ if !sync_ctx. initial_server_sync {
674
+ // sync is being called first time. so we will call remote, get the generation information
675
+ // if we are lagging behind, then we will call the export API and get to the latest
676
+ // generation directly.
677
+ let info = sync_ctx. get_remote_info ( ) . await ?;
678
+ sync_ctx
679
+ . sync_db_if_needed ( info. current_generation )
680
+ . await ?;
681
+ // when sync_ctx is initialised, we set durable_generation to 0. however, once
682
+ // sync_db is called, it should be > 0.
683
+ assert ! ( sync_ctx. durable_generation > 0 , "generation should be > 0" ) ;
684
+ sync_ctx. initial_server_sync = true ;
685
+ }
558
686
try_pull ( sync_ctx, conn) . await
559
687
}
560
688
. or_else ( |err| {
0 commit comments