Skip to content

Commit d7c184d

Browse files
committed
bottomless: add upload on shutdown skip option
This commit adds a env var that can be set via `LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD` that will force bottomless to ignore uploading a snapshot during a graceful shutdown.
1 parent 21ae561 commit d7c184d

File tree

2 files changed

+47
-26
lines changed

2 files changed

+47
-26
lines changed

bottomless/src/replicator.rs

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub struct Replicator {
7575
upload_progress: Arc<Mutex<CompletionProgress>>,
7676
last_uploaded_frame_no: Receiver<u32>,
7777
skip_snapshot: bool,
78+
skip_shutdown_upload: bool,
7879
}
7980

8081
#[derive(Debug)]
@@ -122,6 +123,8 @@ pub struct Options {
122123
pub s3_max_retries: u32,
123124
/// Skip snapshot upload per checkpoint.
124125
pub skip_snapshot: bool,
126+
/// Skip uploading snapshots on shutdown
127+
pub skip_shutdown_upload: bool,
125128
}
126129

127130
impl Options {
@@ -238,6 +241,10 @@ impl Options {
238241
Some(key) => Some(EncryptionConfig::new(cipher, key)),
239242
None => None,
240243
};
244+
245+
let skip_shutdown_upload =
246+
env_var_or("LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD", false).parse::<bool>()?;
247+
241248
Ok(Options {
242249
db_id,
243250
create_bucket_if_not_exists: true,
@@ -255,6 +262,7 @@ impl Options {
255262
bucket_name,
256263
s3_max_retries,
257264
skip_snapshot,
265+
skip_shutdown_upload,
258266
})
259267
}
260268
}
@@ -343,6 +351,12 @@ impl Replicator {
343351
};
344352
tracing::debug!("Database path: '{}', name: '{}'", db_path, db_name);
345353

354+
let skip_shutdown_upload = options.skip_shutdown_upload;
355+
356+
if skip_shutdown_upload {
357+
tracing::warn!("skipping upload on shutdown");
358+
}
359+
346360
let (flush_trigger, mut flush_trigger_rx) = channel(());
347361
let (last_committed_frame_no_sender, last_committed_frame_no) = channel(Ok(0));
348362

@@ -498,6 +512,7 @@ impl Replicator {
498512
join_set,
499513
upload_progress,
500514
last_uploaded_frame_no,
515+
skip_shutdown_upload,
501516
})
502517
}
503518

@@ -529,33 +544,38 @@ impl Replicator {
529544
}
530545

531546
pub async fn shutdown_gracefully(&mut self) -> Result<()> {
532-
tracing::info!("bottomless replicator: shutting down...");
533-
// 1. wait for all committed WAL frames to be committed locally
534-
let last_frame_no = self.last_known_frame();
535-
// force flush in order to not wait for periodic wake up of local back up process
536-
if let Some(tx) = &self.flush_trigger {
537-
let _ = tx.send(());
538-
}
539-
self.wait_until_committed(last_frame_no).await?;
540-
tracing::info!(
541-
"bottomless replicator: local backup replicated frames until {}",
542-
last_frame_no
543-
);
544-
// 2. wait for snapshot upload to S3 to finish
545-
self.wait_until_snapshotted().await?;
546-
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
547-
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
548-
// close the channel used by wait_until_committed, it must happen after wait_until_committed
549-
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
550-
self.flush_trigger.take();
551-
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
552-
// and finish upload process
553-
self.shutdown_trigger.take();
554-
while let Some(t) = self.join_set.join_next().await {
555-
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
556-
// this should ensure that all WAL frames are one S3
557-
t?;
547+
if !self.skip_shutdown_upload {
548+
tracing::info!("bottomless replicator: shutting down...");
549+
// 1. wait for all committed WAL frames to be committed locally
550+
let last_frame_no = self.last_known_frame();
551+
// force flush in order to not wait for periodic wake up of local back up process
552+
if let Some(tx) = &self.flush_trigger {
553+
let _ = tx.send(());
554+
}
555+
self.wait_until_committed(last_frame_no).await?;
556+
tracing::info!(
557+
"bottomless replicator: local backup replicated frames until {}",
558+
last_frame_no
559+
);
560+
// 2. wait for snapshot upload to S3 to finish
561+
self.wait_until_snapshotted().await?;
562+
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
563+
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
564+
// close the channel used by wait_until_committed, it must happen after wait_until_committed
565+
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
566+
self.flush_trigger.take();
567+
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
568+
// and finish upload process
569+
self.shutdown_trigger.take();
570+
while let Some(t) = self.join_set.join_next().await {
571+
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
572+
// this should ensure that all WAL frames are one S3
573+
t?;
574+
}
575+
} else {
576+
tracing::warn!("skipping snapshot upload during shutdown");
558577
}
578+
559579
tracing::info!("bottomless replicator: shutdown complete");
560580
Ok(())
561581
}

libsql-server/src/namespace/meta_store.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pub async fn metastore_connection_maker(
131131
s3_max_parallelism: 32,
132132
s3_max_retries: 10,
133133
skip_snapshot: false,
134+
skip_shutdown_upload: false,
134135
};
135136
let mut replicator = bottomless::replicator::Replicator::with_options(
136137
db_path.join("data").to_str().unwrap(),

0 commit comments

Comments
 (0)