Skip to content

Commit 13b64fc

Browse files
authored
chore(cubestore): Small metastore refactoring (#5790)
1 parent 02a8e02 commit 13b64fc

File tree

2 files changed

+72
-62
lines changed

2 files changed

+72
-62
lines changed

rust/cubestore/cubestore/src/metastore/rocks_fs.rs

Lines changed: 68 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,25 @@ pub trait MetaStoreFs: Send + Sync {
2424
) -> Result<Arc<RocksStore>, CubeError>;
2525
async fn upload_log(
2626
&self,
27-
log_name: &str,
27+
dir: &str,
28+
seq_number: u64,
2829
serializer: &WriteBatchContainer,
2930
) -> Result<u64, CubeError>;
3031
async fn upload_checkpoint(
3132
&self,
3233
remote_path: String,
3334
checkpoint_path: PathBuf,
3435
) -> Result<(), CubeError>;
36+
async fn check_rocks_store(
37+
&self,
38+
rocks_store: Arc<RocksStore>,
39+
snapshot: Option<u128>,
40+
) -> Result<Arc<RocksStore>, CubeError>;
41+
async fn load_metastore_logs(
42+
&self,
43+
snapshot: u128,
44+
rocks_store: &Arc<RocksStore>,
45+
) -> Result<(), CubeError>;
3546
}
3647

3748
#[derive(Clone)]
@@ -201,48 +212,22 @@ impl BaseRocksStoreFs {
201212
};
202213
Ok(last_metastore_snapshot)
203214
}
204-
pub async fn load_metastore_logs(
205-
&self,
206-
snapshot: u128,
207-
rocks_store: &Arc<RocksStore>,
208-
) -> Result<(), CubeError> {
209-
let logs_to_batch = self
215+
216+
pub async fn files_to_load(&self, snapshot: u128) -> Result<Vec<(String, u64)>, CubeError> {
217+
let res = self
210218
.remote_fs
211-
.list(&format!("{}-{}-logs", self.name, snapshot))
212-
.await?;
213-
let mut logs_to_batch_to_seq = logs_to_batch
219+
.list_with_metadata(&format!("{}-{}", self.name, snapshot))
220+
.await?
214221
.into_iter()
215-
.map(|f| -> Result<_, CubeError> {
216-
let last = f
217-
.split("/")
218-
.last()
219-
.ok_or(CubeError::internal(format!("Can't split path: {}", f)))?;
220-
let result = last.replace(".flex", "").parse::<usize>().map_err(|e| {
221-
CubeError::internal(format!("Can't parse flex path {}: {}", f, e))
222-
})?;
223-
Ok((f, result))
224-
})
225-
.collect::<Result<Vec<_>, _>>()?;
226-
logs_to_batch_to_seq.sort_unstable_by_key(|(_, seq)| *seq);
227-
228-
for (log_file, _) in logs_to_batch_to_seq.iter() {
229-
let path_to_log = self.remote_fs.local_file(log_file).await?;
230-
let batch = WriteBatchContainer::read_from_file(&path_to_log).await;
231-
if let Ok(batch) = batch {
232-
let db = rocks_store.db.clone();
233-
db.write(batch.write_batch())?;
234-
} else if let Err(e) = batch {
235-
error!(
236-
"Corrupted {} WAL file. Discarding: {:?} {}",
237-
self.name, log_file, e
238-
);
239-
break;
240-
}
241-
}
242-
Ok(())
222+
.map(|f| (f.remote_path, f.file_size))
223+
.collect::<Vec<_>>();
224+
Ok(res)
243225
}
226+
}
244227

