Skip to content

Commit b5dba72

Browse files
committed
partial implmentation of LibsqlWalReplicationConfigurator
1 parent 066f152 commit b5dba72

File tree

1 file changed

+138
-0
lines changed

1 file changed

+138
-0
lines changed
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use std::pin::Pin;
2+
use std::future::Future;
3+
use std::sync::Arc;
4+
5+
use chrono::prelude::NaiveDateTime;
6+
use hyper::Uri;
7+
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
8+
use libsql_wal::io::StdIO;
9+
use libsql_wal::registry::WalRegistry;
10+
use libsql_wal::storage::NoStorage;
11+
use tokio::task::JoinSet;
12+
use tonic::transport::Channel;
13+
14+
use crate::connection::config::DatabaseConfig;
15+
use crate::connection::connection_manager::InnerWalManager;
16+
use crate::connection::write_proxy::MakeWriteProxyConn;
17+
use crate::connection::MakeConnection;
18+
use crate::database::{Database, ReplicaDatabase};
19+
use crate::namespace::broadcasters::BroadcasterHandle;
20+
use crate::namespace::configurator::helpers::make_stats;
21+
use crate::namespace::meta_store::MetaStoreHandle;
22+
use crate::namespace::{
23+
Namespace, NamespaceBottomlessDbIdInit, NamespaceName, NamespaceStore, ResetCb,
24+
ResolveNamespacePathFn, RestoreOption,
25+
};
26+
use crate::DEFAULT_AUTO_CHECKPOINT;
27+
28+
use super::{BaseNamespaceConfig, ConfigureNamespace};
29+
30+
pub struct LibsqlWalReplicaConfigurator {
31+
base: BaseNamespaceConfig,
32+
registry: Arc<WalRegistry<StdIO, NoStorage>>,
33+
uri: Uri,
34+
channel: Channel,
35+
make_wal_manager: Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
36+
}
37+
38+
impl ConfigureNamespace for LibsqlWalReplicaConfigurator {
39+
fn setup<'a>(
40+
&'a self,
41+
db_config: MetaStoreHandle,
42+
restore_option: RestoreOption,
43+
name: &'a NamespaceName,
44+
reset: ResetCb,
45+
resolve_attach_path: ResolveNamespacePathFn,
46+
store: NamespaceStore,
47+
broadcaster: BroadcasterHandle,
48+
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
49+
Box::pin(async move {
50+
tracing::debug!("creating replica namespace");
51+
let db_path = self.base.base_path.join("dbs").join(name.as_str());
52+
let channel = self.channel.clone();
53+
let uri = self.uri.clone();
54+
55+
let rpc_client = ReplicationLogClient::with_origin(channel.clone(), uri.clone());
56+
// TODO! setup replication
57+
58+
let mut join_set = JoinSet::new();
59+
let namespace = name.clone();
60+
61+
let stats = make_stats(
62+
&db_path,
63+
&mut join_set,
64+
db_config.clone(),
65+
self.base.stats_sender.clone(),
66+
name.clone(),
67+
applied_frame_no_receiver.clone(),
68+
)
69+
.await?;
70+
71+
let connection_maker = MakeWriteProxyConn::new(
72+
db_path.clone(),
73+
self.base.extensions.clone(),
74+
channel.clone(),
75+
uri.clone(),
76+
stats.clone(),
77+
broadcaster,
78+
db_config.clone(),
79+
applied_frame_no_receiver,
80+
self.base.max_response_size,
81+
self.base.max_total_response_size,
82+
primary_current_replication_index,
83+
None,
84+
resolve_attach_path,
85+
self.make_wal_manager.clone(),
86+
)
87+
.await?
88+
.throttled(
89+
self.base.max_concurrent_connections.clone(),
90+
Some(DB_CREATE_TIMEOUT),
91+
self.base.max_total_response_size,
92+
self.base.max_concurrent_requests,
93+
);
94+
95+
Ok(Namespace {
96+
tasks: join_set,
97+
db: Database::Replica(ReplicaDatabase {
98+
connection_maker: Arc::new(connection_maker),
99+
}),
100+
name: name.clone(),
101+
stats,
102+
db_config_store: db_config,
103+
path: db_path.into(),
104+
})
105+
})
106+
}
107+
108+
fn cleanup<'a>(
109+
&'a self,
110+
namespace: &'a NamespaceName,
111+
_db_config: &DatabaseConfig,
112+
_prune_all: bool,
113+
_bottomless_db_id_init: NamespaceBottomlessDbIdInit,
114+
) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
115+
Box::pin(async move {
116+
let ns_path = self.base.base_path.join("dbs").join(namespace.as_str());
117+
if ns_path.try_exists()? {
118+
tracing::debug!("removing database directory: {}", ns_path.display());
119+
tokio::fs::remove_dir_all(ns_path).await?;
120+
}
121+
Ok(())
122+
})
123+
}
124+
125+
fn fork<'a>(
126+
&'a self,
127+
_from_ns: &'a Namespace,
128+
_from_config: MetaStoreHandle,
129+
_to_ns: NamespaceName,
130+
_to_config: MetaStoreHandle,
131+
_timestamp: Option<chrono::prelude::NaiveDateTime>,
132+
_store: NamespaceStore,
133+
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
134+
Box::pin(std::future::ready(Err(crate::Error::Fork(
135+
super::fork::ForkError::ForkReplica,
136+
))))
137+
}
138+
}

0 commit comments

Comments
 (0)