@@ -67,6 +67,7 @@ impl Builder<()> {
67
67
http_request_callback: None ,
68
68
namespace: None ,
69
69
skip_safety_assert: false ,
70
+ sync_protocol: Default :: default ( ) ,
70
71
} ,
71
72
}
72
73
}
@@ -222,6 +223,7 @@ cfg_replication! {
222
223
http_request_callback: Option <crate :: util:: HttpRequestCallback >,
223
224
namespace: Option <String >,
224
225
skip_safety_assert: bool ,
226
+ sync_protocol: super :: SyncProtocol ,
225
227
}
226
228
227
229
/// Local replica configuration type in [`Builder`].
@@ -274,6 +276,14 @@ cfg_replication! {
274
276
self
275
277
}
276
278
279
+ /// Set the duration at which the replicator will automatically call `sync` in the
280
+ /// background. The sync will continue for the duration that the resulted `Database`
281
+ /// type is alive for, once it is dropped the background task will get dropped and stop.
282
+ pub fn sync_protocol( mut self , protocol: super :: SyncProtocol ) -> Builder <RemoteReplica > {
283
+ self . inner. sync_protocol = protocol;
284
+ self
285
+ }
286
+
277
287
pub fn http_request_callback<F >( mut self , f: F ) -> Builder <RemoteReplica >
278
288
where
279
289
F : Fn ( & mut http:: Request <( ) >) + Send + Sync + ' static
@@ -326,7 +336,8 @@ cfg_replication! {
326
336
sync_interval,
327
337
http_request_callback,
328
338
namespace,
329
- skip_safety_assert
339
+ skip_safety_assert,
340
+ sync_protocol,
330
341
} = self . inner;
331
342
332
343
let connector = if let Some ( connector) = connector {
@@ -342,26 +353,44 @@ cfg_replication! {
342
353
crate :: util:: ConnectorService :: new( svc)
343
354
} ;
344
355
345
- let client = hyper:: client:: Client :: builder( )
346
- . build:: <_, hyper:: Body >( connector. clone( ) ) ;
347
-
348
- let req = http:: Request :: get( format!( "{url}/sync/0/0/0" ) )
349
- . header( "Authorization" , format!( "Bearer {}" , auth_token) )
350
- . body( hyper:: Body :: empty( ) )
351
- . unwrap( ) ;
352
-
353
- let res = client
354
- . request( req)
355
- . await
356
- . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
357
-
358
- match res. status( ) {
359
- hyper:: StatusCode :: OK => return Builder :: new_synced_database( path, url, auth_token)
360
- . remote_writes( true )
361
- . read_your_writes( read_your_writes)
362
- . build( )
363
- . await ,
364
- _ => { }
356
+ use super :: SyncProtocol ;
357
+
358
+ match sync_protocol {
359
+ p @ ( SyncProtocol :: Auto | SyncProtocol :: V2 ) => {
360
+ let client = hyper:: client:: Client :: builder( )
361
+ . build:: <_, hyper:: Body >( connector. clone( ) ) ;
362
+
363
+ let req = http:: Request :: get( format!( "{url}/sync/0/0/0" ) )
364
+ . header( "Authorization" , format!( "Bearer {}" , auth_token) )
365
+ . body( hyper:: Body :: empty( ) )
366
+ . unwrap( ) ;
367
+
368
+ let res = client
369
+ . request( req)
370
+ . await
371
+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
372
+
373
+ if matches!( p, SyncProtocol :: V2 ) {
374
+ if !res. status( ) . is_success( ) {
375
+ let status = res. status( ) ;
376
+ let body_bytes = hyper:: body:: to_bytes( res. into_body( ) )
377
+ . await
378
+ . map_err( |err| crate :: Error :: Sync ( err. into( ) ) ) ?;
379
+ let error_message = String :: from_utf8_lossy( & body_bytes) ;
380
+ return Err ( crate :: Error :: Sync ( format!( "HTTP error {}: {}" , status, error_message) . into( ) ) ) ;
381
+ }
382
+ }
383
+
384
+ if res. status( ) . is_success( ) {
385
+ return Builder :: new_synced_database( path, url, auth_token)
386
+ . remote_writes( true )
387
+ . read_your_writes( read_your_writes)
388
+ . build( )
389
+ . await ;
390
+ }
391
+
392
+ }
393
+ SyncProtocol :: V1 => { }
365
394
}
366
395
367
396
let path = path. to_str( ) . ok_or( crate :: Error :: InvalidUTF8Path ) ?. to_owned( ) ;
0 commit comments