Skip to content

Commit 1df0acb

Browse files
committed
fmt
1 parent 5617376 commit 1df0acb

File tree

11 files changed

+89
-48
lines changed

11 files changed

+89
-48
lines changed

libsql-server/src/bottomless_migrate.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,7 @@ pub async fn bottomless_migrate(
6666
}
6767
});
6868

69-
let tmp_registry = Arc::new(WalRegistry::new(
70-
NoStorage.into(),
71-
sender,
72-
)?);
69+
let tmp_registry = Arc::new(WalRegistry::new(NoStorage.into(), sender)?);
7370

7471
let mut configurators = NamespaceConfigurators::default();
7572

libsql-server/src/namespace/configurator/libsql_primary.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::pin::Pin;
33
use std::sync::atomic::{AtomicBool, Ordering};
44
use std::sync::Arc;
55

6+
use chrono::NaiveDateTime;
67
use futures::prelude::Future;
78
use libsql_sys::name::NamespaceResolver;
89
use libsql_wal::io::StdIO;
@@ -267,13 +268,19 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {
267268
_from_config: MetaStoreHandle,
268269
to_ns: NamespaceName,
269270
to_config: MetaStoreHandle,
270-
timestamp: Option<DateTime<Utc>>,
271+
timestamp: Option<NaiveDateTime>,
271272
store: NamespaceStore,
272273
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
273274
let registry = self.registry.clone();
274275
let base_path = &self.base.base_path;
275276
Box::pin(super::libsql_fork::libsql_wal_fork(
276-
registry, base_path, from_ns, to_ns, to_config, timestamp, store,
277+
registry,
278+
base_path,
279+
from_ns,
280+
to_ns,
281+
to_config,
282+
timestamp.map(|ts| ts.and_utc()),
283+
store,
277284
))
278285
}
279286
}

libsql-server/src/namespace/configurator/libsql_schema.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::Path;
22
use std::sync::Arc;
33

4+
use chrono::NaiveDateTime;
45
use futures::prelude::Future;
56
use libsql_sys::name::NamespaceResolver;
67
use libsql_wal::io::StdIO;
@@ -163,13 +164,19 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
163164
_from_config: MetaStoreHandle,
164165
to_ns: NamespaceName,
165166
to_config: MetaStoreHandle,
166-
timestamp: Option<DateTime<Utc>>,
167+
timestamp: Option<NaiveDateTime>,
167168
store: NamespaceStore,
168169
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
169170
let registry = self.registry.clone();
170171
let base_path = &self.base.base_path;
171172
Box::pin(super::libsql_fork::libsql_wal_fork(
172-
registry, base_path, from_ns, to_ns, to_config, timestamp, store,
173+
registry,
174+
base_path,
175+
from_ns,
176+
to_ns,
177+
to_config,
178+
timestamp.map(|ts| ts.and_utc()),
179+
store,
173180
))
174181
}
175182
}

libsql-server/src/namespace/configurator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use super::{
2020
};
2121

2222
pub mod fork;
23-
mod libsql_fork;
2423
mod helpers;
24+
mod libsql_fork;
2525
mod libsql_primary;
2626
mod libsql_replica;
2727
mod libsql_schema;

libsql-wal/benches/benchmarks.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
6060
let resolver = |_: &Path| NamespaceName::from_string("test".into());
6161

6262
let (sender, _) = tokio::sync::mpsc::channel(12);
63-
let registry =
64-
Arc::new(WalRegistry::new(NoStorage.into(), sender).unwrap());
63+
let registry = Arc::new(WalRegistry::new(NoStorage.into(), sender).unwrap());
6564
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
6665

6766
let mut conn = libsql_sys::Connection::open(

libsql-wal/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,8 @@ pub mod test {
110110

111111
let (sender, receiver) = mpsc::channel(128);
112112
let registry = Arc::new(
113-
WalRegistry::new_with_io(
114-
io.clone(),
115-
TestStorage::new_io(store, io).into(),
116-
sender,
117-
)
118-
.unwrap(),
113+
WalRegistry::new_with_io(io.clone(), TestStorage::new_io(store, io).into(), sender)
114+
.unwrap(),
119115
);
120116

121117
if store {

libsql-wal/src/registry.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,26 @@ fn maybe_store_segment<S: Storage>(
133133
storage.store(namespace, seg, None, cb);
134134
} else {
135135
// segment can be checkpointed right away.
136-
let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone()));
137-
tracing::debug!(segment_end = seg.last_committed(), durable_frame_no = *durable_frame_no.lock(), "segment doesn't contain any new data");
136+
// FIXME: this is only necessary because some tests call this method in an async context.
137+
#[cfg(debug_assertions)]
138+
{
139+
let namespace = namespace.clone();
140+
let notifier = notifier.clone();
141+
tokio::spawn(async move {
142+
let _ = notifier.send(CheckpointMessage::Namespace(namespace)).await;
143+
});
144+
}
145+
146+
#[cfg(not(debug_assertions))]
147+
{
148+
let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone()));
149+
}
150+
151+
tracing::debug!(
152+
segment_end = seg.last_committed(),
153+
durable_frame_no = *durable_frame_no.lock(),
154+
"segment doesn't contain any new data"
155+
);
138156
}
139157
}
140158

