Skip to content

Commit 9e03577

Browse files
committed
minor: Move PreparedAccessPlan to same module as ParquetAccessPlan
1 parent d2278a9 commit 9e03577

File tree

3 files changed

+95
-92
lines changed

3 files changed

+95
-92
lines changed

datafusion/datasource-parquet/src/access_plan.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use crate::sort::reverse_row_selection;
1819
use datafusion_common::{Result, assert_eq_or_internal_err};
1920
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
20-
use parquet::file::metadata::RowGroupMetaData;
21+
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
2122

2223
/// A selection of rows and row groups within a ParquetFile to decode.
2324
///
@@ -337,6 +338,58 @@ impl ParquetAccessPlan {
337338
pub fn into_inner(self) -> Vec<RowGroupAccess> {
338339
self.row_groups
339340
}
341+
342+
/// Prepare/finalize the access plan
343+
pub fn prepare(
344+
self,
345+
row_group_meta_data: &[RowGroupMetaData],
346+
) -> Result<PreparedAccessPlan> {
347+
let row_group_indexes = self.row_group_indexes();
348+
let row_selection = self.into_overall_row_selection(row_group_meta_data)?;
349+
350+
PreparedAccessPlan::new(row_group_indexes, row_selection)
351+
}
352+
}
353+
354+
/// Represents a prepared access plan with optional row selection
355+
pub struct PreparedAccessPlan {
356+
/// Row group indexes to read
357+
pub row_group_indexes: Vec<usize>,
358+
/// Optional row selection for filtering within row groups
359+
pub row_selection: Option<RowSelection>,
360+
}
361+
362+
impl PreparedAccessPlan {
363+
/// Create a new prepared access plan
364+
fn new(
365+
row_group_indexes: Vec<usize>,
366+
row_selection: Option<RowSelection>,
367+
) -> Result<Self> {
368+
Ok(Self {
369+
row_group_indexes,
370+
row_selection,
371+
})
372+
}
373+
374+
/// Reverse the access plan for reverse scanning
375+
pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
376+
// Get the row group indexes before reversing
377+
let row_groups_to_scan = self.row_group_indexes.clone();
378+
379+
// Reverse the row group indexes
380+
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
381+
382+
// If we have a row selection, reverse it to match the new row group order
383+
if let Some(row_selection) = self.row_selection {
384+
self.row_selection = Some(reverse_row_selection(
385+
&row_selection,
386+
file_metadata,
387+
&row_groups_to_scan, // Pass the original (non-reversed) row group indexes
388+
)?);
389+
}
390+
391+
Ok(self)
392+
}
340393
}
341394

342395
#[cfg(test)]

datafusion/datasource-parquet/src/opener.rs

Lines changed: 2 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use datafusion_physical_plan::metrics::{
5252
};
5353
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
5454

55-
use crate::sort::reverse_row_selection;
5655
#[cfg(feature = "parquet_encryption")]
5756
use datafusion_common::config::EncryptionFactoryOptions;
5857
#[cfg(feature = "parquet_encryption")]
@@ -67,7 +66,7 @@ use parquet::arrow::arrow_reader::{
6766
use parquet::arrow::async_reader::AsyncFileReader;
6867
use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
6968
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
70-
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader, RowGroupMetaData};
69+
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
7170

