Skip to content

Commit 3db1702

Browse files
authored
chore(cubestore): Bulk in memory chunks drop (#6785)
1 parent c828f74 commit 3db1702

File tree

4 files changed

+59
-20
lines changed

4 files changed

+59
-20
lines changed

rust/cubestore/cubestore/src/cluster/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub enum NetworkMessage {
4343
},
4444
FreeMemoryChunkResult(Result<(), CubeError>),
4545

46-
FreeDeletedMemoryChunks,
46+
FreeDeletedMemoryChunks(Vec<u64>),
4747
FreeDeletedMemoryChunksResult(Result<(), CubeError>),
4848

4949
MetaStoreCall(MetaStoreRpcMethodCall),

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,11 @@ pub trait Cluster: DIService + Send + Sync {
132132
) -> Result<(), CubeError>;
133133

134134
async fn free_memory_chunk(&self, node_name: &str, chunk_id: u64) -> Result<(), CubeError>;
135-
async fn free_deleted_memory_chunks(&self, node_name: &str) -> Result<(), CubeError>;
135+
async fn free_deleted_memory_chunks(
136+
&self,
137+
node_name: &str,
138+
chunk_ids: Vec<u64>,
139+
) -> Result<(), CubeError>;
136140

137141
fn job_result_listener(&self) -> JobResultListener;
138142

@@ -446,9 +450,16 @@ impl Cluster for ClusterImpl {
446450
}
447451
}
448452

