|
16 | 16 | // under the License. |
17 | 17 |
|
18 | 18 | use crate::sort::reverse_row_selection; |
19 | | -use datafusion_common::{Result, assert_eq_or_internal_err}; |
| 19 | +use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err}; |
20 | 20 | use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; |
21 | 21 | use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; |
| 22 | +use std::collections::VecDeque; |
22 | 23 |
|
23 | 24 | /// A selection of rows and row groups within a ParquetFile to decode. |
24 | 25 | /// |
@@ -396,6 +397,80 @@ impl PreparedAccessPlan { |
396 | 397 |
|
397 | 398 | Ok(self) |
398 | 399 | } |
| 400 | + |
| 401 | + /// Split this access plan into one plan per selected row group. |
| 402 | + /// |
| 403 | + /// The returned plans preserve the current row-group ordering. If |
| 404 | + /// `row_selection` is present, it is partitioned so each returned plan |
| 405 | + /// contains only the selection entries for its single row group. |
| 406 | + pub(crate) fn into_single_row_group_plans( |
| 407 | + self, |
| 408 | + file_metadata: &ParquetMetaData, |
| 409 | + ) -> Result<Vec<Self>> { |
| 410 | + let Self { |
| 411 | + row_group_indexes, |
| 412 | + row_selection, |
| 413 | + } = self; |
| 414 | + |
| 415 | + let Some(row_selection) = row_selection else { |
| 416 | + return Ok(row_group_indexes |
| 417 | + .into_iter() |
| 418 | + .map(|row_group_index| Self { |
| 419 | + row_group_indexes: vec![row_group_index], |
| 420 | + row_selection: None, |
| 421 | + }) |
| 422 | + .collect()); |
| 423 | + }; |
| 424 | + |
| 425 | + let mut selectors: VecDeque<RowSelector> = |
| 426 | + Vec::<RowSelector>::from(row_selection).into(); |
| 427 | + let mut plans = Vec::with_capacity(row_group_indexes.len()); |
| 428 | + |
| 429 | + for row_group_index in row_group_indexes { |
| 430 | + let mut remaining_rows = |
| 431 | + file_metadata.row_groups()[row_group_index].num_rows() as usize; |
| 432 | + let mut row_group_selectors = Vec::new(); |
| 433 | + |
| 434 | + while remaining_rows > 0 { |
| 435 | + let selector = selectors.pop_front().ok_or_else(|| { |
| 436 | + internal_datafusion_err!( |
| 437 | + "PreparedAccessPlan row selection ended before row group {row_group_index} was fully described" |
| 438 | + ) |
| 439 | + })?; |
| 440 | + |
| 441 | + let rows_for_group = selector.row_count.min(remaining_rows); |
| 442 | + row_group_selectors.push(if selector.skip { |
| 443 | + RowSelector::skip(rows_for_group) |
| 444 | + } else { |
| 445 | + RowSelector::select(rows_for_group) |
| 446 | + }); |
| 447 | + |
| 448 | + if selector.row_count > rows_for_group { |
| 449 | + let remaining_selector_rows = selector.row_count - rows_for_group; |
| 450 | + selectors.push_front(if selector.skip { |
| 451 | + RowSelector::skip(remaining_selector_rows) |
| 452 | + } else { |
| 453 | + RowSelector::select(remaining_selector_rows) |
| 454 | + }); |
| 455 | + } |
| 456 | + |
| 457 | + remaining_rows -= rows_for_group; |
| 458 | + } |
| 459 | + |
| 460 | + plans.push(Self { |
| 461 | + row_group_indexes: vec![row_group_index], |
| 462 | + row_selection: Some(row_group_selectors.into()), |
| 463 | + }); |
| 464 | + } |
| 465 | + |
| 466 | + if !selectors.is_empty() { |
| 467 | + return Err(internal_datafusion_err!( |
| 468 | + "PreparedAccessPlan row selection had leftover selectors after splitting by row group" |
| 469 | + )); |
| 470 | + } |
| 471 | + |
| 472 | + Ok(plans) |
| 473 | + } |
399 | 474 | } |
400 | 475 |
|
401 | 476 | #[cfg(test)] |
|
0 commit comments