245-
pub async fn check_rocks_store(
228+
#[async_trait]
229+
impl MetaStoreFs for BaseRocksStoreFs {
230+
async fn check_rocks_store(
246231
&self,
247232
rocks_store: Arc<RocksStore>,
248233
snapshot: Option<u128>,
@@ -255,21 +240,6 @@ impl BaseRocksStoreFs {
255240

256241
Ok(rocks_store)
257242
}
258-
259-
pub async fn files_to_load(&self, snapshot: u128) -> Result<Vec<(String, u64)>, CubeError> {
260-
let res = self
261-
.remote_fs
262-
.list_with_metadata(&format!("{}-{}", self.name, snapshot))
263-
.await?
264-
.into_iter()
265-
.map(|f| (f.remote_path, f.file_size))
266-
.collect::<Vec<_>>();
267-
Ok(res)
268-
}
269-
}
270-
271-
#[async_trait]
272-
impl MetaStoreFs for BaseRocksStoreFs {
273243
async fn load_from_remote(
274244
self: Arc<Self>,
275245
path: &str,
@@ -321,10 +291,12 @@ impl MetaStoreFs for BaseRocksStoreFs {
321291

322292
async fn upload_log(
323293
&self,
324-
log_name: &str,
294+
dir: &str,
295+
seq_number: u64,
325296
serializer: &WriteBatchContainer,
326297
) -> Result<u64, CubeError> {
327-
let file_name = self.remote_fs.local_file(log_name).await?;
298+
let log_name = format!("{}/{}.flex", dir, seq_number);
299+
let file_name = self.remote_fs.local_file(&log_name).await?;
328300
serializer.write_to_file(&file_name).await?;
329301
// TODO persist file size
330302
self.remote_fs.upload_file(&file_name, &log_name).await
@@ -344,6 +316,46 @@ impl MetaStoreFs for BaseRocksStoreFs {
344316

345317
Ok(())
346318
}
319+
async fn load_metastore_logs(
320+
&self,
321+
snapshot: u128,
322+
rocks_store: &Arc<RocksStore>,
323+
) -> Result<(), CubeError> {
324+
let logs_to_batch = self
325+
.remote_fs
326+
.list(&format!("{}-{}-logs", self.name, snapshot))
327+
.await?;
328+
let mut logs_to_batch_to_seq = logs_to_batch
329+
.into_iter()
330+
.map(|f| -> Result<_, CubeError> {
331+
let last = f
332+
.split("/")
333+
.last()
334+
.ok_or(CubeError::internal(format!("Can't split path: {}", f)))?;
335+
let result = last.replace(".flex", "").parse::<usize>().map_err(|e| {
336+
CubeError::internal(format!("Can't parse flex path {}: {}", f, e))
337+
})?;
338+
Ok((f, result))
339+
})
340+
.collect::<Result<Vec<_>, _>>()?;
341+
logs_to_batch_to_seq.sort_unstable_by_key(|(_, seq)| *seq);
342+
343+
for (log_file, _) in logs_to_batch_to_seq.iter() {
344+
let path_to_log = self.remote_fs.local_file(log_file).await?;
345+
let batch = WriteBatchContainer::read_from_file(&path_to_log).await;
346+
if let Ok(batch) = batch {
347+
let db = rocks_store.db.clone();
348+
db.write(batch.write_batch())?;
349+
} else if let Err(e) = batch {
350+
error!(
351+
"Corrupted {} WAL file. Discarding: {:?} {}",
352+
self.name, log_file, e
353+
);
354+
break;
355+
}
356+
}
357+
Ok(())
358+
}
347359
}
348360

349361
crate::di_service!(BaseRocksStoreFs, [MetaStoreFs]);

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -625,12 +625,10 @@ impl RocksStore {
625625

626626
if max.is_some() {
627627
let checkpoint_time = self.last_checkpoint_time.read().await;
628-
let log_name = format!(
629-
"{}-logs/{}.flex",
630-
self.get_store_path(&checkpoint_time),
631-
min.unwrap()
632-
);
633-
self.metastore_fs.upload_log(&log_name, &serializer).await?;
628+
let dir_name = format!("{}-logs", self.get_store_path(&checkpoint_time));
629+
self.metastore_fs
630+
.upload_log(&dir_name, min.unwrap(), &serializer)
631+
.await?;
634632
let mut seq = self.last_upload_seq.write().await;
635633
*seq = max.unwrap();
636634
self.write_completed_notify.notify_waiters();

0 commit comments

Comments
 (0)