7271
/// Implements [`FileOpener`] for a parquet file
7372
pub(super) struct ParquetOpener {
@@ -125,53 +124,6 @@ pub(super) struct ParquetOpener {
125124
pub reverse_row_groups: bool,
126125
}
127126

128-
/// Represents a prepared access plan with optional row selection
129-
pub(crate) struct PreparedAccessPlan {
130-
/// Row group indexes to read
131-
pub(crate) row_group_indexes: Vec<usize>,
132-
/// Optional row selection for filtering within row groups
133-
pub(crate) row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
134-
}
135-
136-
impl PreparedAccessPlan {
137-
/// Create a new prepared access plan from a ParquetAccessPlan
138-
pub(crate) fn from_access_plan(
139-
access_plan: ParquetAccessPlan,
140-
rg_metadata: &[RowGroupMetaData],
141-
) -> Result<Self> {
142-
let row_group_indexes = access_plan.row_group_indexes();
143-
let row_selection = access_plan.into_overall_row_selection(rg_metadata)?;
144-
145-
Ok(Self {
146-
row_group_indexes,
147-
row_selection,
148-
})
149-
}
150-
151-
/// Reverse the access plan for reverse scanning
152-
pub(crate) fn reverse(
153-
mut self,
154-
file_metadata: &parquet::file::metadata::ParquetMetaData,
155-
) -> Result<Self> {
156-
// Get the row group indexes before reversing
157-
let row_groups_to_scan = self.row_group_indexes.clone();
158-
159-
// Reverse the row group indexes
160-
self.row_group_indexes = self.row_group_indexes.into_iter().rev().collect();
161-
162-
// If we have a row selection, reverse it to match the new row group order
163-
if let Some(row_selection) = self.row_selection {
164-
self.row_selection = Some(reverse_row_selection(
165-
&row_selection,
166-
file_metadata,
167-
&row_groups_to_scan, // Pass the original (non-reversed) row group indexes
168-
)?);
169-
}
170-
171-
Ok(self)
172-
}
173-
}
174-
175127
impl FileOpener for ParquetOpener {
176128
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
177129
// -----------------------------------
@@ -545,8 +497,7 @@ impl FileOpener for ParquetOpener {
545497
}
546498

547499
// Prepare the access plan (extract row groups and row selection)
548-
let mut prepared_plan =
549-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
500+
let mut prepared_plan = access_plan.prepare(rg_metadata)?;
550501

551502
// ----------------------------------------------------------
552503
// Step: potentially reverse the access plan for performance.

datafusion/datasource-parquet/src/sort.rs

Lines changed: 39 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ pub fn reverse_row_selection(
122122
mod tests {
123123
use crate::ParquetAccessPlan;
124124
use crate::RowGroupAccess;
125-
use crate::opener::PreparedAccessPlan;
126125
use arrow::datatypes::{DataType, Field, Schema};
127126
use bytes::Bytes;
128127
use parquet::arrow::ArrowWriter;
@@ -169,9 +168,9 @@ mod tests {
169168
let access_plan = ParquetAccessPlan::new_all(3);
170169
let rg_metadata = metadata.row_groups();
171170

172-
let prepared_plan =
173-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
174-
.expect("Failed to create PreparedAccessPlan");
171+
let prepared_plan = access_plan
172+
.prepare(rg_metadata)
173+
.expect("Failed to create PreparedAccessPlan");
175174

176175
// Verify original plan
177176
assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]);
@@ -205,9 +204,9 @@ mod tests {
205204
);
206205

207206
let rg_metadata = metadata.row_groups();
208-
let prepared_plan =
209-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
210-
.expect("Failed to create PreparedAccessPlan");
207+
let prepared_plan = access_plan
208+
.prepare(rg_metadata)
209+
.expect("Failed to create PreparedAccessPlan");
211210

212211
let original_selected: usize = prepared_plan
213212
.row_selection
@@ -255,9 +254,9 @@ mod tests {
255254
);
256255

257256
let rg_metadata = metadata.row_groups();
258-
let prepared_plan =
259-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
260-
.expect("Failed to create PreparedAccessPlan");
257+
let prepared_plan = access_plan
258+
.prepare(rg_metadata)
259+
.expect("Failed to create PreparedAccessPlan");
261260

262261
let original_selected: usize = prepared_plan
263262
.row_selection
@@ -298,9 +297,9 @@ mod tests {
298297
}
299298

300299
let rg_metadata = metadata.row_groups();
301-
let prepared_plan =
302-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
303-
.expect("Failed to create PreparedAccessPlan");
300+
let prepared_plan = access_plan
301+
.prepare(rg_metadata)
302+
.expect("Failed to create PreparedAccessPlan");
304303

305304
let reversed_plan = prepared_plan
306305
.reverse(&metadata)
@@ -338,9 +337,9 @@ mod tests {
338337
);
339338

340339
let rg_metadata = metadata.row_groups();
341-
let prepared_plan =
342-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
343-
.expect("Failed to create PreparedAccessPlan");
340+
let prepared_plan = access_plan
341+
.prepare(rg_metadata)
342+
.expect("Failed to create PreparedAccessPlan");
344343

345344
let original_selected: usize = prepared_plan
346345
.row_selection
@@ -379,9 +378,9 @@ mod tests {
379378
);
380379

381380
let rg_metadata = metadata.row_groups();
382-
let prepared_plan =
383-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
384-
.expect("Failed to create PreparedAccessPlan");
381+
let prepared_plan = access_plan
382+
.prepare(rg_metadata)
383+
.expect("Failed to create PreparedAccessPlan");
385384

386385
let original_selected: usize = prepared_plan
387386
.row_selection
@@ -435,9 +434,9 @@ mod tests {
435434
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));
436435

437436
let rg_metadata = metadata.row_groups();
438-
let prepared_plan =
439-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
440-
.expect("Failed to create PreparedAccessPlan");
437+
let prepared_plan = access_plan
438+
.prepare(rg_metadata)
439+
.expect("Failed to create PreparedAccessPlan");
441440

442441
let original_selected: usize = prepared_plan
443442
.row_selection
@@ -502,9 +501,9 @@ mod tests {
502501
let rg_metadata = metadata.row_groups();
503502

504503
// Step 1: Create PreparedAccessPlan
505-
let prepared_plan =
506-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
507-
.expect("Failed to create PreparedAccessPlan");
504+
let prepared_plan = access_plan
505+
.prepare(rg_metadata)
506+
.expect("Failed to create PreparedAccessPlan");
508507

509508
// Verify original plan
510509
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
@@ -594,9 +593,9 @@ mod tests {
594593
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));
595594

596595
let rg_metadata = metadata.row_groups();
597-
let prepared_plan =
598-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
599-
.expect("Failed to create PreparedAccessPlan");
596+
let prepared_plan = access_plan
597+
.prepare(rg_metadata)
598+
.expect("Failed to create PreparedAccessPlan");
600599

601600
let original_selected: usize = prepared_plan
602601
.row_selection
@@ -647,9 +646,9 @@ mod tests {
647646
);
648647

649648
let rg_metadata = metadata.row_groups();
650-
let prepared_plan =
651-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
652-
.expect("Failed to create PreparedAccessPlan");
649+
let prepared_plan = access_plan
650+
.prepare(rg_metadata)
651+
.expect("Failed to create PreparedAccessPlan");
653652

654653
let original_selected: usize = prepared_plan
655654
.row_selection
@@ -720,9 +719,9 @@ mod tests {
720719
let rg_metadata = metadata.row_groups();
721720

722721
// Step 1: Create PreparedAccessPlan
723-
let prepared_plan =
724-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
725-
.expect("Failed to create PreparedAccessPlan");
722+
let prepared_plan = access_plan
723+
.prepare(rg_metadata)
724+
.expect("Failed to create PreparedAccessPlan");
726725

727726
// Verify original plan in detail
728727
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
@@ -862,9 +861,9 @@ mod tests {
862861
access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));
863862

864863
let rg_metadata = metadata.row_groups();
865-
let prepared_plan =
866-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
867-
.expect("Failed to create PreparedAccessPlan");
864+
let prepared_plan = access_plan
865+
.prepare(rg_metadata)
866+
.expect("Failed to create PreparedAccessPlan");
868867

869868
// Verify original selection structure in detail
870869
let orig_selectors: Vec<_> = prepared_plan
@@ -944,9 +943,9 @@ mod tests {
944943
);
945944

946945
let rg_metadata = metadata.row_groups();
947-
let prepared_plan =
948-
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
949-
.expect("Failed to create PreparedAccessPlan");
946+
let prepared_plan = access_plan
947+
.prepare(rg_metadata)
948+
.expect("Failed to create PreparedAccessPlan");
950949

951950
// Original: [0, 2]
952951
assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);

0 commit comments

Comments
 (0)