Skip to content

Commit 71d8c86

Browse files
committed
fix Backend impl
1 parent 2cd6e72 commit 71d8c86

File tree

2 files changed

+152
-21
lines changed

2 files changed

+152
-21
lines changed

libsql-wal/src/storage/backend/mod.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
#![allow(dead_code)]
2-
use std::future::Future;
3-
use std::path::Path;
2+
use std::{future::Future, path::Path};
43
use std::sync::Arc;
54

65
use chrono::{DateTime, Utc};
76
use fst::Map;
87
use uuid::Uuid;
98

10-
use super::{RestoreOptions, Result};
119
use super::{RestoreOptions, Result, SegmentKey};
1210
use crate::io::file::FileExt;
1311
use libsql_sys::name::NamespaceName;
@@ -44,14 +42,46 @@ pub trait Backend: Send + Sync + 'static {
4442
segment_index: Vec<u8>,
4543
) -> impl Future<Output = Result<()>> + Send;
4644

45+
async fn find_segment(
46+
&self,
47+
config: &Self::Config,
48+
namespace: &NamespaceName,
49+
frame_no: u64,
50+
) -> Result<SegmentKey>;
51+
52+
async fn fetch_segment_index(
53+
&self,
54+
config: &Self::Config,
55+
namespace: &NamespaceName,
56+
key: &SegmentKey,
57+
) -> Result<Map<Arc<[u8]>>>;
58+
4759
/// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`.
60+
async fn fetch_segment_data_to_file(
61+
&self,
62+
config: &Self::Config,
63+
namespace: &NamespaceName,
64+
key: &SegmentKey,
65+
file: &impl FileExt
66+
) -> Result<()>;
67+
68+
// this method taking self: Arc<Self> is an infortunate consequence of rust type system making
69+
// impl FileExt variant with all the arguments, with no escape hatch...
70+
async fn fetch_segment_data(
71+
self: Arc<Self>,
72+
config: Arc<Self::Config>,
73+
namespace: NamespaceName,
74+
key: SegmentKey,
75+
) -> Result<impl FileExt>;
76+
77+
// /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`.
4878
async fn fetch_segment(
4979
&self,
5080
config: &Self::Config,
5181
namespace: &NamespaceName,
5282
frame_no: u64,
5383
dest_path: &Path,
54-
) -> Result<Map<Vec<u8>>>;
84+
) -> Result<Map<Arc<[u8]>>>;
5585

