Skip to content

Commit f87f60e

Browse files
authored
create PageIndexPolicy to allow optional indexes (#8071)
# Which issue does this PR close?

- Closes #8070.

# Rationale for this change

This change introduces a more flexible way to handle page indexes (column and offset indexes) in Parquet files. Previously, the reading of these indexes was controlled by boolean flags, which could only express two choices: read the index and require it to be present, or do not read it at all. The new `PageIndexPolicy` enum (`Skip`, `Optional`, `Required`) provides finer control, allowing users to specify whether an index is not read, read if present (without error if missing), or strictly required (error if missing).

# What changes are included in this PR?

- Introduced a new `PageIndexPolicy` enum with `Skip`, `Optional`, and `Required` variants.
- Replaced the boolean `column_index` and `offset_index` fields in `ParquetMetaDataReader` with the new `PageIndexPolicy` enum.
- Updated the `ParquetMetaDataReader::new()` function to initialize page index policies to `Skip`, preserving previous defaults.
- Modified existing `with_page_indexes`, `with_column_indexes`, and `with_offset_indexes` methods to utilize the new `PageIndexPolicy`, defaulting to `Required` when enabling indexes.
- Added new methods: `with_page_index_policy`, `with_column_index_policy`, and `with_offset_index_policy` to allow direct setting of the page index policy.
- Adjusted the internal logic for parsing column and offset indexes to respect the specified `PageIndexPolicy`, including returning an error if a `Required` index is not found.

# Are these changes tested?

Yes, a new test file `parquet/tests/page_index.rs` has been added to cover the functionality of the new `PageIndexPolicy` and its integration with `ParquetMetaDataReader`.

# Are there any user-facing changes?

Yes, there are user-facing changes to the `ParquetMetaDataReader` API. The `with_column_indexes` and `with_offset_indexes` methods now implicitly use `PageIndexPolicy::Required` when enabling page indexes. New methods `with_page_index_policy`, `with_column_index_policy`, and `with_offset_index_policy` have been added.
1 parent ace8dad commit f87f60e

File tree

11 files changed

+209
-72
lines changed

11 files changed

+209
-72
lines changed

parquet/benches/arrow_reader_row_filter.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use parquet::arrow::arrow_reader::{
7070
use parquet::arrow::async_reader::AsyncFileReader;
7171
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
7272
use parquet::basic::Compression;
73-
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
73+
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
7474
use parquet::file::properties::WriterProperties;
7575
use rand::{rngs::StdRng, Rng, SeedableRng};
7676
use std::ops::Range;
@@ -550,7 +550,8 @@ struct InMemoryReader {
550550

551551
impl InMemoryReader {
552552
fn try_new(inner: &Bytes) -> parquet::errors::Result<Self> {
553-
let mut metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true);
553+
let mut metadata_reader =
554+
ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Required);
554555
metadata_reader.try_parse(inner)?;
555556
let metadata = metadata_reader.finish().map(Arc::new)?;
556557

parquet/examples/external_metadata.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use arrow_cast::pretty::pretty_format_batches;
2020
use futures::TryStreamExt;
2121
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
2222
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
23-
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
23+
use parquet::file::metadata::{
24+
PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
25+
};
2426
use parquet::file::properties::{EnabledStatistics, WriterProperties};
2527
use std::fs::File;
2628
use std::path::{Path, PathBuf};
@@ -111,7 +113,7 @@ async fn get_metadata_from_remote_parquet_file(
111113

112114
// tell the reader to read the page index
113115
ParquetMetaDataReader::new()
114-
.with_page_indexes(true)
116+
.with_page_index_policy(PageIndexPolicy::Required)
115117
.load_and_finish(remote_file, file_size)
116118
.await
117119
.unwrap()
@@ -160,7 +162,7 @@ fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path
160162
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
161163
let file = File::open(file).unwrap();
162164
ParquetMetaDataReader::new()
163-
.with_page_indexes(true)
165+
.with_page_index_policy(PageIndexPolicy::Required)
164166
.parse_and_finish(&file)
165167
.unwrap()
166168
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::column::page::{PageIterator, PageReader};
3737
#[cfg(feature = "encryption")]
3838
use crate::encryption::decrypt::FileDecryptionProperties;
3939
use crate::errors::{ParquetError, Result};
40-
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
40+
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
4141
use crate::file::reader::{ChunkReader, SerializedPageReader};
4242
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
4343
use crate::schema::types::SchemaDescriptor;
@@ -383,8 +383,8 @@ pub struct ArrowReaderOptions {
383383
///
384384
/// [ARROW_SCHEMA_META_KEY]: crate::arrow::ARROW_SCHEMA_META_KEY
385385
supplied_schema: Option<SchemaRef>,
386-
/// If true, attempt to read `OffsetIndex` and `ColumnIndex`
387-
pub(crate) page_index: bool,
386+
/// Policy for reading offset and column indexes.
387+
pub(crate) page_index_policy: PageIndexPolicy,
388388
/// If encryption is enabled, the file decryption properties can be provided
389389
#[cfg(feature = "encryption")]
390390
pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
@@ -486,7 +486,20 @@ impl ArrowReaderOptions {
486486
/// [`ParquetMetaData::column_index`]: crate::file::metadata::ParquetMetaData::column_index
487487
/// [`ParquetMetaData::offset_index`]: crate::file::metadata::ParquetMetaData::offset_index
488488
pub fn with_page_index(self, page_index: bool) -> Self {
489-
Self { page_index, ..self }
489+
let page_index_policy = PageIndexPolicy::from(page_index);
490+
491+
Self {
492+
page_index_policy,
493+
..self
494+
}
495+
}
496+
497+
/// Set the [`PageIndexPolicy`] to determine how page indexes should be read.
498+
pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
499+
Self {
500+
page_index_policy: policy,
501+
..self
502+
}
490503
}
491504

492505
/// Provide the file decryption properties to use when reading encrypted parquet files.
@@ -507,7 +520,7 @@ impl ArrowReaderOptions {
507520
///
508521
/// This can be set via [`with_page_index`][Self::with_page_index].
509522
pub fn page_index(&self) -> bool {
510-
self.page_index
523+
self.page_index_policy != PageIndexPolicy::Skip
511524
}
512525

513526
/// Retrieve the currently set file decryption properties.
@@ -556,7 +569,8 @@ impl ArrowReaderMetadata {
556569
/// `Self::metadata` is missing the page index, this function will attempt
557570
/// to load the page index by making an object store request.
558571
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
559-
let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
572+
let metadata =
573+
ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
560574
#[cfg(feature = "encryption")]
561575
let metadata =
562576
metadata.with_decryption_properties(options.file_decryption_properties.as_ref());

parquet/src/arrow/async_reader/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::bloom_filter::{
5252
};
5353
use crate::column::page::{PageIterator, PageReader};
5454
use crate::errors::{ParquetError, Result};
55-
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
55+
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
5656
use crate::file::page_index::offset_index::OffsetIndexMetaData;
5757
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
5858
use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
@@ -175,8 +175,9 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
175175
options: Option<&'a ArrowReaderOptions>,
176176
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
177177
async move {
178-
let metadata_reader = ParquetMetaDataReader::new()
179-
.with_page_indexes(options.is_some_and(|o| o.page_index));
178+
let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
179+
PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
180+
);
180181

181182
#[cfg(feature = "encryption")]
182183
let metadata_reader = metadata_reader.with_decryption_properties(
@@ -1262,8 +1263,9 @@ mod tests {
12621263
&'a mut self,
12631264
options: Option<&'a ArrowReaderOptions>,
12641265
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
1265-
let metadata_reader = ParquetMetaDataReader::new()
1266-
.with_page_indexes(options.is_some_and(|o| o.page_index));
1266+
let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
1267+
PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
1268+
);
12671269
self.metadata = Some(Arc::new(
12681270
metadata_reader.parse_and_finish(&self.data).unwrap(),
12691271
));
@@ -1931,6 +1933,7 @@ mod tests {
19311933
}
19321934

19331935
#[tokio::test]
1936+
#[allow(deprecated)]
19341937
async fn test_in_memory_row_group_sparse() {
19351938
let testdata = arrow::util::test_util::parquet_test_data();
19361939
let path = format!("{testdata}/alltypes_tiny_pages.parquet");
@@ -2458,6 +2461,7 @@ mod tests {
24582461
}
24592462

24602463
#[tokio::test]
2464+
#[allow(deprecated)]
24612465
async fn empty_offset_index_doesnt_panic_in_read_row_group() {
24622466
use tokio::fs::File;
24632467
let testdata = arrow::util::test_util::parquet_test_data();
@@ -2483,6 +2487,7 @@ mod tests {
24832487
}
24842488

24852489
#[tokio::test]
2490+
#[allow(deprecated)]
24862491
async fn non_empty_offset_index_doesnt_panic_in_read_row_group() {
24872492
use tokio::fs::File;
24882493
let testdata = arrow::util::test_util::parquet_test_data();
@@ -2507,6 +2512,7 @@ mod tests {
25072512
}
25082513

25092514
#[tokio::test]
2515+
#[allow(deprecated)]
25102516
async fn empty_offset_index_doesnt_panic_in_column_chunks() {
25112517
use tempfile::TempDir;
25122518
use tokio::fs::File;

parquet/src/arrow/async_reader/store.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::{ops::Range, sync::Arc};
2020
use crate::arrow::arrow_reader::ArrowReaderOptions;
2121
use crate::arrow::async_reader::{AsyncFileReader, MetadataSuffixFetch};
2222
use crate::errors::{ParquetError, Result};
23-
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
23+
use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
2424
use bytes::Bytes;
2525
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
2626
use object_store::{path::Path, ObjectStore};
@@ -200,8 +200,8 @@ impl AsyncFileReader for ParquetObjectReader {
200200
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
201201
Box::pin(async move {
202202
let mut metadata = ParquetMetaDataReader::new()
203-
.with_column_indexes(self.preload_column_index)
204-
.with_offset_indexes(self.preload_offset_index)
203+
.with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
204+
.with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
205205
.with_prefetch_hint(self.metadata_size_hint);
206206

207207
#[cfg(feature = "encryption")]

parquet/src/arrow/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ mod test {
467467
use super::ProjectionMask;
468468

469469
#[test]
470+
#[allow(deprecated)]
470471
// Reproducer for https://github.com/apache/arrow-rs/issues/6464
471472
fn test_metadata_read_write_partial_offset() {
472473
let parquet_bytes = create_parquet_file();
@@ -514,6 +515,7 @@ mod test {
514515
}
515516

516517
#[test]
518+
#[allow(deprecated)]
517519
fn test_metadata_read_write_roundtrip_page_index() {
518520
let parquet_bytes = create_parquet_file();
519521

parquet/src/file/metadata/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ use crate::schema::types::{
120120
};
121121
#[cfg(feature = "encryption")]
122122
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
123-
pub use reader::{FooterTail, ParquetMetaDataReader};
123+
pub use reader::{FooterTail, PageIndexPolicy, ParquetMetaDataReader};
124124
use std::ops::Range;
125125
use std::sync::Arc;
126126
pub use writer::ParquetMetaDataWriter;

0 commit comments

Comments
 (0)