
Commit a535d3b

feat: add method for sync Parquet reader read bloom filter (#8024)
# Which issue does this PR close?

- Closes #8023

# Rationale for this change

Add support for reading bloom filters to the sync Parquet reader.

# What changes are included in this PR?

Add a sync `get_row_group_column_bloom_filter` method.

# Are these changes tested?

By unit tests.

# Are there any user-facing changes?

API added.
1 parent bfc7679 commit a535d3b
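Example usage of the new API (a minimal sketch, not from this commit: the file name `data.parquet`, the row group/column indices, and the probed value are illustrative, and the file is assumed to have been written with bloom filters enabled):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() {
    // Placeholder path: any parquet file written with bloom filters enabled
    // (see WriterProperties::set_bloom_filter_enabled) will do.
    let file = File::open("data.parquet").unwrap();
    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    // The new sync API: returns Ok(None) if the column chunk has no bloom filter.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
        // A bloom filter can report false positives but never false negatives,
        // so a failed check proves the value is absent from this row group.
        if !sbbf.check(&"some_value") {
            println!("row group 0 cannot contain \"some_value\"");
        }
    }
}
```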

File tree

  • parquet/src/arrow/arrow_reader

1 file changed: +114 −0 lines

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 114 additions & 0 deletions
@@ -30,12 +30,16 @@ pub use crate::arrow::array_reader::RowGroups;
 use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
 use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
+use crate::bloom_filter::{
+    chunk_read_bloom_filter_header_and_offset, Sbbf, SBBF_HEADER_SIZE_ESTIMATE,
+};
 use crate::column::page::{PageIterator, PageReader};
 #[cfg(feature = "encryption")]
 use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
+use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
 use crate::schema::types::SchemaDescriptor;
 
 pub(crate) use read_plan::{ReadPlan, ReadPlanBuilder};
@@ -703,6 +707,66 @@ impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
         Self::new_builder(SyncReader(input), metadata)
     }
 
+    /// Read the bloom filter for a column in a row group
+    ///
+    /// Returns `None` if the column does not have a bloom filter
+    ///
+    /// We should call this function after other forms of pruning, such as projection and predicate pushdown.
+    pub fn get_row_group_column_bloom_filter(
+        &mut self,
+        row_group_idx: usize,
+        column_idx: usize,
+    ) -> Result<Option<Sbbf>> {
+        let metadata = self.metadata.row_group(row_group_idx);
+        let column_metadata = metadata.column(column_idx);
+
+        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
+            offset
+                .try_into()
+                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
+        } else {
+            return Ok(None);
+        };
+
+        let buffer = match column_metadata.bloom_filter_length() {
+            Some(length) => self.input.0.get_bytes(offset, length as usize),
+            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
+        }?;
+
+        let (header, bitset_offset) =
+            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;
+
+        match header.algorithm {
+            BloomFilterAlgorithm::BLOCK(_) => {
+                // this match exists to future proof the singleton algorithm enum
+            }
+        }
+        match header.compression {
+            BloomFilterCompression::UNCOMPRESSED(_) => {
+                // this match exists to future proof the singleton compression enum
+            }
+        }
+        match header.hash {
+            BloomFilterHash::XXHASH(_) => {
+                // this match exists to future proof the singleton hash enum
+            }
+        }
+
+        let bitset = match column_metadata.bloom_filter_length() {
+            Some(_) => buffer.slice(
+                (TryInto::<usize>::try_into(bitset_offset).unwrap()
+                    - TryInto::<usize>::try_into(offset).unwrap())..,
+            ),
+            None => {
+                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
+                    ParquetError::General("Bloom filter length is invalid".to_string())
+                })?;
+                self.input.0.get_bytes(bitset_offset, bitset_length)?
+            }
+        };
+        Ok(Some(Sbbf::new(&bitset)))
+    }
+
     /// Build a [`ParquetRecordBatchReader`]
     ///
     /// Note: this will eagerly evaluate any `RowFilter` before returning
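The doc comment above recommends applying the bloom filter after other forms of pruning. A sketch of that row-group pruning pattern (a hypothetical helper, assuming an equality predicate on a string column; `read_matching_row_groups`, the path argument, and the probed value are illustrative, not part of this commit):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Hypothetical helper: skip row groups whose bloom filter proves `value`
// cannot occur in column `column_idx`, then read only the survivors.
fn read_matching_row_groups(path: &str, column_idx: usize, value: &str) {
    let file = File::open(path).unwrap();
    let mut builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

    let num_row_groups = builder.metadata().num_row_groups();
    let mut candidates = Vec::with_capacity(num_row_groups);
    for rg in 0..num_row_groups {
        match builder
            .get_row_group_column_bloom_filter(rg, column_idx)
            .unwrap()
        {
            // No false negatives: a failed check safely prunes the row group.
            Some(sbbf) if !sbbf.check(&value) => continue,
            // Filter present and check passed, or no filter: must read it.
            _ => candidates.push(rg),
        }
    }

    let reader = builder.with_row_groups(candidates).build().unwrap();
    for batch in reader {
        let batch = batch.unwrap();
        // A bloom filter hit may be a false positive, so the exact predicate
        // still has to be re-applied to `batch` here.
        let _ = batch;
    }
}
```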
@@ -4720,4 +4784,54 @@ mod tests {
         assert_eq!(c0.len(), c1.len());
         c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
     }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_with_length() {
+        // convert to new parquet file with bloom_filter_length
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let file = File::open(path).unwrap();
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+        let schema = builder.schema().clone();
+        let reader = builder.build().unwrap();
+
+        let mut parquet_data = Vec::new();
+        let props = WriterProperties::builder()
+            .set_bloom_filter_enabled(true)
+            .build();
+        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
+        for batch in reader {
+            let batch = batch.unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        // test the new parquet file
+        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
+    }
+
+    #[test]
+    fn test_get_row_group_column_bloom_filter_without_length() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
+        let data = Bytes::from(std::fs::read(path).unwrap());
+        test_get_row_group_column_bloom_filter(data, false);
+    }
+
+    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
+        let mut builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();
+
+        let metadata = builder.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        let row_group = metadata.row_group(0);
+        let column = row_group.column(0);
+        assert_eq!(column.bloom_filter_length().is_some(), with_length);
+
+        let sbbf = builder
+            .get_row_group_column_bloom_filter(0, 0)
+            .unwrap()
+            .unwrap();
+        assert!(sbbf.check(&"Hello"));
+        assert!(!sbbf.check(&"Hello_Not_Exists"));
+    }
 }
