Skip to content

Commit 066f152

Browse files
committed
configure libsql_wal
1 parent b12431c commit 066f152

File tree

1 file changed

+165
-48
lines changed

1 file changed

+165
-48
lines changed

libsql-server/src/lib.rs

Lines changed: 165 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use auth::Auth;
2828
use config::{
2929
AdminApiConfig, DbConfig, HeartbeatConfig, RpcClientConfig, RpcServerConfig, UserApiConfig,
3030
};
31+
use futures::future::{pending, ready};
3132
use futures::Future;
3233
use http::user::UserApi;
3334
use hyper::client::HttpConnector;
@@ -40,6 +41,10 @@ use libsql_sys::wal::either::Either as EitherWAL;
4041
#[cfg(feature = "durable-wal")]
4142
use libsql_sys::wal::either::Either3 as EitherWAL;
4243
use libsql_sys::wal::Sqlite3WalManager;
44+
use libsql_wal::checkpointer::LibsqlCheckpointer;
45+
use libsql_wal::registry::WalRegistry;
46+
use libsql_wal::storage::NoStorage;
47+
use libsql_wal::wal::LibsqlWalManager;
4348
use namespace::meta_store::MetaStoreHandle;
4449
use namespace::NamespaceName;
4550
use net::Connector;
@@ -458,9 +463,10 @@ where
458463
let configurators = self
459464
.make_configurators(
460465
base_config,
461-
scripted_backup,
462-
scheduler_sender.into(),
463466
client_config.clone(),
467+
&mut join_set,
468+
scheduler_sender.into(),
469+
scripted_backup,
464470
)
465471
.await?;
466472

@@ -596,7 +602,6 @@ where
596602
self.disable_namespaces,
597603
);
598604

