Skip to content

Commit cd92cd7

Browse files
authored
Merge branch 'tursodatabase:main' into main
2 parents e608ff7 + e853d54 commit cd92cd7

40 files changed

+1203
-526
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottomless/src/replicator.rs

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub struct Replicator {
7575
upload_progress: Arc<Mutex<CompletionProgress>>,
7676
last_uploaded_frame_no: Receiver<u32>,
7777
skip_snapshot: bool,
78+
skip_shutdown_upload: bool,
7879
}
7980

8081
#[derive(Debug)]
@@ -122,6 +123,8 @@ pub struct Options {
122123
pub s3_max_retries: u32,
123124
/// Skip snapshot upload per checkpoint.
124125
pub skip_snapshot: bool,
126+
/// Skip uploading snapshots on shutdown
127+
pub skip_shutdown_upload: bool,
125128
}
126129

127130
impl Options {
@@ -238,6 +241,10 @@ impl Options {
238241
Some(key) => Some(EncryptionConfig::new(cipher, key)),
239242
None => None,
240243
};
244+
245+
let skip_shutdown_upload =
246+
env_var_or("LIBSQL_BOTTOMLESS_SKIP_SHUTDOWN_UPLOAD", false).parse::<bool>()?;
247+
241248
Ok(Options {
242249
db_id,
243250
create_bucket_if_not_exists: true,
@@ -255,6 +262,7 @@ impl Options {
255262
bucket_name,
256263
s3_max_retries,
257264
skip_snapshot,
265+
skip_shutdown_upload,
258266
})
259267
}
260268
}
@@ -343,6 +351,12 @@ impl Replicator {
343351
};
344352
tracing::debug!("Database path: '{}', name: '{}'", db_path, db_name);
345353

354+
let skip_shutdown_upload = options.skip_shutdown_upload;
355+
356+
if skip_shutdown_upload {
357+
tracing::warn!("skipping upload on shutdown");
358+
}
359+
346360
let (flush_trigger, mut flush_trigger_rx) = channel(());
347361
let (last_committed_frame_no_sender, last_committed_frame_no) = channel(Ok(0));
348362

@@ -498,6 +512,7 @@ impl Replicator {
498512
join_set,
499513
upload_progress,
500514
last_uploaded_frame_no,
515+
skip_shutdown_upload,
501516
})
502517
}
503518

@@ -529,33 +544,38 @@ impl Replicator {
529544
}
530545

