@@ -75,6 +75,7 @@ pub struct Replicator {
75
75
upload_progress : Arc < Mutex < CompletionProgress > > ,
76
76
last_uploaded_frame_no : Receiver < u32 > ,
77
77
skip_snapshot : bool ,
78
+ skip_shutdown_upload : bool ,
78
79
}
79
80
80
81
#[ derive( Debug ) ]
@@ -122,6 +123,8 @@ pub struct Options {
122
123
pub s3_max_retries : u32 ,
123
124
/// Skip snapshot upload per checkpoint.
124
125
pub skip_snapshot : bool ,
126
+ /// Skip uploading snapshots on shutdown
127
+ pub skip_shutdown_upload : bool ,
125
128
}
126
129
127
130
impl Options {
@@ -238,6 +241,10 @@ impl Options {
238
241
Some ( key) => Some ( EncryptionConfig :: new ( cipher, key) ) ,
239
242
None => None ,
240
243
} ;
244
+
245
+ let skip_shutdown_upload =
246
+ env_var_or ( "LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD" , false ) . parse :: < bool > ( ) ?;
247
+
241
248
Ok ( Options {
242
249
db_id,
243
250
create_bucket_if_not_exists : true ,
@@ -255,6 +262,7 @@ impl Options {
255
262
bucket_name,
256
263
s3_max_retries,
257
264
skip_snapshot,
265
+ skip_shutdown_upload,
258
266
} )
259
267
}
260
268
}
@@ -343,6 +351,12 @@ impl Replicator {
343
351
} ;
344
352
tracing:: debug!( "Database path: '{}', name: '{}'" , db_path, db_name) ;
345
353
354
+ let skip_shutdown_upload = options. skip_shutdown_upload ;
355
+
356
+ if skip_shutdown_upload {
357
+ tracing:: warn!( "skipping upload on shutdown" ) ;
358
+ }
359
+
346
360
let ( flush_trigger, mut flush_trigger_rx) = channel ( ( ) ) ;
347
361
let ( last_committed_frame_no_sender, last_committed_frame_no) = channel ( Ok ( 0 ) ) ;
348
362
@@ -498,6 +512,7 @@ impl Replicator {
498
512
join_set,
499
513
upload_progress,
500
514
last_uploaded_frame_no,
515
+ skip_shutdown_upload,
501
516
} )
502
517
}
503
518
@@ -529,33 +544,38 @@ impl Replicator {
529
544
}
530
545
531
546
pub async fn shutdown_gracefully ( & mut self ) -> Result < ( ) > {
532
- tracing:: info!( "bottomless replicator: shutting down..." ) ;
533
- // 1. wait for all committed WAL frames to be committed locally
534
- let last_frame_no = self . last_known_frame ( ) ;
535
- // force flush in order to not wait for periodic wake up of local back up process
536
- if let Some ( tx) = & self . flush_trigger {
537
- let _ = tx. send ( ( ) ) ;
538
- }
539
- self . wait_until_committed ( last_frame_no) . await ?;
540
- tracing:: info!(
541
- "bottomless replicator: local backup replicated frames until {}" ,
542
- last_frame_no
543
- ) ;
544
- // 2. wait for snapshot upload to S3 to finish
545
- self . wait_until_snapshotted ( ) . await ?;
546
- tracing:: info!( "bottomless replicator: snapshot succesfully uploaded to S3" ) ;
547
- // 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
548
- // close the channel used by wait_until_committed, it must happen after wait_until_committed
549
- // has finished. If trigger won't be dropped, tasks from join_set will never finish.
550
- self . flush_trigger . take ( ) ;
551
- // 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
552
- // and finish upload process
553
- self . shutdown_trigger . take ( ) ;
554
- while let Some ( t) = self . join_set . join_next ( ) . await {
555
- // one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
556
- // this should ensure that all WAL frames are one S3
557
- t?;
547
+ if !self . skip_shutdown_upload {
548
+ tracing:: info!( "bottomless replicator: shutting down..." ) ;
549
+ // 1. wait for all committed WAL frames to be committed locally
550
+ let last_frame_no = self . last_known_frame ( ) ;
551
+ // force flush in order to not wait for periodic wake up of local back up process
552
+ if let Some ( tx) = & self . flush_trigger {
553
+ let _ = tx. send ( ( ) ) ;
554
+ }
555
+ self . wait_until_committed ( last_frame_no) . await ?;
556
+ tracing:: info!(
557
+ "bottomless replicator: local backup replicated frames until {}" ,
558
+ last_frame_no
559
+ ) ;
560
+ // 2. wait for snapshot upload to S3 to finish
561
+ self . wait_until_snapshotted ( ) . await ?;
562
+ tracing:: info!( "bottomless replicator: snapshot succesfully uploaded to S3" ) ;
563
+ // 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
564
+ // close the channel used by wait_until_committed, it must happen after wait_until_committed
565
+ // has finished. If trigger won't be dropped, tasks from join_set will never finish.
566
+ self . flush_trigger . take ( ) ;
567
+ // 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
568
+ // and finish upload process
569
+ self . shutdown_trigger . take ( ) ;
570
+ while let Some ( t) = self . join_set . join_next ( ) . await {
571
+ // one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
572
+ // this should ensure that all WAL frames are one S3
573
+ t?;
574
+ }
575
+ } else {
576
+ tracing:: warn!( "skipping snapshot upload during shutdown" ) ;
558
577
}
578
+
559
579
tracing:: info!( "bottomless replicator: shutdown complete" ) ;
560
580
Ok ( ( ) )
561
581
}
0 commit comments