Skip to content

Commit 29080d7

Browse files
authored
fix(cubestore): Reduce memory usage while truncating (#7031)
1 parent 39eebf6 commit 29080d7

File tree

2 files changed

+16
-12
lines changed

2 files changed

+16
-12
lines changed

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -581,10 +581,7 @@ impl CacheStore for RocksCacheStore {
581581
self.store
582582
.write_operation(move |db_ref, batch_pipe| {
583583
let cache_schema = CacheItemRocksTable::new(db_ref);
584-
let rows = cache_schema.all_rows()?;
585-
for row in rows.iter() {
586-
cache_schema.delete(row.get_id(), batch_pipe)?;
587-
}
584+
cache_schema.truncate(batch_pipe)?;
588585

589586
Ok(())
590587
})
@@ -728,16 +725,10 @@ impl CacheStore for RocksCacheStore {
728725
self.store
729726
.write_operation(move |db_ref, batch_pipe| {
730727
let queue_item_schema = QueueItemRocksTable::new(db_ref.clone());
731-
let rows = queue_item_schema.all_rows()?;
732-
for row in rows.iter() {
733-
queue_item_schema.delete(row.get_id(), batch_pipe)?;
734-
}
728+
queue_item_schema.truncate(batch_pipe)?;
735729

736730
let queue_result_schema = QueueResultRocksTable::new(db_ref);
737-
let rows = queue_result_schema.all_rows()?;
738-
for row in rows.iter() {
739-
queue_result_schema.delete(row.get_id(), batch_pipe)?;
740-
}
731+
queue_result_schema.truncate(batch_pipe)?;
741732

742733
Ok(())
743734
})

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ where
284284
if table_id != self.table_id {
285285
return None;
286286
}
287+
287288
Some(self.table.deserialize_id_row(row_id, &value))
288289
} else {
289290
None
@@ -879,6 +880,18 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
879880
Ok(IdRow::new(row_id, new_row))
880881
}
881882

883+
fn truncate(&self, batch_pipe: &mut BatchPipe) -> Result<(), CubeError> {
884+
let iter = self.table_scan(self.snapshot())?;
885+
886+
for item in iter {
887+
let item = item?;
888+
889+
self.delete_impl(item, batch_pipe)?;
890+
}
891+
892+
Ok(())
893+
}
894+
882895
fn delete(&self, row_id: u64, batch_pipe: &mut BatchPipe) -> Result<IdRow<Self::T>, CubeError> {
883896
let row = self.get_row_or_not_found(row_id)?;
884897
self.delete_impl(row, batch_pipe)

0 commit comments

Comments
 (0)