531546
pub async fn shutdown_gracefully(&mut self) -> Result<()> {
532-
tracing::info!("bottomless replicator: shutting down...");
533-
// 1. wait for all committed WAL frames to be committed locally
534-
let last_frame_no = self.last_known_frame();
535-
// force flush in order to not wait for periodic wake up of local back up process
536-
if let Some(tx) = &self.flush_trigger {
537-
let _ = tx.send(());
538-
}
539-
self.wait_until_committed(last_frame_no).await?;
540-
tracing::info!(
541-
"bottomless replicator: local backup replicated frames until {}",
542-
last_frame_no
543-
);
544-
// 2. wait for snapshot upload to S3 to finish
545-
self.wait_until_snapshotted().await?;
546-
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
547-
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
548-
// close the channel used by wait_until_committed, it must happen after wait_until_committed
549-
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
550-
self.flush_trigger.take();
551-
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
552-
// and finish upload process
553-
self.shutdown_trigger.take();
554-
while let Some(t) = self.join_set.join_next().await {
555-
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
556-
// this should ensure that all WAL frames are one S3
557-
t?;
547+
if !self.skip_shutdown_upload {
548+
tracing::info!("bottomless replicator: shutting down...");
549+
// 1. wait for all committed WAL frames to be committed locally
550+
let last_frame_no = self.last_known_frame();
551+
// force flush in order to not wait for periodic wake up of local back up process
552+
if let Some(tx) = &self.flush_trigger {
553+
let _ = tx.send(());
554+
}
555+
self.wait_until_committed(last_frame_no).await?;
556+
tracing::info!(
557+
"bottomless replicator: local backup replicated frames until {}",
558+
last_frame_no
559+
);
560+
// 2. wait for snapshot upload to S3 to finish
561+
self.wait_until_snapshotted().await?;
562+
tracing::info!("bottomless replicator: snapshot succesfully uploaded to S3");
563+
// 3. drop flush trigger, which will cause WAL upload loop to close. Since this action will
564+
// close the channel used by wait_until_committed, it must happen after wait_until_committed
565+
// has finished. If trigger won't be dropped, tasks from join_set will never finish.
566+
self.flush_trigger.take();
567+
// 4. drop shutdown trigger which will notify S3 upload process to stop all retry attempts
568+
// and finish upload process
569+
self.shutdown_trigger.take();
570+
while let Some(t) = self.join_set.join_next().await {
571+
// one of the tasks we're waiting for is upload of local WAL segment from pt.1 to S3
572+
// this should ensure that all WAL frames are one S3
573+
t?;
574+
}
575+
} else {
576+
tracing::warn!("skipping snapshot upload during shutdown");
558577
}
578+
559579
tracing::info!("bottomless replicator: shutdown complete");
560580
Ok(())
561581
}

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
use std::mem::size_of;
22

3-
use libsql_wal::io::StdIO;
43
use libsql_wal::replication::injector::Injector;
4+
use libsql_wal::segment::sealed::SealedSegment;
55
use libsql_wal::segment::Frame as WalFrame;
6+
use libsql_wal::{io::StdIO, storage::Storage};
67
use zerocopy::{AsBytes, FromZeroes};
78

89
use crate::frame::FrameNo;
910
use crate::rpc::replication::Frame as RpcFrame;
1011

1112
use super::error::{Error, Result};
1213

13-
pub struct LibsqlInjector {
14-
injector: Injector<StdIO>,
14+
pub struct LibsqlInjector<S> {
15+
injector: Injector<StdIO, S>,
1516
}
1617

17-
impl LibsqlInjector {
18-
pub fn new(injector: Injector<StdIO>) -> Self {
18+
impl<S> LibsqlInjector<S> {
19+
pub fn new(injector: Injector<StdIO, S>) -> Self {
1920
Self { injector }
2021
}
2122
}
2223

23-
impl super::Injector for LibsqlInjector {
24+
impl<S: Storage<Segment = SealedSegment<std::fs::File>>> super::Injector for LibsqlInjector<S> {
2425
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
2526
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
2627
// must copy it...

libsql-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "libsql-server"
3-
version = "0.24.26"
3+
version = "0.24.27"
44
edition = "2021"
55
default-run = "sqld"
66
repository = "https://github.com/tursodatabase/libsql"

libsql-server/src/connection/connection_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ pub type ConnId = u64;
3131
pub type InnerWalManager =
3232
Either3<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>, DurableWalManager>;
3333
#[cfg(feature = "durable-wal")]
34-
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO>, DurableWal>;
34+
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>, DurableWal>;
3535

3636
#[cfg(not(feature = "durable-wal"))]
3737
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>>;
3838

3939
#[cfg(not(feature = "durable-wal"))]
40-
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO>>;
40+
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>>;
4141
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;
4242

4343
#[derive(Copy, Clone, Debug)]

libsql-server/src/connection/libsql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl MakeConnection for MakeLibsqlConnection {
8282

8383
#[derive(Clone)]
8484
pub struct LibsqlConnection {
85-
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO>>>>,
85+
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO, SqldStorage>>>>,
8686
}
8787

8888
impl LibsqlConnection {

libsql-server/src/namespace/configurator/libsql_fork.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub(crate) async fn libsql_wal_fork(
9797
}
9898

9999
async fn try_inject(
100-
to_shared: Arc<SharedWal<StdIO>>,
100+
to_shared: Arc<SharedWal<StdIO, SqldStorage>>,
101101
stream: &mut Pin<
102102
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
103103
>,

libsql-server/src/namespace/meta_store.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pub async fn metastore_connection_maker(
131131
s3_max_parallelism: 32,
132132
s3_max_retries: 10,
133133
skip_snapshot: false,
134+
skip_shutdown_upload: false,
134135
};
135136
let mut replicator = bottomless::replicator::Replicator::with_options(
136137
db_path.join("data").to_str().unwrap(),

libsql-server/src/replication/replicator_client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ use crate::metrics::{
2929
use crate::namespace::meta_store::MetaStoreHandle;
3030
use crate::namespace::{NamespaceName, NamespaceStore};
3131
use crate::replication::FrameNo;
32+
use crate::SqldStorage;
3233

3334
pub enum WalImpl {
3435
LibsqlWal {
35-
shared: Arc<SharedWal<StdIO>>,
36+
shared: Arc<SharedWal<StdIO, SqldStorage>>,
3637
},
3738
SqliteWal {
3839
meta: WalIndexMeta,
@@ -52,7 +53,7 @@ impl WalImpl {
5253
})
5354
}
5455

55-
pub fn new_libsql(shared: Arc<SharedWal<StdIO>>) -> Self {
56+
pub fn new_libsql(shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
5657
Self::LibsqlWal { shared }
5758
}
5859

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ pin_project_lite::pin_project! {
8282
#[pin]
8383
inner: S,
8484
flavor: WalFlavor,
85-
shared: Arc<SharedWal<StdIO>>,
85+
shared: Arc<SharedWal<StdIO, SqldStorage>>,
8686
}
8787
}
8888

8989
impl<S> FrameStreamAdapter<S> {
90-
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
90+
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
9191
Self {
9292
inner,
9393
flavor,

0 commit comments

Comments
 (0)