Skip to content

Commit 9a1b730

Browse files
committed
Morselize into Row groups
1 parent 8dee0b3 commit 9a1b730

File tree

4 files changed

+305
-180
lines changed

4 files changed

+305
-180
lines changed

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616
// under the License.
1717

1818
use crate::sort::reverse_row_selection;
19-
use datafusion_common::{Result, assert_eq_or_internal_err};
19+
use datafusion_common::{Result, assert_eq_or_internal_err, internal_datafusion_err};
2020
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
2121
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
22+
use std::collections::VecDeque;
2223

2324
/// A selection of rows and row groups within a ParquetFile to decode.
2425
///
@@ -396,6 +397,80 @@ impl PreparedAccessPlan {
396397

397398
Ok(self)
398399
}
400+
401+
/// Split this access plan into one plan per selected row group.
402+
///
403+
/// The returned plans preserve the current row-group ordering. If
404+
/// `row_selection` is present, it is partitioned so each returned plan
405+
/// contains only the selection entries for its single row group.
406+
pub(crate) fn into_single_row_group_plans(
407+
self,
408+
file_metadata: &ParquetMetaData,
409+
) -> Result<Vec<Self>> {
410+
let Self {
411+
row_group_indexes,
412+
row_selection,
413+
} = self;
414+
415+
let Some(row_selection) = row_selection else {
416+
return Ok(row_group_indexes
417+
.into_iter()
418+
.map(|row_group_index| Self {
419+
row_group_indexes: vec![row_group_index],
420+
row_selection: None,
421+
})
422+
.collect());
423+
};
424+
425+
let mut selectors: VecDeque<RowSelector> =
426+
Vec::<RowSelector>::from(row_selection).into();
427+
let mut plans = Vec::with_capacity(row_group_indexes.len());
428+
429+
for row_group_index in row_group_indexes {
430+
let mut remaining_rows =
431+
file_metadata.row_groups()[row_group_index].num_rows() as usize;
432+
let mut row_group_selectors = Vec::new();
433+
434+
while remaining_rows > 0 {
435+
let selector = selectors.pop_front().ok_or_else(|| {
436+
internal_datafusion_err!(
437+
"PreparedAccessPlan row selection ended before row group {row_group_index} was fully described"
438+
)
439+
})?;
440+
441+
let rows_for_group = selector.row_count.min(remaining_rows);
442+
row_group_selectors.push(if selector.skip {
443+
RowSelector::skip(rows_for_group)
444+
} else {
445+
RowSelector::select(rows_for_group)
446+
});
447+
448+
if selector.row_count > rows_for_group {
449+
let remaining_selector_rows = selector.row_count - rows_for_group;
450+
selectors.push_front(if selector.skip {
451+
RowSelector::skip(remaining_selector_rows)
452+
} else {
453+
RowSelector::select(remaining_selector_rows)
454+
});
455+
}
456+
457+
remaining_rows -= rows_for_group;
458+
}
459+
460+
plans.push(Self {
461+
row_group_indexes: vec![row_group_index],
462+
row_selection: Some(row_group_selectors.into()),
463+
});
464+
}
465+
466+
if !selectors.is_empty() {
467+
return Err(internal_datafusion_err!(
468+
"PreparedAccessPlan row selection had leftover selectors after splitting by row group"
469+
));
470+
}
471+
472+
Ok(plans)
473+
}
399474
}
400475

401476
#[cfg(test)]

0 commit comments

Comments
 (0)