Skip to content

Commit fb896ba

Browse files
authored
feat(cubestore): Streaming ingestion optimizations (#6198)
1 parent b4c922a commit fb896ba

File tree

9 files changed

+793
-216
lines changed

9 files changed

+793
-216
lines changed

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,10 @@ pub trait MetaStore: DIService + Send + Sync {
843843
&self,
844844
seconds_ago: i64,
845845
) -> Result<Vec<IdRow<Partition>>, CubeError>;
846+
async fn get_chunks_without_partition_created_seconds_ago(
847+
&self,
848+
seconds_ago: i64,
849+
) -> Result<Vec<IdRow<Chunk>>, CubeError>;
846850

847851
fn index_table(&self) -> IndexMetaStoreTable;
848852
async fn create_index(
@@ -924,6 +928,11 @@ pub trait MetaStore: DIService + Send + Sync {
924928
in_memory: bool,
925929
) -> Result<IdRow<Chunk>, CubeError>;
926930
async fn get_chunk(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError>;
931+
async fn get_chunks_out_of_queue(&self, ids: Vec<u64>) -> Result<Vec<IdRow<Chunk>>, CubeError>;
932+
async fn get_partitions_out_of_queue(
933+
&self,
934+
ids: Vec<u64>,
935+
) -> Result<Vec<IdRow<Partition>>, CubeError>;
927936
async fn get_chunks_by_partition(
928937
&self,
929938
partition_id: u64,
@@ -972,6 +981,7 @@ pub trait MetaStore: DIService + Send + Sync {
972981
replay_handle_id: Option<u64>,
973982
) -> Result<(), CubeError>;
974983
async fn delete_chunk(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError>;
984+
async fn delete_chunks_without_checks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError>;
975985
async fn all_inactive_chunks(&self) -> Result<Vec<IdRow<Chunk>>, CubeError>;
976986
async fn all_inactive_not_uploaded_chunks(&self) -> Result<Vec<IdRow<Chunk>>, CubeError>;
977987

@@ -2751,16 +2761,61 @@ impl MetaStore for RocksMetaStore {
27512761
}
27522762

27532763
let partitions_table = PartitionRocksTable::new(db_ref.clone());
2754-
let partitions = partition_ids
2755-
.into_iter()
2756-
.map(|id| partitions_table.get_row_or_not_found(id))
2757-
.collect::<Result<Vec<_>, _>>()?;
2764+
let mut partitions = Vec::new();
2765+
for id in partition_ids {
2766+
if let Some(partition) = partitions_table.get_row(id)? {
2767+
partitions.push(partition);
2768+
}
2769+
}
27582770

27592771
Ok(partitions)
27602772
})
27612773
.await
27622774
}
27632775

2776+
#[tracing::instrument(level = "trace", skip(self))]
2777+
async fn get_chunks_without_partition_created_seconds_ago(
2778+
&self,
2779+
seconds_ago: i64,
2780+
) -> Result<Vec<IdRow<Chunk>>, CubeError> {
2781+
self.read_operation_out_of_queue(move |db_ref| {
2782+
let chunks_table = ChunkRocksTable::new(db_ref.clone());
2783+
2784+
let now = Utc::now();
2785+
let mut partitions = HashMap::new();
2786+
for c in chunks_table.scan_all_rows()? {
2787+
let c = c?;
2788+
if c.get_row().active()
2789+
&& c.get_row()
2790+
.created_at()
2791+
.as_ref()
2792+
.map(|created_at| {
2793+
now.signed_duration_since(created_at.clone()).num_seconds()
2794+
>= seconds_ago
2795+
})
2796+
.unwrap_or(false)
2797+
{
2798+
partitions
2799+
.entry(c.get_row().get_partition_id())
2800+
.or_insert(vec![])
2801+
.push(c);
2802+
}
2803+
}
2804+
2805+
let partitions_table = PartitionRocksTable::new(db_ref.clone());
2806+
2807+
let mut result = Vec::new();
2808+
for (id, mut chunks) in partitions {
2809+
if partitions_table.get_row(id)?.is_none() {
2810+
result.append(&mut chunks);
2811+
}
2812+
}
2813+
2814+
Ok(result)
2815+
})
2816+
.await
2817+
}
2818+
27642819
fn index_table(&self) -> IndexMetaStoreTable {
27652820
IndexMetaStoreTable {
27662821
rocks_meta_store: self.store.clone(),
@@ -3037,6 +3092,36 @@ impl MetaStore for RocksMetaStore {
30373092
})
30383093
.await
30393094
}
3095+
#[tracing::instrument(level = "trace", skip(self))]
3096+
async fn get_chunks_out_of_queue(&self, ids: Vec<u64>) -> Result<Vec<IdRow<Chunk>>, CubeError> {
3097+
self.read_operation_out_of_queue(move |db| {
3098+
let db = ChunkRocksTable::new(db.clone());
3099+
let mut res = Vec::with_capacity(ids.len());
3100+
for id in ids.into_iter() {
3101+
if let Some(chunk) = db.get_row(id)? {
3102+
res.push(chunk);
3103+
}
3104+
}
3105+
Ok(res)
3106+
})
3107+
.await
3108+
}
3109+
async fn get_partitions_out_of_queue(
3110+
&self,
3111+
ids: Vec<u64>,
3112+
) -> Result<Vec<IdRow<Partition>>, CubeError> {
3113+
self.read_operation_out_of_queue(move |db| {
3114+
let db = PartitionRocksTable::new(db.clone());
3115+
let mut res = Vec::with_capacity(ids.len());
3116+
for id in ids.into_iter() {
3117+
if let Some(partition) = db.get_row(id)? {
3118+
res.push(partition);
3119+
}
3120+
}
3121+
Ok(res)
3122+
})
3123+
.await
3124+
}
30403125

30413126
#[tracing::instrument(level = "trace", skip(self))]
30423127
async fn get_chunks_by_partition(
@@ -3252,6 +3337,19 @@ impl MetaStore for RocksMetaStore {
32523337
.await
32533338
}
32543339

3340+
#[tracing::instrument(level = "trace", skip(self))]
3341+
async fn delete_chunks_without_checks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError> {
3342+
self.write_operation(move |db_ref, batch_pipe| {
3343+
let chunks = ChunkRocksTable::new(db_ref.clone());
3344+
for id in chunk_ids {
3345+
chunks.delete(id, batch_pipe)?;
3346+
}
3347+
3348+
Ok(())
3349+
})
3350+
.await
3351+
}
3352+
32553353
#[tracing::instrument(level = "trace", skip(self))]
32563354
async fn all_inactive_chunks(&self) -> Result<Vec<IdRow<Chunk>>, CubeError> {
32573355
self.read_operation_out_of_queue(move |db_ref| {
@@ -3613,8 +3711,11 @@ impl MetaStore for RocksMetaStore {
36133711
let table = ReplayHandleRocksTable::new(db_ref);
36143712
let rows = ids
36153713
.iter()
3616-
.map(|id| table.get_row_or_not_found(*id))
3617-
.collect::<Result<Vec<_>, _>>()?;
3714+
.map(|id| table.get_row(*id))
3715+
.collect::<Result<Vec<_>, _>>()?
3716+
.into_iter()
3717+
.filter_map(|v| v)
3718+
.collect::<Vec<_>>();
36183719
Ok(rows)
36193720
})
36203721
.await

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -676,8 +676,6 @@ impl RocksStore {
676676
cube_ext::spawn_blocking(move || {
677677
let res = rw_loop_sender.send(Box::new(move || {
678678
let db_span = warn_long("store write operation", Duration::from_millis(100));
679-
let span = tracing::trace_span!("metastore write operation");
680-
let span_holder = span.enter();
681679

682680
let mut batch = BatchPipe::new(db_to_send.as_ref());
683681
let snapshot = db_to_send.snapshot();
@@ -713,7 +711,6 @@ impl RocksStore {
713711
}
714712
}
715713

716-
mem::drop(span_holder);
717714
mem::drop(db_span);
718715

719716
Ok(())
@@ -875,8 +872,6 @@ impl RocksStore {
875872
cube_ext::spawn_blocking(move || {
876873
let res = rw_loop_sender.send(Box::new(move || {
877874
let db_span = warn_long("metastore read operation", Duration::from_millis(100));
878-
let span = tracing::trace_span!("metastore read operation");
879-
let span_holder = span.enter();
880875

881876
let snapshot = db_to_send.snapshot();
882877
let res = f(DbTableRef {
@@ -893,7 +888,6 @@ impl RocksStore {
893888
))
894889
})?;
895890

896-
mem::drop(span_holder);
897891
mem::drop(db_span);
898892

899893
Ok(())

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,27 @@ impl MetaStore for MetaStoreMock {
658658
async fn set_current_snapshot(&self, _snapshot_id: u128) -> Result<(), CubeError> {
659659
panic!("MetaStore mock!")
660660
}
661+
async fn get_chunks_out_of_queue(
662+
&self,
663+
_ids: Vec<u64>,
664+
) -> Result<Vec<IdRow<Chunk>>, CubeError> {
665+
panic!("MetaStore mock!")
666+
}
667+
async fn get_partitions_out_of_queue(
668+
&self,
669+
_ids: Vec<u64>,
670+
) -> Result<Vec<IdRow<Partition>>, CubeError> {
671+
panic!("MetaStore mock!")
672+
}
673+
async fn delete_chunks_without_checks(&self, _chunk_ids: Vec<u64>) -> Result<(), CubeError> {
674+
panic!("MetaStore mock!")
675+
}
676+
async fn get_chunks_without_partition_created_seconds_ago(
677+
&self,
678+
_seconds_ago: i64,
679+
) -> Result<Vec<IdRow<Chunk>>, CubeError> {
680+
panic!("MetaStore mock!")
681+
}
661682
}
662683

663684
crate::di_service!(MetaStoreMock, [MetaStore]);

0 commit comments

Comments
 (0)