Commit 114a847

feat(cubestore): Streaming optimizations (#6228)
1 parent 2310531 commit 114a847

13 files changed: +414 -114 lines changed

rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 29 additions & 0 deletions

@@ -21,3 +21,32 @@ pub static CACHE_QUERY_TIME_MS: Histogram = metrics::histogram("cs.sql.query.cac
 /// Incoming queue queries.
 pub static QUEUE_QUERIES: Counter = metrics::counter("cs.sql.query.queue");
 pub static QUEUE_QUERY_TIME_MS: Histogram = metrics::histogram("cs.sql.query.queue.ms");
+
+pub static STREAMING_ROWS_READ: Counter = metrics::counter("cs.streaming.rows");
+pub static STREAMING_CHUNKS_READ: Counter = metrics::counter("cs.streaming.chunks");
+pub static STREAMING_LASTOFFSET: Gauge = metrics::gauge("cs.streaming.lastoffset");
+pub static IN_MEMORY_CHUNKS_COUNT: Gauge = metrics::gauge("cs.workers.in_memory_chunks)");
+pub static IN_MEMORY_CHUNKS_ROWS: Gauge = metrics::gauge("cs.workers.in_memory_chunks.rows)");
+pub static IN_MEMORY_CHUNKS_MEMORY: Gauge = metrics::gauge("cs.workers.in_memory_chunks.memory)");
+pub static STREAMING_IMPORT_TIME: Histogram = metrics::histogram("cs.streaming.import_time.ms");
+pub static STREAMING_PARTITION_TIME: Histogram =
+    metrics::histogram("cs.streaming.partition_time.ms");
+pub static STREAMING_UPLOAD_TIME: Histogram = metrics::histogram("cs.streaming.upload_time.ms");
+pub static STREAMING_ROUNDTRIP_TIME: Histogram =
+    metrics::histogram("cs.streaming.roundtrip_time.ms");
+pub static STREAMING_ROUNDTRIP_ROWS: Histogram = metrics::histogram("cs.streaming.roundtrip_rows");
+pub static STREAMING_ROUNDTRIP_CHUNKS: Histogram =
+    metrics::histogram("cs.streaming.roundtrip_chunks");
+pub static STREAMING_LAG: Gauge = metrics::gauge("cs.streaming.lag");
+
+pub static METASTORE_QUEUE: Gauge = metrics::gauge("cs.metastore.queue_size");
+pub static METASTORE_READ_OPERATION: Histogram =
+    metrics::histogram("cs.metastore.read_operation.ms");
+pub static METASTORE_INNER_READ_OPERATION: Histogram =
+    metrics::histogram("cs.metastore.inner_read_operation.ms");
+pub static METASTORE_WRITE_OPERATION: Histogram =
+    metrics::histogram("cs.metastore.write_operation.ms");
+pub static METASTORE_INNER_WRITE_OPERATION: Histogram =
+    metrics::histogram("cs.metastore.inner_write_operation.ms");
+pub static METASTORE_READ_OUT_QUEUE_OPERATION: Histogram =
+    metrics::histogram("cs.metastore.read_out_queue_operation.ms");

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 1 addition & 1 deletion

@@ -906,7 +906,7 @@ impl Config {
             ),
             stream_replay_check_interval_secs: env_parse(
                 "CUBESTORE_STREAM_REPLAY_CHECK_INTERVAL",
-                0,
+                60,
             ),
             check_ws_orphaned_messages_interval_secs: env_parse(
                 "CUBESTORE_CHECK_WS_ORPHANED_MESSAGES_INTERVAL",

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 61 additions & 13 deletions

@@ -863,6 +863,11 @@ pub trait MetaStore: DIService + Send + Sync {
     ) -> Result<Vec<IdRow<Partition>>, CubeError>;
     async fn get_index(&self, index_id: u64) -> Result<IdRow<Index>, CubeError>;

+    async fn get_index_with_active_partitions_out_of_queue(
+        &self,
+        index_id: u64,
+    ) -> Result<(IdRow<Index>, Vec<IdRow<Partition>>), CubeError>;
+
     async fn create_partitioned_index(
         &self,
         schema: String,
@@ -927,6 +932,7 @@ pub trait MetaStore: DIService + Send + Sync {
         max: Option<Row>,
         in_memory: bool,
     ) -> Result<IdRow<Chunk>, CubeError>;
+    async fn insert_chunks(&self, chunks: Vec<Chunk>) -> Result<Vec<IdRow<Chunk>>, CubeError>;
     async fn get_chunk(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError>;
     async fn get_chunks_out_of_queue(&self, ids: Vec<u64>) -> Result<Vec<IdRow<Chunk>>, CubeError>;
     async fn get_partitions_out_of_queue(
@@ -1046,11 +1052,11 @@ pub trait MetaStore: DIService + Send + Sync {
     async fn all_replay_handles_to_merge(
         &self,
     ) -> Result<Vec<(IdRow<ReplayHandle>, bool)>, CubeError>;
-    async fn update_replay_handle_failed(
+    async fn update_replay_handle_failed_if_exists(
         &self,
         id: u64,
         failed: bool,
-    ) -> Result<IdRow<ReplayHandle>, CubeError>;
+    ) -> Result<(), CubeError>;
     async fn replace_replay_handles(
         &self,
         old_ids: Vec<u64>,
@@ -2909,6 +2915,28 @@ impl MetaStore for RocksMetaStore {
         .await
     }

+    async fn get_index_with_active_partitions_out_of_queue(
+        &self,
+        index_id: u64,
+    ) -> Result<(IdRow<Index>, Vec<IdRow<Partition>>), CubeError> {
+        self.read_operation_out_of_queue(move |db_ref| {
+            let index = IndexRocksTable::new(db_ref.clone()).get_row_or_not_found(index_id)?;
+            let rocks_partition = PartitionRocksTable::new(db_ref);
+
+            let partitions = rocks_partition
+                .get_rows_by_index(
+                    &PartitionIndexKey::ByIndexId(index.get_id()),
+                    &PartitionRocksIndex::IndexId,
+                )?
+                .into_iter()
+                .filter(|r| r.get_row().active)
+                .collect::<Vec<_>>();
+
+            Ok((index, partitions))
+        })
+        .await
+    }
+
     #[tracing::instrument(level = "trace", skip(self, key_columns))]
     async fn create_partitioned_index(
         &self,
@@ -3086,6 +3114,22 @@ impl MetaStore for RocksMetaStore {
         .await
     }

+    #[tracing::instrument(level = "trace", skip(self, chunks))]
+    async fn insert_chunks(&self, chunks: Vec<Chunk>) -> Result<Vec<IdRow<Chunk>>, CubeError> {
+        self.write_operation(move |db_ref, batch_pipe| {
+            let rocks_chunk = ChunkRocksTable::new(db_ref.clone());
+            let mut result = Vec::with_capacity(chunks.len());
+
+            for chunk in chunks.into_iter() {
+                let id_row = rocks_chunk.insert(chunk, batch_pipe)?;
+                result.push(id_row);
+            }
+
+            Ok(result)
+        })
+        .await
+    }
+
     #[tracing::instrument(level = "trace", skip(self))]
     async fn get_chunk(&self, chunk_id: u64) -> Result<IdRow<Chunk>, CubeError> {
         self.read_operation(move |db_ref| {
@@ -3723,17 +3767,18 @@ impl MetaStore for RocksMetaStore {
     }

     #[tracing::instrument(level = "trace", skip(self))]
-    async fn update_replay_handle_failed(
+    async fn update_replay_handle_failed_if_exists(
         &self,
         id: u64,
         failed: bool,
-    ) -> Result<IdRow<ReplayHandle>, CubeError> {
+    ) -> Result<(), CubeError> {
         self.write_operation(move |db_ref, batch_pipe| {
-            Ok(ReplayHandleRocksTable::new(db_ref.clone()).update_with_fn(
-                id,
-                |h| h.set_failed_to_persist_chunks(failed),
-                batch_pipe,
-            )?)
+            let table = ReplayHandleRocksTable::new(db_ref.clone());
+            if table.get_row(id)?.is_some() {
+                table.update_with_fn(id, |h| h.set_failed_to_persist_chunks(failed), batch_pipe)?;
+            }
+
+            Ok(())
         })
         .await
     }
@@ -3759,10 +3804,11 @@ impl MetaStore for RocksMetaStore {
             &ChunkRocksIndex::ReplayHandleId,
         )?;

-        if !chunks.is_empty() {
+        let active_chunks = chunks.iter().filter(|c| c.get_row().active() || !c.get_row().uploaded()).collect::<Vec<_>>();
+        if !active_chunks.is_empty() {
             return Err(CubeError::internal(format!(
-                "Can't merge replay handle with chunks: {:?}",
-                replay_handle
+                "Can't merge replay handle with chunks: {:?}, {}",
+                replay_handle, active_chunks[0].get_id()
             )))
         }

@@ -3811,7 +3857,9 @@ impl MetaStore for RocksMetaStore {
             )?;
             result.push((
                 replay_handle,
-                chunks.iter().filter(|c| c.get_row().active()).count() == 0,
+                chunks
+                    .iter()
+                    .all(|c| !c.get_row().active() && c.get_row().uploaded()),
             ));
         }
         Ok(result)
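Note: insert_chunks lets the streaming path persist every chunk from one roundtrip in a single metastore write operation instead of one write per chunk. A hedged sketch of the intended call pattern; chunk construction is elided and meta_store is any Arc<dyn MetaStore>:

    // Before: one write_operation per chunk (one RocksDB write batch each).
    // After: the whole roundtrip goes through a single batched write.
    async fn persist_roundtrip_chunks(
        meta_store: &std::sync::Arc<dyn MetaStore>,
        chunks: Vec<Chunk>,
    ) -> Result<Vec<IdRow<Chunk>>, CubeError> {
        meta_store.insert_chunks(chunks).await
    }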

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 0 additions & 1 deletion

@@ -733,7 +733,6 @@ impl RocksStore {
                 }
             }
         }
-
         Ok(spawn_res)
     }


rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 29 additions & 6 deletions

@@ -263,6 +263,7 @@ where

 pub struct IndexScanIter<'a, RT: RocksTable + ?Sized> {
     table: &'a RT,
+    index_id: u32,
     secondary_key_val: Vec<u8>,
     secondary_key_hash: Vec<u8>,
     iter: DBIterator<'a>,
@@ -289,7 +290,30 @@ where
                     continue;
                 }

-                return Some(self.table.get_row_or_not_found(row_id));
+                let result = match self.table.get_row(row_id) {
+                    Ok(Some(row)) => Ok(row),
+                    Ok(None) => {
+                        let index = self.table.get_index_by_id(self.index_id);
+                        match self.table.rebuild_index(&index) {
+                            Ok(_) => {
+                                Err(CubeError::internal(format!(
+                                    "Row exists in secondary index however missing in {:?} table: {}. Repairing index.",
+                                    self.table, row_id
+                                )))
+                            }
+                            Err(e) => {
+                                Err(CubeError::internal(format!(
+                                    "Error while rebuilding secondary index for {:?} table: {:?}",
+                                    self.table, e
+                                )))
+                            }
+
+                        }
+                    }
+                    Err(e) => Err(e),
+                };
+
+                return Some(result);
             };
         } else {
             return None;
@@ -1017,12 +1041,10 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
             .to_vec();
         let secondary_key_val = secondary_index.key_to_bytes(&row_key);

+        let index_id = RocksSecondaryIndex::get_id(secondary_index);
         let key_len = secondary_key_hash.len();
-        let key_min = RowKey::SecondaryIndex(
-            Self::index_id(RocksSecondaryIndex::get_id(secondary_index)),
-            secondary_key_hash.clone(),
-            0,
-        );
+        let key_min =
+            RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash.clone(), 0);

         let mut opts = ReadOptions::default();
         opts.set_prefix_same_as_start(true);
@@ -1033,6 +1055,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {

         Ok(IndexScanIter {
             table: self,
+            index_id,
             secondary_key_val,
             secondary_key_hash,
             iter,
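Note: the scan now recovers from a dangling secondary-index entry (a row id present in the index but missing from the table) by rebuilding the index and returning an internal error for that entry, rather than panicking through get_row_or_not_found. A standalone model of that behavior with illustrative names, not the cubestore types:

    use std::collections::BTreeMap;

    // `rows` stands in for the primary table, `index` for the secondary index.
    fn lookup_with_repair(
        rows: &BTreeMap<u64, String>,
        index: &mut Vec<u64>,
        row_id: u64,
    ) -> Result<String, String> {
        match rows.get(&row_id) {
            Some(row) => Ok(row.clone()),
            None => {
                // Rebuild the index from the table and surface the inconsistency.
                *index = rows.keys().copied().collect();
                Err(format!(
                    "Row exists in secondary index however missing in table: {}. Repairing index.",
                    row_id
                ))
            }
        }
    }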

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 11 additions & 2 deletions

@@ -622,11 +622,11 @@ impl MetaStore for MetaStoreMock {
         panic!("MetaStore mock!")
     }

-    async fn update_replay_handle_failed(
+    async fn update_replay_handle_failed_if_exists(
         &self,
         _id: u64,
         _failed: bool,
-    ) -> Result<IdRow<ReplayHandle>, CubeError> {
+    ) -> Result<(), CubeError> {
         panic!("MetaStore mock!")
     }

@@ -685,6 +685,15 @@ impl MetaStore for MetaStoreMock {
     ) -> Result<Vec<IdRow<Chunk>>, CubeError> {
         panic!("MetaStore mock!")
     }
+    async fn get_index_with_active_partitions_out_of_queue(
+        &self,
+        _index_id: u64,
+    ) -> Result<(IdRow<Index>, Vec<IdRow<Partition>>), CubeError> {
+        panic!("MetaStore mock!")
+    }
+    async fn insert_chunks(&self, _chunks: Vec<Chunk>) -> Result<Vec<IdRow<Chunk>>, CubeError> {
+        panic!("MetaStore mock!")
+    }
 }

 crate::di_service!(MetaStoreMock, [MetaStore]);

rust/cubestore/cubestore/src/scheduler/mod.rs

Lines changed: 14 additions & 4 deletions

@@ -2,6 +2,7 @@ use crate::cluster::{pick_worker_by_ids, Cluster};
 use crate::config::ConfigObj;
 use crate::metastore::job::{Job, JobStatus, JobType};
 use crate::metastore::partition::partition_file_name;
+use crate::metastore::replay_handle::ReplayHandle;
 use crate::metastore::replay_handle::{
     subtract_from_right_seq_pointer_by_location, subtract_if_covers_seq_pointer_by_location,
     union_seq_pointer_by_location, SeqPointerForLocation,
@@ -206,7 +207,10 @@ impl SchedulerImpl {
         )
         .await
         {
-            error!("Error scheduling partitions compaction: {}", e);
+            error!(
+                "Error scheduling deactivation chunks without partitions: {}",
+                e
+            );
         }

         if let Err(e) = warn_long_fut(
@@ -314,6 +318,12 @@ impl SchedulerImpl {
     /// Otherwise we just subtract it from resulting `SeqPointer` so freshly created `ReplayHandle`
     /// can't remove failed one.
     pub async fn merge_replay_handles(&self) -> Result<(), CubeError> {
+        fn is_newest_handle(handle: &IdRow<ReplayHandle>) -> bool {
+            Utc::now()
+                .signed_duration_since(handle.get_row().created_at().clone())
+                .num_seconds()
+                < 60
+        }
         let (failed, mut without_failed) = self
             .meta_store
             .all_replay_handles_to_merge()
@@ -339,7 +349,7 @@ impl SchedulerImpl {
             let handles = handles.collect::<Vec<_>>();
             for (handle, _) in handles
                 .iter()
-                .filter(|(_, no_active_chunks)| *no_active_chunks)
+                .filter(|(handle, no_active_chunks)| !is_newest_handle(handle) && *no_active_chunks)
             {
                 union_seq_pointer_by_location(
                     &mut seq_pointer_by_location,
@@ -424,7 +434,7 @@ impl SchedulerImpl {
         for chunk in chunks_without_partitions {
             if let Some(handle_id) = chunk.get_row().replay_handle_id() {
                 self.meta_store
-                    .update_replay_handle_failed(*handle_id, true)
+                    .update_replay_handle_failed_if_exists(*handle_id, true)
                     .await?;
             }
             ids.push(chunk.get_id());
@@ -503,7 +513,7 @@ impl SchedulerImpl {
         // Using get_tables_with_path due to it's cached
         let tables = self.meta_store.get_tables_with_path(true).await?;
         for table in tables.iter() {
-            if table.table.get_row().is_ready() {
+            if table.table.get_row().is_ready() && !table.table.get_row().sealed() {
                 if let Some(locations) = table.table.get_row().locations() {
                     for location in locations.iter() {
                         if Table::is_stream_location(location) {
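Note: merge_replay_handles now skips handles created within the last 60 seconds, presumably so a handle is not merged away while chunks created under it may still be in flight. The age check, restated as a self-contained function (assuming the chrono crate the scheduler already uses for Utc):

    use chrono::{DateTime, Utc};

    // A handle created within the last minute is considered too fresh to merge.
    fn is_newest_handle(created_at: DateTime<Utc>) -> bool {
        Utc::now().signed_duration_since(created_at).num_seconds() < 60
    }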

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 15 additions & 9 deletions

@@ -310,10 +310,10 @@ impl CompactionServiceImpl {
         for (failed_chunk, _) in failed {
             if let Some(handle_id) = failed_chunk.get_row().replay_handle_id() {
                 self.meta_store
-                    .update_replay_handle_failed(*handle_id, true)
+                    .update_replay_handle_failed_if_exists(*handle_id, true)
                     .await?;
-                deactivate_failed_chunk_ids.push(failed_chunk.get_id());
             }
+            deactivate_failed_chunk_ids.push(failed_chunk.get_id());
         }
         self.meta_store
             .deactivate_chunks_without_check(deactivate_failed_chunk_ids)
@@ -729,15 +729,21 @@ impl CompactionService for CompactionServiceImpl {
                         .compaction_in_memory_chunks_max_lifetime_threshold()
                         as i64
                 })
-                .unwrap_or(true)
+                .unwrap_or(false)
         });

-        self.compact_chunks_to_memory(mem_chunks, &partition, &index, &table)
-            .await?;
-        self.compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table)
-            .await?;
-        self.deactivate_and_mark_failed_chunks_for_replay(failed)
-            .await?;
+        let deactivate_res = self
+            .deactivate_and_mark_failed_chunks_for_replay(failed)
+            .await;
+        let in_memory_res = self
+            .compact_chunks_to_memory(mem_chunks, &partition, &index, &table)
+            .await;
+        let persistent_res = self
+            .compact_chunks_to_persistent(persistent_chunks, &partition, &index, &table)
+            .await;
+        deactivate_res?;
+        in_memory_res?;
+        persistent_res?;

         Ok(())
     }
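Note: the in-memory compaction path now runs all three stages to completion (deactivation first) and only then propagates the first error, so a failure in one stage no longer prevents the others from running. The control-flow pattern, reduced to a self-contained sketch with placeholder stages:

    async fn stage(_name: &str) -> Result<(), String> {
        Ok(())
    }

    async fn compact_all() -> Result<(), String> {
        // Run every stage regardless of earlier failures...
        let deactivate_res = stage("deactivate failed chunks").await;
        let in_memory_res = stage("compact to in-memory chunks").await;
        let persistent_res = stage("compact to persistent chunks").await;
        // ...then fail fast on the first error, in the same order.
        deactivate_res?;
        in_memory_res?;
        persistent_res?;
        Ok(())
    }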