5686
/// Fetch meta for `namespace`
5787
fn meta(
@@ -92,7 +122,7 @@ impl<T: Backend> Backend for Arc<T> {
92122
namespace: &NamespaceName,
93123
frame_no: u64,
94124
dest_path: &Path,
95-
) -> Result<fst::Map<Vec<u8>>> {
125+
) -> Result<fst::Map<Arc<[u8]>>> {
96126
self.as_ref()
97127
.fetch_segment(config, namespace, frame_no, dest_path)
98128
.await
@@ -117,4 +147,42 @@ impl<T: Backend> Backend for Arc<T> {
117147
.restore(config, namespace, restore_options, dest)
118148
.await
119149
}
150+
151+
async fn find_segment(
152+
&self,
153+
config: &Self::Config,
154+
namespace: &NamespaceName,
155+
frame_no: u64,
156+
) -> Result<SegmentKey> {
157+
self.as_ref().find_segment(config, namespace, frame_no).await
158+
}
159+
160+
async fn fetch_segment_index(
161+
&self,
162+
config: &Self::Config,
163+
namespace: &NamespaceName,
164+
key: &SegmentKey,
165+
) -> Result<Map<Arc<[u8]>>> {
166+
self.as_ref().fetch_segment_index(config, namespace, key).await
167+
}
168+
169+
async fn fetch_segment_data_to_file(
170+
&self,
171+
config: &Self::Config,
172+
namespace: &NamespaceName,
173+
key: &SegmentKey,
174+
file: &impl FileExt,
175+
) -> Result<()> {
176+
self.as_ref().fetch_segment_data_to_file(config, namespace, key, file).await
177+
}
178+
179+
async fn fetch_segment_data(
180+
self: Arc<Self>,
181+
config: Arc<Self::Config>,
182+
namespace: NamespaceName,
183+
key: SegmentKey,
184+
) -> Result<impl FileExt> {
185+
// this implementation makes no sense (Arc<Arc<T>>)
186+
self.as_ref().clone().fetch_segment_data(config, namespace, key).await
187+
}
120188
}

libsql-wal/src/storage/backend/s3.rs

Lines changed: 79 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ impl<IO: Io> S3Backend<IO> {
120120
Ok(stream.into_async_read())
121121
}
122122

123-
async fn fetch_segment_data(
123+
async fn fetch_segment_data_inner(
124124
&self,
125125
config: &S3Config,
126126
folder_key: &FolderKey<'_>,
@@ -158,12 +158,12 @@ impl<IO: Io> S3Backend<IO> {
158158
Ok(())
159159
}
160160

161-
async fn fetch_segment_index(
161+
async fn fetch_segment_index_inner(
162162
&self,
163163
config: &S3Config,
164164
folder_key: &FolderKey<'_>,
165165
segment_key: &SegmentKey,
166-
) -> Result<fst::Map<Vec<u8>>> {
166+
) -> Result<fst::Map<Arc<[u8]>>> {
167167
let s3_index_key = s3_segment_index_key(folder_key, segment_key);
168168
let mut stream = self.s3_get(config, s3_index_key).await?.into_async_read();
169169
let mut header: SegmentIndexHeader = SegmentIndexHeader::new_zeroed();
@@ -177,12 +177,13 @@ impl<IO: Io> S3Backend<IO> {
177177
if checksum != header.checksum.get() {
178178
return Err(Error::InvalidIndex("invalid index data checksum"));
179179
}
180-
let index = fst::Map::new(data).map_err(|_| Error::InvalidIndex("invalid index bytes"))?;
180+
let index =
181+
fst::Map::new(data.into()).map_err(|_| Error::InvalidIndex("invalid index bytes"))?;
181182
Ok(index)
182183
}
183184

184185
/// Find the most recent, and biggest segment that may contain `frame_no`
185-
async fn find_segment(
186+
async fn find_segment_inner(
186187
&self,
187188
config: &S3Config,
188189
folder_key: &FolderKey<'_>,
@@ -227,7 +228,10 @@ impl<IO: Io> S3Backend<IO> {
227228
cluster_id: &config.cluster_id,
228229
namespace,
229230
};
230-
let Some(latest_key) = self.find_segment(config, &folder_key, u64::MAX).await? else {
231+
let Some(latest_key) = self
232+
.find_segment_inner(config, &folder_key, u64::MAX)
233+
.await?
234+
else {
231235
tracing::info!("nothing to restore for {namespace}");
232236
return Ok(());
233237
};
@@ -262,7 +266,7 @@ impl<IO: Io> S3Backend<IO> {
262266

263267
let next_frame_no = header.start_frame_no.get() - 1;
264268
let Some(key) = self
265-
.find_segment(config, &folder_key, next_frame_no)
269+
.find_segment_inner(config, &folder_key, next_frame_no)
266270
.await?
267271
else {
268272
todo!("there should be a segment!");
@@ -283,10 +287,10 @@ impl<IO: Io> S3Backend<IO> {
283287
folder_key: &FolderKey<'_>,
284288
segment_key: &SegmentKey,
285289
dest_file: &impl FileExt,
286-
) -> Result<fst::Map<Vec<u8>>> {
290+
) -> Result<fst::Map<Arc<[u8]>>> {
287291
let (_, index) = tokio::try_join!(
288-
self.fetch_segment_data(config, &folder_key, &segment_key, dest_file),
289-
self.fetch_segment_index(config, &folder_key, &segment_key),
292+
self.fetch_segment_data_inner(config, &folder_key, &segment_key, dest_file),
293+
self.fetch_segment_index_inner(config, &folder_key, &segment_key),
290294
)?;
291295

292296
Ok(index)
@@ -378,16 +382,19 @@ where
378382
namespace: &NamespaceName,
379383
frame_no: u64,
380384
dest_path: &Path,
381-
) -> Result<fst::Map<Vec<u8>>> {
385+
) -> Result<fst::Map<Arc<[u8]>>> {
382386
let folder_key = FolderKey {
383387
cluster_id: &config.cluster_id,
384388
namespace: &namespace,
385389
};
386-
387-
let Some(segment_key) = self.find_segment(config, &folder_key, frame_no).await? else {
390+
391+
let Some(segment_key) = self
392+
.find_segment_inner(config, &folder_key, frame_no)
393+
.await?
394+
else {
388395
return Err(Error::FrameNotFound(frame_no));
389396
};
390-
397+
391398
if segment_key.includes(frame_no) {
392399
// TODO: make open async
393400
let file = self.io.open(false, false, true, dest_path)?;
@@ -403,13 +410,15 @@ where
403410
config: &Self::Config,
404411
namespace: &NamespaceName,
405412
) -> Result<super::DbMeta> {
406-
// request a key bigger than any other to get the last segment
407413
let folder_key = FolderKey {
408414
cluster_id: &config.cluster_id,
409415
namespace: &namespace,
410416
};
411417

412-
let max_segment_key = self.find_segment(config, &folder_key, u64::MAX).await?;
418+
// request a key bigger than any other to get the last segment
419+
let max_segment_key = self
420+
.find_segment_inner(config, &folder_key, u64::MAX)
421+
.await?;
413422

414423
Ok(super::DbMeta {
415424
max_frame_no: max_segment_key.map(|s| s.end_frame_no).unwrap_or(0),
@@ -432,6 +441,60 @@ where
432441
RestoreOptions::Timestamp(_) => todo!(),
433442
}
434443
}
444+
445+
async fn find_segment(
446+
&self,
447+
config: &Self::Config,
448+
namespace: &NamespaceName,
449+
frame_no: u64,
450+
) -> Result<SegmentKey> {
451+
let folder_key = FolderKey {
452+
cluster_id: &config.cluster_id,
453+
namespace: &namespace,
454+
};
455+
self.find_segment_inner(config, &folder_key, frame_no)
456+
.await?
457+
.ok_or_else(|| Error::FrameNotFound(frame_no))
458+
}
459+
460+
async fn fetch_segment_index(
461+
&self,
462+
config: &Self::Config,
463+
namespace: &NamespaceName,
464+
key: &SegmentKey,
465+
) -> Result<fst::Map<Arc<[u8]>>> {
466+
let folder_key = FolderKey {
467+
cluster_id: &config.cluster_id,
468+
namespace: &namespace,
469+
};
470+
self.fetch_segment_index_inner(config, &folder_key, key).await
471+
}
472+
473+
async fn fetch_segment_data_to_file(
474+
&self,
475+
config: &Self::Config,
476+
namespace: &NamespaceName,
477+
key: &SegmentKey,
478+
file: &impl FileExt,
479+
) -> Result<()> {
480+
let folder_key = FolderKey {
481+
cluster_id: &config.cluster_id,
482+
namespace: &namespace,
483+
};
484+
self.fetch_segment_data_inner(config, &folder_key, key, file).await?;
485+
Ok(())
486+
}
487+
488+
async fn fetch_segment_data(
489+
self: Arc<Self>,
490+
config: Arc<Self::Config>,
491+
namespace: NamespaceName,
492+
key: SegmentKey,
493+
) -> Result<impl FileExt> {
494+
let file = self.io.tempfile()?;
495+
self.fetch_segment_data_to_file(&config, &namespace, &key, &file).await?;
496+
Ok(file)
497+
}
435498
}
436499

437500
#[derive(Clone, Copy)]

0 commit comments

Comments
 (0)