Commit 146798a

feat(cubestore): Optimization of inmemory chunk operations for streaming (#6354)

1 parent a01d2f9 commit 146798a

File tree

11 files changed: +535 -120 lines changed


rust/cubestore/cubestore/src/cluster/message.rs

Lines changed: 3 additions & 0 deletions
@@ -43,6 +43,9 @@ pub enum NetworkMessage {
     },
     FreeMemoryChunkResult(Result<(), CubeError>),
 
+    FreeDeletedMemoryChunks,
+    FreeDeletedMemoryChunksResult(Result<(), CubeError>),
+
     MetaStoreCall(MetaStoreRpcMethodCall),
     MetaStoreCallResult(MetaStoreRpcMethodResult),
 
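The new variants follow the existing request/response convention in NetworkMessage: each request has a matching ...Result variant wrapping a Result, so transport dispatch and operation errors stay separate. A minimal, self-contained sketch of that pattern; the Message enum, send, and String error type below are illustrative stand-ins, not Cube Store APIs:

// Hedged sketch of the paired request/response convention, not Cube Store code:
// every request variant has a `...Result` counterpart carrying a `Result`, so a
// caller can match on exactly one expected response and treat anything else as
// a protocol bug.
#[derive(Debug)]
enum Message {
    FreeDeletedMemoryChunks,
    FreeDeletedMemoryChunksResult(Result<(), String>),
}

// Stand-in for the real network round trip.
fn send(_request: Message) -> Message {
    Message::FreeDeletedMemoryChunksResult(Ok(()))
}

fn free_deleted_memory_chunks() -> Result<(), String> {
    match send(Message::FreeDeletedMemoryChunks) {
        Message::FreeDeletedMemoryChunksResult(r) => r,
        x => panic!("unexpected response: {:?}", x),
    }
}

fn main() {
    assert!(free_deleted_memory_chunks().is_ok());
}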

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 41 additions & 1 deletion
@@ -128,6 +128,7 @@ pub trait Cluster: DIService + Send + Sync {
     ) -> Result<(), CubeError>;
 
     async fn free_memory_chunk(&self, node_name: &str, chunk_id: u64) -> Result<(), CubeError>;
+    async fn free_deleted_memory_chunks(&self, node_name: &str) -> Result<(), CubeError>;
 
     fn job_result_listener(&self) -> JobResultListener;
 
@@ -435,6 +436,16 @@ impl Cluster for ClusterImpl {
         }
     }
 
+    async fn free_deleted_memory_chunks(&self, node_name: &str) -> Result<(), CubeError> {
+        let response = self
+            .send_or_process_locally(node_name, NetworkMessage::FreeDeletedMemoryChunks)
+            .await?;
+        match response {
+            NetworkMessage::FreeDeletedMemoryChunksResult(r) => r,
+            x => panic!("Unexpected result for free deleted memory chunks: {:?}", x),
+        }
+    }
+
     fn job_result_listener(&self) -> JobResultListener {
         JobResultListener {
             receiver: self.meta_store_sender.subscribe(),
@@ -573,8 +584,21 @@
                 let res = chunk_store.free_memory_chunk(chunk_id).await;
                 NetworkMessage::FreeMemoryChunkResult(res)
             }
+            NetworkMessage::FreeDeletedMemoryChunks => {
+                let chunk_store = self
+                    .injector
+                    .upgrade()
+                    .unwrap()
+                    .get_service_typed::<dyn ChunkDataStore>()
+                    .await;
+                let res = chunk_store.free_deleted_memory_chunks().await;
+                NetworkMessage::FreeDeletedMemoryChunksResult(res)
+            }
             NetworkMessage::FreeMemoryChunkResult(_) => {
-                panic!("AddChunkResult sent to worker");
+                panic!("FreeMemoryChunkResult sent to worker");
+            }
+            NetworkMessage::FreeDeletedMemoryChunksResult(_) => {
+                panic!("FreeDeletedMemoryChunksResult sent to worker");
             }
             NetworkMessage::MetaStoreCall(_) | NetworkMessage::MetaStoreCallResult(_) => {
                 panic!("MetaStoreCall sent to worker");
@@ -890,6 +914,9 @@ impl JobRunner {
                 if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() {
                     let compaction_service = self.compaction_service.clone();
                     let partition_id = *partition_id;
+                    log::warn!(
+                        "JobType::InMemoryChunksCompaction is deprecated and should not be used"
+                    );
                     Ok(cube_ext::spawn(async move {
                         compaction_service
                             .compact_in_memory_chunks(partition_id)
@@ -899,6 +926,19 @@
                     Self::fail_job_row_key(job)
                 }
             }
+            JobType::NodeInMemoryChunksCompaction(_) => {
+                if let RowKey::Table(TableId::Tables, _) = job.row_reference() {
+                    let compaction_service = self.compaction_service.clone();
+                    let node_name = self.server_name.clone();
+                    Ok(cube_ext::spawn(async move {
+                        compaction_service
+                            .compact_node_in_memory_chunks(node_name)
+                            .await
+                    }))
+                } else {
+                    Self::fail_job_row_key(job)
+                }
+            }
             JobType::MultiPartitionSplit => {
                 if let RowKey::Table(TableId::MultiPartitions, id) = job.row_reference() {
                     let compaction_service = self.compaction_service.clone();
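Note the keying difference: the deprecated per-partition job is keyed by a Partitions row, while the new node-level job is keyed by a Tables row and carries the node name in the job type itself. A hedged sketch of that dispatch shape; the JobType, RowKey, and run_job below are simplified stand-ins for the JobRunner internals:

#[derive(Debug)]
enum RowKey {
    Table(&'static str, u64),
}

#[derive(Debug)]
enum JobType {
    NodeInMemoryChunksCompaction(String),
}

// Stand-in for JobRunner dispatch: the job type picks the work, the row
// reference picks the target, and a mismatch fails the job.
fn run_job(job_type: JobType, row: RowKey) -> Result<(), String> {
    match job_type {
        JobType::NodeInMemoryChunksCompaction(node_name) => {
            if let RowKey::Table("Tables", _) = row {
                // The real code spawns an async task that calls
                // compact_node_in_memory_chunks(node_name).
                println!("compacting in-memory chunks on {}", node_name);
                Ok(())
            } else {
                Err(format!("invalid row key for job: {:?}", row))
            }
        }
    }
}

fn main() {
    let result = run_job(
        JobType::NodeInMemoryChunksCompaction("worker-1".to_string()),
        RowKey::Table("Tables", 1),
    );
    assert!(result.is_ok());
}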

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 12 additions & 0 deletions
@@ -361,6 +361,8 @@ pub trait ConfigObj: DIService {
 
     fn compaction_in_memory_chunks_ratio_check_threshold(&self) -> u64;
 
+    fn compaction_in_memory_chunks_schedule_period_secs(&self) -> u64;
+
     fn wal_split_threshold(&self) -> u64;
 
     fn select_worker_pool_size(&self) -> usize;
@@ -482,6 +484,7 @@ pub struct ConfigObjImpl {
     pub compaction_in_memory_chunks_count_threshold: usize,
     pub compaction_in_memory_chunks_ratio_threshold: u64,
     pub compaction_in_memory_chunks_ratio_check_threshold: u64,
+    pub compaction_in_memory_chunks_schedule_period_secs: u64,
     pub wal_split_threshold: u64,
     pub data_dir: PathBuf,
     pub dump_dir: Option<PathBuf>,
@@ -592,6 +595,10 @@ impl ConfigObj for ConfigObjImpl {
         self.compaction_in_memory_chunks_ratio_check_threshold
     }
 
+    fn compaction_in_memory_chunks_schedule_period_secs(&self) -> u64 {
+        self.compaction_in_memory_chunks_schedule_period_secs
+    }
+
     fn wal_split_threshold(&self) -> u64 {
         self.wal_split_threshold
     }
@@ -1002,6 +1009,10 @@
                 "CUBESTORE_IN_MEMORY_CHUNKS_RATIO_CHECK_THRESHOLD",
                 1000,
             ),
+            compaction_in_memory_chunks_schedule_period_secs: env_parse(
+                "CUBESTORE_IN_MEMORY_CHUNKS_SCHEDULE_PERIOD_SECS",
+                5,
+            ),
             store_provider: {
                 if let Ok(bucket_name) = env::var("CUBESTORE_S3_BUCKET") {
                     FileStoreProvider::S3 {
@@ -1195,6 +1206,7 @@
             compaction_in_memory_chunks_count_threshold: 10,
             compaction_in_memory_chunks_ratio_threshold: 3,
             compaction_in_memory_chunks_ratio_check_threshold: 1000,
+            compaction_in_memory_chunks_schedule_period_secs: 5,
             store_provider: FileStoreProvider::Filesystem {
                 remote_dir: Some(
                     env::current_dir()
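The schedule period defaults to 5 seconds and is overridable via CUBESTORE_IN_MEMORY_CHUNKS_SCHEDULE_PERIOD_SECS. A small sketch of the parse-or-default pattern a helper like env_parse typically implements; this env_parse is a local illustration, not Cube Store's implementation:

use std::env;
use std::str::FromStr;

// Parse the variable if it is set and valid; otherwise fall back to the default.
fn env_parse<T: FromStr>(name: &str, default: T) -> T {
    env::var(name)
        .ok()
        .and_then(|v| v.parse::<T>().ok())
        .unwrap_or(default)
}

fn main() {
    let period_secs: u64 = env_parse("CUBESTORE_IN_MEMORY_CHUNKS_SCHEDULE_PERIOD_SECS", 5);
    println!("in-memory compaction schedule period: {}s", period_secs);
}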

rust/cubestore/cubestore/src/metastore/job.rs

Lines changed: 3 additions & 1 deletion
@@ -20,6 +20,7 @@ pub enum JobType {
     FinishMultiSplit,
     RepartitionChunk,
     InMemoryChunksCompaction,
+    NodeInMemoryChunksCompaction(/*node*/ String),
 }
 
 fn get_job_type_index(j: &JobType) -> u32 {
@@ -33,6 +34,7 @@
         JobType::FinishMultiSplit => 7,
         JobType::RepartitionChunk => 8,
         JobType::InMemoryChunksCompaction => 9,
+        JobType::NodeInMemoryChunksCompaction(_) => 10,
     }
 }
 
@@ -154,7 +156,7 @@ impl RocksSecondaryIndex<Job, JobIndexKey> for JobRocksIndex {
         buf.write_u32::<BigEndian>(get_job_type_index(job_type))
             .unwrap();
         match job_type {
-            JobType::TableImportCSV(l) => {
+            JobType::TableImportCSV(l) | JobType::NodeInMemoryChunksCompaction(l) => {
                 buf.write_u64::<BigEndian>(l.len() as u64).unwrap();
                 buf.write(l.as_bytes()).unwrap();
             }
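The secondary-index key for both string-carrying job types uses the same length-prefixed encoding: a big-endian u64 length followed by the raw bytes. A standalone sketch of just that encoding step (requires the byteorder crate; the surrounding RocksDB index machinery is omitted, and encode_string_payload is a local helper, not a Cube Store function):

use byteorder::{BigEndian, WriteBytesExt};
use std::io::Write;

// Encode a string payload as an 8-byte big-endian length followed by the bytes,
// mirroring the TableImportCSV / NodeInMemoryChunksCompaction branch above.
fn encode_string_payload(buf: &mut Vec<u8>, payload: &str) {
    buf.write_u64::<BigEndian>(payload.len() as u64).unwrap();
    buf.write_all(payload.as_bytes()).unwrap();
}

fn main() {
    let mut buf = Vec::new();
    encode_string_payload(&mut buf, "worker-1");
    assert_eq!(buf.len(), 8 + "worker-1".len());
    assert_eq!(&buf[..8], &[0, 0, 0, 0, 0, 0, 0, 8]);
}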
