Skip to content

Commit 0f533a5

Browse files
committed
chore(cubestore): Upgrade DF: Regroup batches in compaction to avoid i32 offset overflow
1 parent 4bebd28 commit 0f533a5

File tree

2 files changed

+29
-17
lines changed

2 files changed

+29
-17
lines changed

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,24 +2073,33 @@ fn combine_filters(filters: &[Expr]) -> Option<Expr> {
20732073
Some(combined_filter)
20742074
}
20752075

2076+
pub fn regroup_batch_onto(
2077+
b: RecordBatch,
2078+
max_rows: usize,
2079+
onto: &mut Vec<RecordBatch>,
2080+
) -> Result<(), CubeError> {
2081+
let mut row = 0;
2082+
while row != b.num_rows() {
2083+
let slice_len = min(b.num_rows() - row, max_rows);
2084+
onto.push(RecordBatch::try_new(
2085+
b.schema(),
2086+
b.columns()
2087+
.iter()
2088+
.map(|c| slice_copy(c.as_ref(), row, slice_len))
2089+
.collect(),
2090+
)?);
2091+
row += slice_len;
2092+
}
2093+
Ok(())
2094+
}
2095+
20762096
fn regroup_batches(
20772097
batches: Vec<RecordBatch>,
20782098
max_rows: usize,
20792099
) -> Result<Vec<RecordBatch>, CubeError> {
20802100
let mut r = Vec::with_capacity(batches.len());
20812101
for b in batches {
2082-
let mut row = 0;
2083-
while row != b.num_rows() {
2084-
let slice_len = min(b.num_rows() - row, max_rows);
2085-
r.push(RecordBatch::try_new(
2086-
b.schema(),
2087-
b.columns()
2088-
.iter()
2089-
.map(|c| slice_copy(c.as_ref(), row, slice_len))
2090-
.collect(),
2091-
)?);
2092-
row += slice_len
2093-
}
2102+
regroup_batch_onto(b, max_rows, &mut r)?;
20942103
}
20952104
Ok(r)
20962105
}

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::metastore::{
1111
};
1212
use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec;
1313
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
14+
use crate::queryplanner::query_executor::regroup_batch_onto;
1415
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
1516
use crate::queryplanner::{try_make_memory_data_source, QueryPlannerImpl};
1617
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
@@ -668,6 +669,7 @@ impl CompactionService for CompactionServiceImpl {
668669
None,
669670
)?)
670671
}
672+
671673
Ok((store, new))
672674
})
673675
.await??;
@@ -1425,7 +1427,11 @@ pub async fn merge_chunks(
14251427
task_context: Arc<TaskContext>,
14261428
) -> Result<SendableRecordBatchStream, CubeError> {
14271429
let schema = l.schema();
1428-
let r = RecordBatch::try_new(schema.clone(), r)?;
1430+
let r_batch = RecordBatch::try_new(schema.clone(), r)?;
1431+
let mut r = Vec::<RecordBatch>::new();
1432+
// Regroup batches -- which had been concatenated and sorted -- so that SortPreservingMergeExec
1433+
// doesn't overflow i32 in interleaving or building a Utf8Array.
1434+
regroup_batch_onto(r_batch, 8192, &mut r)?;
14291435

14301436
let mut key = Vec::with_capacity(key_size);
14311437
for i in 0..key_size {
@@ -1436,10 +1442,7 @@ pub async fn merge_chunks(
14361442
));
14371443
}
14381444

1439-
let inputs = UnionExec::new(vec![
1440-
l,
1441-
try_make_memory_data_source(&[vec![r]], schema, None)?,
1442-
]);
1445+
let inputs = UnionExec::new(vec![l, try_make_memory_data_source(&[r], schema, None)?]);
14431446
let mut res: Arc<dyn ExecutionPlan> = Arc::new(SortPreservingMergeExec::new(
14441447
LexOrdering::new(key),
14451448
Arc::new(inputs),

0 commit comments

Comments
 (0)