599-
dbg!();
600605
self.make_services(
601606
namespace_store.clone(),
602607
idle_shutdown_kicker,
@@ -647,42 +652,125 @@ where
647652
async fn make_configurators(
648653
&self,
649654
base_config: BaseNamespaceConfig,
650-
scripted_backup: Option<ScriptBackupManager>,
651-
migration_scheduler_handle: SchedulerHandle,
652655
client_config: Option<(Channel, Uri)>,
656+
join_set: &mut JoinSet<anyhow::Result<()>>,
657+
migration_scheduler_handle: SchedulerHandle,
658+
scripted_backup: Option<ScriptBackupManager>,
653659
) -> anyhow::Result<NamespaceConfigurators> {
660+
let wal_path = base_config.base_path.join("wals");
661+
let enable_libsql_wal_test = {
662+
let is_primary = self.rpc_server_config.is_some();
663+
let is_libsql_wal_test = std::env::var("LIBSQL_WAL_TEST").is_ok();
664+
is_primary && is_libsql_wal_test
665+
};
666+
let use_libsql_wal =
667+
self.use_custom_wal == Some(CustomWAL::LibsqlWal) || enable_libsql_wal_test;
668+
if !use_libsql_wal {
669+
if wal_path.try_exists()? {
670+
anyhow::bail!("database was previously setup to use libsql-wal");
671+
}
672+
}
673+
674+
if self.use_custom_wal.is_some() {
675+
if self.db_config.bottomless_replication.is_some() {
676+
anyhow::bail!("bottomless not supported with custom WAL");
677+
}
678+
if self.rpc_client_config.is_some() {
679+
anyhow::bail!("custom WAL not supported in replica mode");
680+
}
681+
}
682+
654683
match self.use_custom_wal {
655-
Some(CustomWAL::LibsqlWal) => self.libsql_wal_configurators(),
684+
Some(CustomWAL::LibsqlWal) => self.libsql_wal_configurators(
685+
base_config,
686+
client_config,
687+
join_set,
688+
migration_scheduler_handle,
689+
scripted_backup,
690+
wal_path,
691+
),
656692
#[cfg(feature = "durable-wal")]
657693
Some(CustomWAL::DurableWal) => self.durable_wal_configurators(
658694
base_config,
659-
scripted_backup,
660-
migration_scheduler_handle,
661695
client_config,
696+
migration_scheduler_handle,
697+
scripted_backup,
662698
),
663699
None => {
664700
self.legacy_configurators(
665701
base_config,
666-
scripted_backup,
667-
migration_scheduler_handle,
668702
client_config,
703+
migration_scheduler_handle,
704+
scripted_backup,
669705
)
670706
.await
671707
}
672708
}
673709
}
674710

675-
fn libsql_wal_configurators(&self) -> anyhow::Result<NamespaceConfigurators> {
676-
todo!()
711+
fn libsql_wal_configurators(
712+
&self,
713+
base_config: BaseNamespaceConfig,
714+
client_config: Option<(Channel, Uri)>,
715+
join_set: &mut JoinSet<anyhow::Result<()>>,
716+
migration_scheduler_handle: SchedulerHandle,
717+
scripted_backup: Option<ScriptBackupManager>,
718+
wal_path: PathBuf,
719+
) -> anyhow::Result<NamespaceConfigurators> {
720+
tracing::info!("using libsql wal");
721+
let (sender, receiver) = tokio::sync::mpsc::channel(64);
722+
let registry = Arc::new(WalRegistry::new(wal_path, NoStorage, sender)?);
723+
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
724+
self.spawn_until_shutdown_on(join_set, async move {
725+
checkpointer.run().await;
726+
Ok(())
727+
});
728+
729+
let namespace_resolver = |path: &Path| {
730+
NamespaceName::from_string(
731+
path.parent()
732+
.unwrap()
733+
.file_name()
734+
.unwrap()
735+
.to_str()
736+
.unwrap()
737+
.to_string(),
738+
)
739+
.unwrap()
740+
.into()
741+
};
742+
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(namespace_resolver));
743+
744+
self.spawn_until_shutdown_with_teardown(join_set, pending(), async move {
745+
registry.shutdown().await?;
746+
Ok(())
747+
});
748+
749+
let make_wal_manager = Arc::new(move || EitherWAL::B(wal.clone()));
750+
let mut configurators = NamespaceConfigurators::empty();
751+
752+
match client_config {
753+
Some(_) => todo!("configure replica"),
754+
// configure primary
755+
None => self.configure_primary_common(
756+
base_config,
757+
&mut configurators,
758+
make_wal_manager,
759+
migration_scheduler_handle,
760+
scripted_backup,
761+
),
762+
}
763+
764+
Ok(configurators)
677765
}
678766

679767
#[cfg(feature = "durable-wal")]
680768
fn durable_wal_configurators(
681769
&self,
682770
base_config: BaseNamespaceConfig,
683-
scripted_backup: Option<ScriptBackupManager>,
684-
migration_scheduler_handle: SchedulerHandle,
685771
client_config: Option<(Channel, Uri)>,
772+
migration_scheduler_handle: SchedulerHandle,
773+
scripted_backup: Option<ScriptBackupManager>,
686774
) -> anyhow::Result<NamespaceConfigurators> {
687775
tracing::info!("using durable wal");
688776
let lock_manager = Arc::new(std::sync::Mutex::new(LockManager::new()));
@@ -706,22 +794,37 @@ where
706794
);
707795
let make_wal_manager = Arc::new(move || EitherWAL::C(wal.clone()));
708796
self.configurators_common(
709-
client_config,
710797
base_config,
798+
client_config,
711799
make_wal_manager,
712-
scripted_backup,
713800
migration_scheduler_handle,
801+
scripted_backup,
714802
)
715803
}
716804

