Skip to content

Commit cfa4b47

Browse files
authored
fix(cubestore): Metastore logs are uploaded without the corresponding snapshot (#6222)
1 parent 23b5f84 commit cfa4b47

File tree

2 files changed

+96
-10
lines changed

2 files changed

+96
-10
lines changed

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4412,7 +4412,7 @@ mod tests {
44124412
use super::table::AggregateColumn;
44134413
use super::*;
44144414
use crate::config::Config;
4415-
use crate::remotefs::LocalDirRemoteFs;
4415+
use crate::remotefs::{LocalDirRemoteFs, RemoteFs};
44164416
use futures_timer::Delay;
44174417
use rocksdb::IteratorMode;
44184418
use std::thread::sleep;
@@ -5452,6 +5452,87 @@ mod tests {
54525452
}
54535453
}
54545454

5455+
#[tokio::test]
5456+
async fn upload_logs_without_snapshots() {
5457+
let config = Config::test("upload_logs_without_snapshots");
5458+
5459+
let _ = fs::remove_dir_all(config.local_dir());
5460+
let _ = fs::remove_dir_all(config.remote_dir());
5461+
5462+
let services = config.configure().await;
5463+
5464+
services.start_processing_loops().await.unwrap();
5465+
let rocks_meta_store = services.rocks_meta_store.as_ref().unwrap();
5466+
let remote_fs = services
5467+
.injector
5468+
.get_service::<dyn RemoteFs>("original_remote_fs")
5469+
.await;
5470+
services
5471+
.meta_store
5472+
.create_schema("foo1".to_string(), false)
5473+
.await
5474+
.unwrap();
5475+
rocks_meta_store.run_upload().await.unwrap();
5476+
services
5477+
.meta_store
5478+
.create_schema("foo".to_string(), false)
5479+
.await
5480+
.unwrap();
5481+
rocks_meta_store.run_upload().await.unwrap();
5482+
let uploaded = remote_fs.list("metastore-").await.unwrap();
5483+
assert!(uploaded.is_empty());
5484+
5485+
rocks_meta_store.upload_check_point().await.unwrap();
5486+
5487+
services
5488+
.meta_store
5489+
.create_schema("bar".to_string(), false)
5490+
.await
5491+
.unwrap();
5492+
5493+
rocks_meta_store.run_upload().await.unwrap();
5494+
5495+
let uploaded = remote_fs.list("metastore-").await.unwrap();
5496+
5497+
let logs_uploaded = uploaded
5498+
.into_iter()
5499+
.filter(|n| n.contains("-logs"))
5500+
.collect::<Vec<_>>();
5501+
5502+
assert_eq!(logs_uploaded.len(), 1);
5503+
5504+
rocks_meta_store.run_upload().await.unwrap();
5505+
5506+
let uploaded = remote_fs.list("metastore-").await.unwrap();
5507+
5508+
let logs_uploaded = uploaded
5509+
.into_iter()
5510+
.filter(|n| n.contains("-logs"))
5511+
.collect::<Vec<_>>();
5512+
5513+
assert_eq!(logs_uploaded.len(), 1);
5514+
5515+
services
5516+
.meta_store
5517+
.create_schema("bar2".to_string(), false)
5518+
.await
5519+
.unwrap();
5520+
5521+
rocks_meta_store.run_upload().await.unwrap();
5522+
5523+
let uploaded = remote_fs.list("metastore-").await.unwrap();
5524+
5525+
let logs_uploaded = uploaded
5526+
.into_iter()
5527+
.filter(|n| n.contains("-logs"))
5528+
.collect::<Vec<_>>();
5529+
5530+
assert_eq!(logs_uploaded.len(), 2);
5531+
5532+
let _ = fs::remove_dir_all(config.local_dir());
5533+
let _ = fs::remove_dir_all(config.remote_dir());
5534+
}
5535+
54555536
#[tokio::test]
54565537
async fn log_replay_ordering() {
54575538
{

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ pub struct RocksStore {
511511
pub(crate) write_completed_notify: Arc<Notify>,
512512
last_upload_seq: Arc<RwLock<u64>>,
513513
last_check_seq: Arc<RwLock<u64>>,
514+
snapshot_uploaded: Arc<RwLock<bool>>,
514515
snapshots_upload_stopped: Arc<AsyncMutex<bool>>,
515516
pub(crate) cached_tables: Arc<Mutex<Option<Arc<Vec<TablePath>>>>>,
516517
rw_loop_tx: std::sync::mpsc::SyncSender<
@@ -579,6 +580,7 @@ impl RocksStore {
579580
listeners: Arc::new(RwLock::new(listeners)),
580581
metastore_fs,
581582
last_checkpoint_time: Arc::new(RwLock::new(SystemTime::now())),
583+
snapshot_uploaded: Arc::new(RwLock::new(false)),
582584
write_notify: Arc::new(Notify::new()),
583585
write_completed_notify: Arc::new(Notify::new()),
584586
last_upload_seq: Arc::new(RwLock::new(db_arc.latest_sequence_number())),
@@ -766,13 +768,15 @@ impl RocksStore {
766768
seq_numbers.iter().max().map(|v| *v),
767769
)
768770
};
769-
770771
if max.is_some() {
771-
let checkpoint_time = self.last_checkpoint_time.read().await;
772-
let dir_name = format!("{}-logs", self.get_store_path(&checkpoint_time));
773-
self.metastore_fs
774-
.upload_log(&dir_name, min.unwrap(), &serializer)
775-
.await?;
772+
let snapshot_uploaded = self.snapshot_uploaded.read().await;
773+
if *snapshot_uploaded {
774+
let checkpoint_time = self.last_checkpoint_time.read().await;
775+
let dir_name = format!("{}-logs", self.get_store_path(&checkpoint_time));
776+
self.metastore_fs
777+
.upload_log(&dir_name, min.unwrap(), &serializer)
778+
.await?;
779+
}
776780
let mut seq = self.last_upload_seq.write().await;
777781
*seq = max.unwrap();
778782
self.write_completed_notify.notify_waiters();
@@ -785,11 +789,10 @@ impl RocksStore {
785789
{
786790
info!("Uploading {} check point", self.details.get_name());
787791
self.upload_check_point().await?;
792+
let mut check_seq = self.last_check_seq.write().await;
793+
*check_seq = last_db_seq;
788794
}
789795

790-
let mut check_seq = self.last_check_seq.write().await;
791-
*check_seq = last_db_seq;
792-
793796
info!(
794797
"Persisting {} snapshot: done ({:?})",
795798
self.details.get_name(),
@@ -813,6 +816,8 @@ impl RocksStore {
813816
self.metastore_fs
814817
.upload_checkpoint(remote_path, checkpoint_path)
815818
.await?;
819+
let mut snapshot_uploaded = self.snapshot_uploaded.write().await;
820+
*snapshot_uploaded = true;
816821
self.write_completed_notify.notify_waiters();
817822
}
818823
Ok(())

0 commit comments

Comments
 (0)