449-
async fn free_deleted_memory_chunks(&self, node_name: &str) -> Result<(), CubeError> {
453+
async fn free_deleted_memory_chunks(
454+
&self,
455+
node_name: &str,
456+
chunk_ids: Vec<u64>,
457+
) -> Result<(), CubeError> {
450458
let response = self
451-
.send_or_process_locally(node_name, NetworkMessage::FreeDeletedMemoryChunks)
459+
.send_or_process_locally(
460+
node_name,
461+
NetworkMessage::FreeDeletedMemoryChunks(chunk_ids),
462+
)
452463
.await?;
453464
match response {
454465
NetworkMessage::FreeDeletedMemoryChunksResult(r) => r,
@@ -594,14 +605,14 @@ impl Cluster for ClusterImpl {
594605
let res = chunk_store.free_memory_chunk(chunk_id).await;
595606
NetworkMessage::FreeMemoryChunkResult(res)
596607
}
597-
NetworkMessage::FreeDeletedMemoryChunks => {
608+
NetworkMessage::FreeDeletedMemoryChunks(ids) => {
598609
let chunk_store = self
599610
.injector
600611
.upgrade()
601612
.unwrap()
602613
.get_service_typed::<dyn ChunkDataStore>()
603614
.await;
604-
let res = chunk_store.free_deleted_memory_chunks().await;
615+
let res = chunk_store.free_deleted_memory_chunks(ids).await;
605616
NetworkMessage::FreeDeletedMemoryChunksResult(res)
606617
}
607618
NetworkMessage::FreeMemoryChunkResult(_) => {

rust/cubestore/cubestore/src/scheduler/mod.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct SchedulerImpl {
4444
reconcile_loop: WorkerLoop,
4545
chunk_processing_loop: WorkerLoop,
4646
chunk_events_queue: Mutex<Vec<(SystemTime, u64)>>,
47+
in_memory_chunks_to_delete: Mutex<Vec<(String, u64)>>, //(node, chunk_is)
4748
node_last_actions: Mutex<HashMap<String, LastNodeActionTimes>>,
4849
}
4950

@@ -97,6 +98,7 @@ impl SchedulerImpl {
9798
gc_queue,
9899
reconcile_loop: WorkerLoop::new("Reconcile"),
99100
chunk_events_queue: Mutex::new(Vec::with_capacity(1000)),
101+
in_memory_chunks_to_delete: Mutex::new(Vec::with_capacity(1000)),
100102
node_last_actions: Mutex::new(workers),
101103
chunk_processing_loop: WorkerLoop::new("ChunkProcessing"),
102104
}
@@ -756,9 +758,8 @@ impl SchedulerImpl {
756758
.get_partition(chunk.get_row().get_partition_id())
757759
.await?;
758760
let node_name = self.cluster.node_name_by_partition(&partition);
759-
self.cluster
760-
.free_memory_chunk(&node_name, chunk.get_id())
761-
.await?;
761+
let mut to_delete = self.in_memory_chunks_to_delete.lock().await;
762+
to_delete.push((node_name, chunk.get_id()))
762763
} else if chunk.get_row().uploaded() {
763764
let file_name =
764765
ChunkStore::chunk_remote_path(chunk.get_id(), chunk.get_row().suffix());
@@ -899,6 +900,39 @@ impl SchedulerImpl {
899900
self.process_active_chunks(active_chunks).await?;
900901
self.process_inactive_chunks(inactive_chunks).await?;
901902
}
903+
{
904+
let chunks_to_delete = {
905+
let mut chunks = self.in_memory_chunks_to_delete.lock().await;
906+
if chunks.is_empty() {
907+
Vec::new()
908+
} else {
909+
let mut result = Vec::new();
910+
std::mem::swap(&mut *chunks, &mut result);
911+
result
912+
}
913+
};
914+
if !chunks_to_delete.is_empty() {
915+
let chunks_to_delete = chunks_to_delete.into_iter().into_group_map();
916+
for (node, ids) in chunks_to_delete {
917+
if !ids.is_empty() {
918+
if let Err(e) = self
919+
.cluster
920+
.free_deleted_memory_chunks(&node, ids.clone())
921+
.await
922+
{
923+
log::error!(
924+
"Error while trying release in memory chunks in node {}: {}",
925+
node,
926+
e
927+
);
928+
929+
let mut chunks = self.in_memory_chunks_to_delete.lock().await;
930+
chunks.extend(ids.iter().map(|v| (node.clone(), *v)));
931+
}
932+
}
933+
}
934+
}
935+
}
902936

903937
Ok(())
904938
}

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ pub trait ChunkDataStore: DIService + Send + Sync {
252252
) -> Result<(Vec<ArrayRef>, Vec<u64>), CubeError>;
253253
async fn add_memory_chunk(&self, chunk_id: u64, batch: RecordBatch) -> Result<(), CubeError>;
254254
async fn free_memory_chunk(&self, chunk_id: u64) -> Result<(), CubeError>;
255-
async fn free_deleted_memory_chunks(&self) -> Result<(), CubeError>;
255+
async fn free_deleted_memory_chunks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError>;
256256
async fn add_persistent_chunk(
257257
&self,
258258
index: IdRow<Index>,
@@ -705,19 +705,13 @@ impl ChunkDataStore for ChunkStore {
705705
memory_chunks.remove(&chunk_id);
706706
Ok(())
707707
}
708-
#[tracing::instrument(level = "trace", skip(self))]
709-
async fn free_deleted_memory_chunks(&self) -> Result<(), CubeError> {
710-
let existing_chunk_ids = self
711-
.meta_store
712-
.get_all_node_in_memory_chunks(self.cluster.server_name().to_string())
713-
.await?
714-
.into_iter()
715-
.map(|c| c.get_id())
716-
.collect::<HashSet<_>>();
717708

709+
#[tracing::instrument(level = "trace", skip(self))]
710+
async fn free_deleted_memory_chunks(&self, chunk_ids: Vec<u64>) -> Result<(), CubeError> {
711+
let ids_set = chunk_ids.into_iter().collect::<HashSet<_>>();
718712
{
719713
let mut memory_chunks = self.memory_chunks.write().await;
720-
memory_chunks.retain(|id, _| existing_chunk_ids.contains(id));
714+
memory_chunks.retain(|id, _| !ids_set.contains(id));
721715
}
722716

723717
self.report_in_memory_metrics().await?;

0 commit comments

Comments
 (0)