717805
fn spawn_until_shutdown_on<F>(&self, join_set: &mut JoinSet<anyhow::Result<()>>, fut: F)
718806
where
719807
F: Future<Output = anyhow::Result<()>> + Send + 'static,
808+
{
809+
self.spawn_until_shutdown_with_teardown(join_set, fut, ready(Ok(())))
810+
}
811+
812+
/// run the passed future until shutdown is called, then call the passed teardown future
813+
fn spawn_until_shutdown_with_teardown<F, T>(
814+
&self,
815+
join_set: &mut JoinSet<anyhow::Result<()>>,
816+
fut: F,
817+
teardown: T,
818+
) where
819+
F: Future<Output = anyhow::Result<()>> + Send + 'static,
820+
T: Future<Output = anyhow::Result<()>> + Send + 'static,
720821
{
721822
let shutdown = self.shutdown.clone();
722823
join_set.spawn(async move {
723824
tokio::select! {
724-
_ = shutdown.notified() => Ok(()),
825+
_ = shutdown.notified() => {
826+
teardown.await
827+
},
725828
ret = fut => ret
726829
}
727830
});
@@ -730,30 +833,29 @@ where
730833
async fn legacy_configurators(
731834
&self,
732835
base_config: BaseNamespaceConfig,
733-
scripted_backup: Option<ScriptBackupManager>,
734-
migration_scheduler_handle: SchedulerHandle,
735836
client_config: Option<(Channel, Uri)>,
837+
migration_scheduler_handle: SchedulerHandle,
838+
scripted_backup: Option<ScriptBackupManager>,
736839
) -> anyhow::Result<NamespaceConfigurators> {
737840
let make_wal_manager = Arc::new(|| EitherWAL::A(Sqlite3WalManager::default()));
738841
self.configurators_common(
739-
client_config,
740842
base_config,
843+
client_config,
741844
make_wal_manager,
742-
scripted_backup,
743845
migration_scheduler_handle,
846+
scripted_backup,
744847
)
745848
}
746849

747850
fn configurators_common(
748851
&self,
749-
client_config: Option<(Channel, Uri)>,
750852
base_config: BaseNamespaceConfig,
853+
client_config: Option<(Channel, Uri)>,
751854
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
752-
scripted_backup: Option<ScriptBackupManager>,
753855
migration_scheduler_handle: SchedulerHandle,
856+
scripted_backup: Option<ScriptBackupManager>,
754857
) -> anyhow::Result<NamespaceConfigurators> {
755858
let mut configurators = NamespaceConfigurators::empty();
756-
757859
match client_config {
758860
// replica mode
759861
Some((channel, uri)) => {
@@ -762,34 +864,49 @@ where
762864
configurators.with_replica(replica_configurator);
763865
}
764866
// primary mode
765-
None => {
766-
let primary_config = PrimaryExtraConfig {
767-
max_log_size: self.db_config.max_log_size,
768-
max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
769-
bottomless_replication: self.db_config.bottomless_replication.clone(),
770-
scripted_backup,
771-
checkpoint_interval: self.db_config.checkpoint_interval,
772-
};
867+
None => self.configure_primary_common(
868+
base_config,
869+
&mut configurators,
870+
make_wal_manager,
871+
migration_scheduler_handle,
872+
scripted_backup,
873+
),
874+
}
773875

774-
let primary_configurator = PrimaryConfigurator::new(
775-
base_config.clone(),
776-
primary_config.clone(),
777-
make_wal_manager.clone(),
778-
);
876+
Ok(configurators)
877+
}
779878

780-
let schema_configurator = SchemaConfigurator::new(
781-
base_config.clone(),
782-
primary_config,
783-
make_wal_manager.clone(),
784-
migration_scheduler_handle,
785-
);
879+
fn configure_primary_common(
880+
&self,
881+
base_config: BaseNamespaceConfig,
882+
configurators: &mut NamespaceConfigurators,
883+
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
884+
migration_scheduler_handle: SchedulerHandle,
885+
scripted_backup: Option<ScriptBackupManager>,
886+
) {
887+
let primary_config = PrimaryExtraConfig {
888+
max_log_size: self.db_config.max_log_size,
889+
max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32),
890+
bottomless_replication: self.db_config.bottomless_replication.clone(),
891+
scripted_backup,
892+
checkpoint_interval: self.db_config.checkpoint_interval,
893+
};
786894

787-
configurators.with_schema(schema_configurator);
788-
configurators.with_primary(primary_configurator);
789-
}
790-
}
895+
let primary_configurator = PrimaryConfigurator::new(
896+
base_config.clone(),
897+
primary_config.clone(),
898+
make_wal_manager.clone(),
899+
);
791900

792-
Ok(configurators)
901+
let schema_configurator = SchemaConfigurator::new(
902+
base_config.clone(),
903+
primary_config,
904+
make_wal_manager.clone(),
905+
migration_scheduler_handle,
906+
);
907+
908+
configurators.with_schema(schema_configurator);
909+
configurators.with_primary(primary_configurator);
793910
}
794911

795912
fn setup_shutdown(&self) -> Option<IdleShutdownKicker> {

0 commit comments

Comments
 (0)