Skip to content

Commit 102caeb

Browse files
authored
minor: More comments to ParquetOpener::open() (#19677)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> `ParquetOpener::open()` is a critical function for parquet planning, it's the entry point for many major steps like row-group/file pruning. It has almost 400 lines of code now, this PR adds some markers to the code blocks/important steps, to make this function easier to navigate. (though I may have overlooked some critical steps) Ideally, we should break these blocks into utilities. I tried extracting some of them with AI, but the resulting utilities still have unclear semantics, with many input arguments and output items. Overall, the complexity doesn’t seem reduced after the change. I think it’s possible to factor them into helper functions with clear semantics, but that likely requires someone who understands the implementation details very well. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 646213e commit 102caeb

File tree

1 file changed

+39
-4
lines changed

1 file changed

+39
-4
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ impl PreparedAccessPlan {
180180

181181
impl FileOpener for ParquetOpener {
182182
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
183+
// -----------------------------------
184+
// Step: prepare configurations, etc.
185+
// -----------------------------------
183186
let file_range = partitioned_file.range.clone();
184187
let extensions = partitioned_file.extensions.clone();
185188
let file_location = partitioned_file.object_meta.location.clone();
@@ -280,6 +283,10 @@ impl FileOpener for ParquetOpener {
280283
.get_file_decryption_properties(&file_location)
281284
.await?;
282285

286+
// ---------------------------------------------
287+
// Step: try to prune the current file partition
288+
// ---------------------------------------------
289+
283290
// Prune this file using the file level statistics and partition values.
284291
// Since dynamic filters may have been updated since planning it is possible that we are able
285292
// to prune files now that we couldn't prune at planning time.
@@ -328,6 +335,10 @@ impl FileOpener for ParquetOpener {
328335

329336
file_metrics.files_ranges_pruned_statistics.add_matched(1);
330337

338+
// --------------------------------------------------------
339+
// Step: fetch Parquet metadata (and optionally page index)
340+
// --------------------------------------------------------
341+
331342
// Don't load the page index yet. Since it is not stored inline in
332343
// the footer, loading the page index if it is not needed will do
333344
// unnecessary I/O. We decide later if it is needed to evaluate the
@@ -428,14 +439,21 @@ impl FileOpener for ParquetOpener {
428439

429440
metadata_timer.stop();
430441

442+
// ---------------------------------------------------------
443+
// Step: construct builder for the final RecordBatch stream
444+
// ---------------------------------------------------------
445+
431446
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
432447
async_file_reader,
433448
reader_metadata,
434449
);
435450

436-
let indices = projection.column_indices();
437-
438-
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
451+
// ---------------------------------------------------------------------
452+
// Step: optionally add row filter to the builder
453+
//
454+
// Row filter is used for late materialization in parquet decoding, see
455+
// `row_filter` for details.
456+
// ---------------------------------------------------------------------
439457

440458
// Filter pushdown: evaluate predicates during scan
441459
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
@@ -464,6 +482,10 @@ impl FileOpener for ParquetOpener {
464482
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
465483
}
466484

485+
// ------------------------------------------------------------
486+
// Step: prune row groups by range, predicate and bloom filter
487+
// ------------------------------------------------------------
488+
467489
// Determine which row groups to actually read. The idea is to skip
468490
// as many row groups as possible based on the metadata and query
469491
let file_metadata = Arc::clone(builder.metadata());
@@ -525,9 +547,13 @@ impl FileOpener for ParquetOpener {
525547

526548
let mut access_plan = row_groups.build();
527549

550+
// --------------------------------------------------------
551+
// Step: prune pages from the kept row groups
552+
//
528553
// page index pruning: if all data on individual pages can
529554
// be ruled using page metadata, rows from other columns
530555
// with that range can be skipped as well
556+
// --------------------------------------------------------
531557
if enable_page_index
532558
&& !access_plan.is_empty()
533559
&& let Some(p) = page_pruning_predicate
@@ -545,7 +571,10 @@ impl FileOpener for ParquetOpener {
545571
let mut prepared_plan =
546572
PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
547573

548-
// If reverse scanning is enabled, reverse the prepared plan
574+
// ----------------------------------------------------------
575+
// Step: potentially reverse the access plan for performance.
576+
// See `ParquetSource::try_reverse_output` for the rationale.
577+
// ----------------------------------------------------------
549578
if reverse_row_groups {
550579
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
551580
}
@@ -564,6 +593,9 @@ impl FileOpener for ParquetOpener {
564593
// metrics from the arrow reader itself
565594
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
566595

596+
let indices = projection.column_indices();
597+
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
598+
567599
let stream = builder
568600
.with_projection(mask)
569601
.with_batch_size(batch_size)
@@ -621,6 +653,9 @@ impl FileOpener for ParquetOpener {
621653
})
622654
});
623655

656+
// ----------------------------------------------------------------------
657+
// Step: wrap the stream so a dynamic filter can stop the file scan early
658+
// ----------------------------------------------------------------------
624659
if let Some(file_pruner) = file_pruner {
625660
Ok(EarlyStoppingStream::new(
626661
stream,

0 commit comments

Comments
 (0)