
Commit 077e992

chore(cubestore): Upgrade DF: Set compaction row group size back to 16384
This seriously affects performance.
1 parent: d3c1725

File tree

1 file changed: +4, -17 lines

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 4 additions & 17 deletions
@@ -448,10 +448,6 @@ impl CompactionServiceImpl {
     }
 }
 
-/// The batch size used by CompactionServiceImpl::compact. Based on MAX_BATCH_ROWS=4096 of the
-/// pre-DF-upgrade's MergeSortExec, but even smaller to be farther from potential i32 overflow.
-const COMPACT_BATCH_SIZE: usize = 2048;
-
 #[async_trait]
 impl CompactionService for CompactionServiceImpl {
     async fn compact(
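The deleted constant traded row-group size for headroom against Arrow's i32 Utf8 offsets: a Utf8 array's offset buffer is i32, so the concatenated string bytes in a single array must stay under i32::MAX. A minimal sketch of that headroom arithmetic (the row counts echo the diff; nothing here is Cubestore API):

```rust
/// Rough headroom check: an Arrow Utf8 (i32-offset) array overflows once its
/// total string bytes exceed i32::MAX. Dividing by the rows per array gives
/// the average string size each configuration can tolerate.
fn max_avg_bytes_per_row(rows_per_array: usize) -> usize {
    i32::MAX as usize / rows_per_array
}

fn main() {
    // COMPACT_BATCH_SIZE from the reverted change vs. the restored ROW_GROUP_SIZE.
    for rows in [2048usize, 4096, 8192, 16384] {
        println!(
            "{:>5} rows/array -> safe up to ~{} KiB per string on average",
            rows,
            max_avg_bytes_per_row(rows) / 1024
        );
    }
}
```

At 2048 rows an array tolerates roughly 1 MiB per string on average; at the restored 16384 the margin shrinks to roughly 128 KiB, which is why the overflow logging in write_to_files_impl below is kept.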
@@ -605,12 +601,9 @@ impl CompactionService for CompactionServiceImpl {
             }
         }
 
-        // We use COMPACT_BATCH_SIZE instead of ROW_GROUP_SIZE for this write, to avoid i32 Utf8 arrow
-        // array offset overflow in some (unusual) cases.
-        // TODO: Simply lowering the size is not great.
         let store = ParquetTableStore::new(
             index.get_row().clone(),
-            COMPACT_BATCH_SIZE,
+            ROW_GROUP_SIZE,
             self.metadata_cache_factory.clone(),
         );
         let old_partition_remote = match &new_chunk {
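ParquetTableStore is Cubestore's own type; its internal plumbing is outside this diff. For orientation, a row group size like this typically ends up in the parquet writer's properties. A hedged sketch with the parquet crate (how Cubestore actually wires it is an assumption here):

```rust
use parquet::file::properties::WriterProperties;

// Hypothetical illustration: a parquet writer configured with the restored
// row group size. Larger row groups mean fewer groups per file and better
// scan performance, at the cost of bigger in-memory arrays during writes.
const ROW_GROUP_SIZE: usize = 16384;

fn writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_max_row_group_size(ROW_GROUP_SIZE)
        .build()
}
```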
@@ -686,12 +679,6 @@ impl CompactionService for CompactionServiceImpl {
             .metadata_cache_factory
             .cache_factory()
             .make_session_config();
-        // Set batch size to 2048 to avoid overflow in case where, perhaps, we might get repeated
-        // large string values, such that the default value, 8192, could produce an array too big
-        // for i32 string array offsets in a SortPreservingMergeExecStream that is constructed in
-        // `merge_chunks`. In pre-DF-upgrade Cubestore, MergeSortExec used a local variable,
-        // MAX_BATCH_ROWS = 4096, which might be small enough.
-        let session_config = session_config.with_batch_size(COMPACT_BATCH_SIZE);
 
         // Merge and write rows.
         let schema = Arc::new(arrow_schema(index.get_row()));
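With the override deleted, the merge falls back to DataFusion's default batch size of 8192 (the value the removed comment refers to). A minimal sketch of the SessionConfig API that was being used, standard DataFusion rather than anything Cubestore-specific:

```rust
use datafusion::prelude::SessionConfig;

fn main() {
    // Default configuration: batch_size is 8192 unless overridden.
    let default_config = SessionConfig::new();
    assert_eq!(default_config.batch_size(), 8192);

    // The reverted code overrode it like this, shrinking every batch that
    // flows through operators such as SortPreservingMergeExec:
    let small_batches = SessionConfig::new().with_batch_size(2048);
    assert_eq!(small_batches.batch_size(), 2048);
}
```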
@@ -1358,9 +1345,9 @@ async fn write_to_files_impl(
         }
     };
     let err = redistribute(records, store.row_group_size(), move |b| {
-        // See if we get an array using more than 512 MB and log it. With COMPACT_BATCH_SIZE=2048,
-        // this means a default batch size of 8192 might, or our row group size of 16384 really might,
-        // get i32 offset overflow when used in an Arrow array.
+        // See if we get an array using more than 512 MB and log it. This means a default batch
+        // size of 8192 might, or our row group size of 16384 really might, get i32 offset overflow
+        // when used in an Arrow array with a Utf8 column.
 
         // First figure out what to log. (Normally we don't allocate or log anything.)
         let mut loggable_overlongs = Vec::new();
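The surviving check only logs arrays that are already suspiciously large. A minimal sketch of such a guard over a RecordBatch, assuming arrow-rs and the log crate; the real loggable_overlongs bookkeeping in compaction.rs is more involved:

```rust
use arrow::array::Array;
use arrow::record_batch::RecordBatch;

const OVERLONG_THRESHOLD: usize = 512 * 1024 * 1024; // 512 MB

/// Warn about any column whose buffers exceed the threshold. An i32-offset
/// Utf8 column overflows near 2 GiB of string data, so 512 MB is an early
/// warning well short of the hard limit, not the limit itself.
fn log_overlong_arrays(batch: &RecordBatch) {
    for (field, column) in batch.schema().fields().iter().zip(batch.columns()) {
        let size = column.get_array_memory_size();
        if size > OVERLONG_THRESHOLD {
            log::warn!(
                "column '{}' uses {} bytes across {} rows",
                field.name(),
                size,
                column.len()
            );
        }
    }
}
```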
