Skip to content

Commit e6faacb

Browse files
authored
Fix panic during spill to disk in clickbench query (#19421)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18474 ## Rationale for this change WIthout this fix running clickbench with limited ram panics: ```sql SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM 'benchmarks/data/hits_partitioned' GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; ``` ```shell andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ cargo run --bin datafusion-cli -- -m 1G -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;" Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.40s Running `target/debug/datafusion-cli -m 1G -c 'SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM '\''/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned'\'' GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;'` DataFusion CLI v51.0.0 thread 'tokio-runtime-worker' (4994761) panicked at datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs:466:53: range end index 2094219 out of range for slice of length 1066 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace ``` ## What changes are included in this PR? Fix the bug. This was almost entirely written by codex (prompt below) <details><summary>Prompt</summary> <p> ``` This command causes a panic cargo run --bin datafusion-cli -- -m 1G -c "SELECT \"UserID\", extract(minute FROM to_timestamp_seconds(\"EventTime\")) AS m, \"SearchPhrase\", COUNT(*) FROM '/Users/andrewlamb/Software/datafusion/benchmarks/data/hits_partitioned' GROUP BY \"UserID\", m, \"SearchPhrase\" ORDER BY COUNT(*) DESC LIMIT 10;" It panics in ByteViewGroupValueBuilder::take_buffers_with_partial_last I think the problem happens due to a bug in the take_n implementation thread 'tokio-runtime-worker' (4978703) panicked at datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs:466:53: range end index 2095248 out of range for slice of length 1370 stack backtrace: 0: __rustc::rust_begin_unwind at /rustc/ded5c06cf21d2b93bffd5d884aa6e96934ee4234/library/std/src/panicking.rs:698:5 1: core::panicking::panic_fmt at /rustc/ded5c06cf21d2b93bffd5d884aa6e96934ee4234/library/core/src/panicking.rs:80:14 2: core::slice::index::slice_index_fail::do_panic::runtime at /rustc/ded5c06cf21d2b93bffd5d884aa6e96934ee4234/library/core/src/panic.rs:173:21 3: core::slice::index::slice_index_fail at /rustc/ded5c06cf21d2b93bffd5d884aa6e96934ee4234/library/core/src/panic.rs:178:9 4: <core::ops::range::Range<usize> as core::slice::index::SliceIndex<[T]>>::index at /Users/andrewlamb/.rustup/toolchains/1.92.0-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/slice/index.rs:438:13 5: core::slice::index::<impl core::ops::index::Index<I> for [T]>::index at /Users/andrewlamb/.rustup/toolchains/1.92.0-aarch64-apple-darwin/lib/rustlib/src/rust/library/core/src/slice/index.rs:18:15 6: <alloc::vec::Vec<T,A> as core::ops::index::Index<I>>::index at /Users/andrewlamb/.rustup/toolchains/1.92.0-aarch64-apple-darwin/lib/rustlib/src/rust/library/alloc/src/vec/mod.rs:3628:9 7: datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder<B>::take_buffers_with_partial_last at ./datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs:466:53 8: datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder<B>::take_n_inner at ./datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs:399:18 9: <datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder<B> as datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn>::take_n at ./datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs:541:14 10: <datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupValuesColumn<_> as datafusion_physical_plan::aggregates::group_values::GroupValues>::emit::{{closure}} at ./datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs:1097:32 11: core::iter::adapters::map::map_fold::{{closure}} Please find and fix the bug ``` </p> </details> ## Are these changes tested? Yes, there is a test included ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 72f1746 commit e6faacb

File tree

1 file changed

+32
-6
lines changed
  • datafusion/physical-plan/src/aggregates/group_values/multi_group_by

1 file changed

+32
-6
lines changed

datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -457,21 +457,23 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
457457
last_take_len: usize,
458458
) -> Vec<Buffer> {
459459
let mut take_buffers = Vec::with_capacity(last_remaining_buffer_index + 1);
460+
debug_assert!(last_remaining_buffer_index <= self.completed.len());
460461

461-
// Take `0 ~ last_remaining_buffer_index - 1` buffers
462-
if !self.completed.is_empty() || last_remaining_buffer_index == 0 {
463-
take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
464-
}
465-
466-
// Process the `last_remaining_buffer_index` buffers
462+
// Process the `last_remaining_buffer_index` buffer before draining so the index is valid.
467463
let last_buffer = if last_remaining_buffer_index < self.completed.len() {
468464
// If it is in `completed`, simply clone
469465
self.completed[last_remaining_buffer_index].clone()
470466
} else {
471467
// If it is `in_progress`, copied `0 ~ offset` part
468+
debug_assert!(last_take_len <= self.in_progress.len());
472469
let taken_last_buffer = self.in_progress[0..last_take_len].to_vec();
473470
Buffer::from_vec(taken_last_buffer)
474471
};
472+
473+
// Take `0 ~ last_remaining_buffer_index - 1` buffers
474+
if last_remaining_buffer_index > 0 {
475+
take_buffers.extend(self.completed.drain(0..last_remaining_buffer_index));
476+
}
475477
take_buffers.push(last_buffer);
476478

477479
take_buffers
@@ -948,4 +950,28 @@ mod tests {
948950
let taken_array = builder.take_n(final_ones_to_append);
949951
assert_eq!(&taken_array, &input_array);
950952
}
953+
954+
#[test]
955+
fn test_byte_view_take_n_partial_completed_nonzero_index() {
956+
let mut builder =
957+
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(30);
958+
let input_array = StringViewArray::from(vec![
959+
Some("aaaaaaaaaaaaaa"),
960+
Some("bbbbbbbbbbbbbb"),
961+
Some("cccccccccccccc"),
962+
Some("dddddddddddddd"),
963+
Some("eeeeeeeeeeeeee"),
964+
]);
965+
let input_array: ArrayRef = Arc::new(input_array);
966+
967+
for row in 0..input_array.len() {
968+
builder.append_val(&input_array, row).unwrap();
969+
}
970+
971+
assert_eq!(builder.completed.len(), 2);
972+
assert_eq!(builder.in_progress.len(), 14);
973+
974+
let taken_array = builder.take_n(3);
975+
assert_eq!(&taken_array, &input_array.slice(0, 3));
976+
}
951977
}

0 commit comments

Comments
 (0)