@@ -245,10 +263,15 @@ where
245263
// will think that this is a wal file, but it's in fact a directory and it will not like
246264
// it.
247265
let mut wals_path = db_path.to_owned();
248-
wals_path.set_file_name(format!("{}-wal", db_path.file_name().unwrap().to_str().unwrap()));
266+
wals_path.set_file_name(format!(
267+
"{}-wal",
268+
db_path.file_name().unwrap().to_str().unwrap()
269+
));
249270
self.io.create_dir_all(&wals_path)?;
250271
// TODO: handle that with abstract io
251-
let dir = walkdir::WalkDir::new(&wals_path).sort_by_file_name().into_iter();
272+
let dir = walkdir::WalkDir::new(&wals_path)
273+
.sort_by_file_name()
274+
.into_iter();
252275

253276
// we only checkpoint durable frame_no so this is a good first estimate without an actual
254277
// network call.
@@ -268,9 +291,12 @@ where
268291

269292
let file = self.io.open(false, true, true, entry.path())?;
270293

271-
if let Some(sealed) =
272-
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default(), self.io.now())?
273-
{
294+
if let Some(sealed) = SealedSegment::open(
295+
file.into(),
296+
entry.path().to_path_buf(),
297+
Default::default(),
298+
self.io.now(),
299+
)? {
274300
list.push(sealed.clone());
275301
maybe_store_segment(
276302
self.storage.as_ref(),
@@ -526,9 +552,7 @@ where
526552
return Ok(());
527553
}
528554
let start_frame_no = current.next_frame_no();
529-
let path = shared
530-
.wals_path
531-
.join(format!("{start_frame_no:020}.seg"));
555+
let path = shared.wals_path.join(format!("{start_frame_no:020}.seg"));
532556

533557
let segment_file = self.io.open(true, true, true, &path)?;
534558
let salt = self.io.with_rng(|rng| rng.gen());

libsql-wal/src/replication/storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use zerocopy::FromZeroes;
1010
use crate::io::buf::ZeroCopyBoxIoBuf;
1111
use crate::segment::Frame;
1212
use crate::storage::backend::FindSegmentReq;
13-
use crate::storage::{Storage};
13+
use crate::storage::Storage;
1414

1515
use super::Result;
1616

libsql-wal/src/segment/sealed.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use std::io::{BufWriter, ErrorKind, Write};
44
use std::mem::size_of;
55
use std::ops::Deref;
66
use std::path::{Path, PathBuf};
7-
use std::sync::Arc;
87
use std::sync::atomic::{AtomicU64, Ordering};
8+
use std::sync::Arc;
99

1010
use chrono::prelude::{DateTime, Utc};
1111
use fst::{Map, MapBuilder, Streamer};
@@ -210,7 +210,12 @@ where
210210
}
211211

212212
impl<F: FileExt> SealedSegment<F> {
213-
pub fn open(file: Arc<F>, path: PathBuf, read_locks: Arc<AtomicU64>, now: DateTime<Utc>) -> Result<Option<Self>> {
213+
pub fn open(
214+
file: Arc<F>,
215+
path: PathBuf,
216+
read_locks: Arc<AtomicU64>,
217+
now: DateTime<Utc>,
218+
) -> Result<Option<Self>> {
214219
let mut header: SegmentHeader = SegmentHeader::new_zeroed();
215220
file.read_exact_at(header.as_bytes_mut(), 0)?;
216221

@@ -246,7 +251,12 @@ impl<F: FileExt> SealedSegment<F> {
246251
}))
247252
}
248253

249-
fn recover(file: Arc<F>, path: PathBuf, mut header: SegmentHeader, now: DateTime<Utc>) -> Result<Self> {
254+
fn recover(
255+
file: Arc<F>,
256+
path: PathBuf,
257+
mut header: SegmentHeader,
258+
now: DateTime<Utc>,
259+
) -> Result<Self> {
250260
assert!(!header.is_empty());
251261
assert_eq!(header.index_size.get(), 0);
252262
assert_eq!(header.index_offset.get(), 0);

libsql-wal/tests/flaky_fs.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,15 +211,13 @@ async fn flaky_fs() {
211211
};
212212
let (sender, _receiver) = tokio::sync::mpsc::channel(64);
213213
let registry = Arc::new(
214-
WalRegistry::new_with_io(
215-
io.clone(),
216-
TestStorage::new_io(false, io).into(),
217-
sender,
218-
)
219-
.unwrap(),
214+
WalRegistry::new_with_io(io.clone(), TestStorage::new_io(false, io).into(), sender)
215+
.unwrap(),
220216
);
221217
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
222-
tokio::fs::create_dir_all(tmp.path().join("test")).await.unwrap();
218+
tokio::fs::create_dir_all(tmp.path().join("test"))
219+
.await
220+
.unwrap();
223221
let conn = libsql_sys::Connection::open(
224222
tmp.path().join("test/data").clone(),
225223
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,

0 commit comments

Comments
 (0)