Merged

Changes from 11 of 42 commits.

Commits:
55e32d2  identify a few spots (paleolimbot, Oct 1, 2025)
9a88f7e  add a dyn accumulator (paleolimbot, Oct 1, 2025)
6a34eaf  document and set (paleolimbot, Oct 1, 2025)
d59c644  maybe pipe into the encoder (paleolimbot, Oct 1, 2025)
2f268f3  try (paleolimbot, Oct 1, 2025)
5c9547e  remove some previous work (paleolimbot, Oct 1, 2025)
6516544  plausibly working (paleolimbot, Oct 1, 2025)
629afc7  test (paleolimbot, Oct 1, 2025)
7ae4f17  failing test but a bit better (paleolimbot, Oct 2, 2025)
523575d  passing test (paleolimbot, Oct 2, 2025)
203ea9c  test! (paleolimbot, Oct 2, 2025)
5305e4e  ensure size stats are written for geometry/geography from generic enc… (paleolimbot, Oct 6, 2025)
3089b69  remove tests that will start failing when new thift footer merges (paleolimbot, Oct 6, 2025)
9e12b57  more flexible testers (paleolimbot, Oct 6, 2025)
a3b729b  more tests (paleolimbot, Oct 7, 2025)
f8b58c6  add roundtrip tests, fix accumulator for the all empty case (paleolimbot, Oct 7, 2025)
7472ba6  more test files (paleolimbot, Oct 7, 2025)
be7b522  use code-compatible stats accessor (paleolimbot, Oct 7, 2025)
e60cd98  fix test for new accessor (paleolimbot, Oct 7, 2025)
a34e5c4  add documentation for new trait member (paleolimbot, Oct 7, 2025)
15bbe3c  explicit schema test (paleolimbot, Oct 7, 2025)
0fca11f  document the accumulator (paleolimbot, Oct 7, 2025)
cd0f609  tests (paleolimbot, Oct 7, 2025)
92d0d73  rename (paleolimbot, Oct 7, 2025)
182776e  once lock thinger (paleolimbot, Oct 7, 2025)
4c7c52a  remove uneeded todo (paleolimbot, Oct 7, 2025)
0e600b4  remove copied comment (paleolimbot, Oct 7, 2025)
59a00ed  add better docstring (paleolimbot, Oct 7, 2025)
a024793  Apply suggestions from code review (paleolimbot, Oct 8, 2025)
3798609  more monospace (paleolimbot, Oct 8, 2025)
85ebb72  more monospace (paleolimbot, Oct 8, 2025)
2bc7bbe  more compact updater (paleolimbot, Oct 8, 2025)
431da25  try_new_geo_stats_accumulator() (paleolimbot, Oct 8, 2025)
6156112  fix link (paleolimbot, Oct 8, 2025)
ed85f90  Merge branch 'main' into spatial-stats-write (paleolimbot, Oct 8, 2025)
9d6d6c3  fix build (paleolimbot, Oct 8, 2025)
392d949  remove duplicate test (paleolimbot, Oct 8, 2025)
f9112f7  maybe merge tests better (paleolimbot, Oct 8, 2025)
ec31096  Apply suggestions from code review (paleolimbot, Oct 9, 2025)
eac356e  document feature flag (paleolimbot, Oct 9, 2025)
1bb2cd8  verify stats/null count (paleolimbot, Oct 9, 2025)
d5ba2f2  test column index (paleolimbot, Oct 9, 2025)
3 changes: 3 additions & 0 deletions parquet/Cargo.toml
@@ -45,6 +45,7 @@
arrow-data = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
parquet-geospatial = { workspace = true, optional = true }
parquet-variant = { workspace = true, optional = true }
parquet-variant-json = { workspace = true, optional = true }
parquet-variant-compute = { workspace = true, optional = true }
@@ -131,6 +132,8 @@
flate2-rust_backened = ["flate2/rust_backend"]
flate2-zlib-rs = ["flate2/zlib-rs"]
# Enable parquet variant support
variant_experimental = ["arrow", "parquet-variant", "parquet-variant-json", "parquet-variant-compute"]
# Enable geospatial support
geospatial = ["parquet-geospatial"]
Contributor: Could you please also add the new feature flag to the main crate readme as well?

https://github.com/apache/arrow-rs/blob/main/parquet/README.md#feature-flags



[[example]]
51 changes: 49 additions & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -15,14 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use crate::basic::Encoding;
use crate::basic::{Encoding, LogicalType};
use crate::bloom_filter::Sbbf;
use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, DictionaryPage};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::geospatial::accumulator::{
DefaultGeoStatsAccumulatorFactory, GeoStatsAccumulator, GeoStatsAccumulatorFactory,
};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
@@ -421,6 +425,7 @@
pub struct ByteArrayEncoder {
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl ColumnValueEncoder for ByteArrayEncoder {
@@ -447,13 +452,23 @@
impl ColumnValueEncoder for ByteArrayEncoder {

let statistics_enabled = props.statistics_enabled(descr.path());

let geo_stats_accumulator = if matches!(
descr.logical_type(),
Some(LogicalType::Geometry) | Some(LogicalType::Geography)
) {
Some(DefaultGeoStatsAccumulatorFactory::default().new_accumulator(descr))
} else {
None
};

Ok(Self {
fallback,
statistics_enabled,
bloom_filter,
dict_encoder: dictionary,
min_value: None,
max_value: None,
geo_stats_accumulator,
})
}

@@ -536,6 +551,14 @@
_ => self.fallback.flush_data_page(min_value, max_value),
}
}

fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
if let Some(accumulator) = self.geo_stats_accumulator.as_mut() {
accumulator.finish()
} else {
None
}
}
}

/// Encodes the provided `values` and `indices` to `encoder`
@@ -547,7 +570,10 @@ where
T::Item: Copy + Ord + AsRef<[u8]>,
{
if encoder.statistics_enabled != EnabledStatistics::None {
if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
// TODO ensure Converted interval types have no stats written for them?
if let Some(accumulator) = encoder.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), values, indices.iter().cloned());
} else if let Some((min, max)) = compute_min_max(values, indices.iter().cloned()) {
if encoder.min_value.as_ref().is_none_or(|m| m > &min) {
encoder.min_value = Some(min);
}
@@ -595,3 +621,24 @@
}
Some((min.as_ref().to_vec().into(), max.as_ref().to_vec().into()))
}

/// Updates geospatial statistics for the provided array and indices
///
/// This is a free function so it can be used with `downcast_op!`
fn update_geo_stats_accumulator<T>(
bounder: &mut dyn GeoStatsAccumulator,
array: T,
valid: impl Iterator<Item = usize>,
) where
T: ArrayAccessor,
T::Item: Copy + Ord + AsRef<[u8]>,
{
if !bounder.is_valid() {
return;
}

for idx in valid {
let val = array.value(idx);
bounder.update_wkb(val.as_ref());
}
}
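
For orientation, the accumulator flow introduced above can be summarized in a small sketch. This is not part of the diff: it assumes the `parquet::geospatial` modules shown in the imports are publicly exported, and it uses only the trait surface visible here (`is_valid`, `update_wkb`, `finish`).

```rust
use parquet::geospatial::accumulator::{
    DefaultGeoStatsAccumulatorFactory, GeoStatsAccumulator, GeoStatsAccumulatorFactory,
};
use parquet::geospatial::statistics::GeospatialStatistics;
use parquet::schema::types::ColumnDescPtr;

// Sketch: feed WKB-encoded values into a geospatial statistics accumulator
// the way the byte array encoder above does.
fn accumulate_wkb_stats(
    descr: &ColumnDescPtr,
    wkb_values: &[Vec<u8>],
) -> Option<Box<GeospatialStatistics>> {
    let mut acc = DefaultGeoStatsAccumulatorFactory::default().new_accumulator(descr);

    // An invalid accumulator is a no-op (for example, when the `geospatial`
    // feature is disabled), mirroring the early return in the diff.
    if !acc.is_valid() {
        return None;
    }

    for wkb in wkb_values {
        acc.update_wkb(wkb);
    }
    acc.finish()
}
```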
26 changes: 21 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -229,11 +229,18 @@
impl<W: Write + Send> ArrowWriter<W> {
options: ArrowWriterOptions,
) -> Result<Self> {
let mut props = options.properties;
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}
let schema = converter.convert(&arrow_schema)?;

let schema = if let Some(parquet_schema) = options.schema_descr {
parquet_schema.clone()
} else {
let mut converter = ArrowSchemaConverter::new().with_coerce_types(props.coerce_types());
if let Some(schema_root) = &options.schema_root {
converter = converter.schema_root(schema_root);
}

converter.convert(&arrow_schema)?
};

if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -458,6 +465,7 @@
pub struct ArrowWriterOptions {
properties: WriterProperties,
skip_arrow_metadata: bool,
schema_root: Option<String>,
schema_descr: Option<SchemaDescriptor>,
}

impl ArrowWriterOptions {
@@ -491,6 +499,14 @@
impl ArrowWriterOptions {
..self
}
}

/// Explicitly specify the Parquet schema to be used
Contributor: this is a nice API addition I think

Contributor: So this API actually ends up being a bit problematic, the reason being that the type inference and coercion machinery are supposed to mirror each other.

With this change:

  • You can write files that won't roundtrip correctly, as the reader doesn't understand the types in the arrow schema (and will just ignore them)
  • You can end up with incorrect type coercion for types, e.g. unsigned types not being handled correctly

Further this interferes with removing arrow_cast as a dependency - #9077

I'm not sure what the intention of this API is, why can't the arrays just be cast before being written, why does this logic need to live within the parquet writer itself?

Contributor: I think one rationale was to put the appropriate metadata on the field (so the parquet writer knew what logical type to add), but I may be mistaken

I don't fully understand the concerns about type coercion, but at least part of this API I think is designed to allow interoperability between other arrow implementations (aka not just reading back arrays that were written in Rust, but writing arrays that other writers will accept)

Member Author: I put a note on the other PR as well, but the intention was really just to be able to add the test that I needed to add at the time.

I don't have opinions about how this kind of thing should work here in particular, but a schema request across a type boundary (e.g. pyarrow.table(xxx, schema=xxx)) is quite common and nicely separates the destination type inference (usually lossy with some choices to be made) from the conversion (either write the source type or error if this is not possible). The API here was basically an escape hatch in the event that the built-in Parquet schema inference did the wrong thing (which it did for spatial types at the time that I added it).

pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
Self {
schema_descr: Some(schema_descr),
..self
}
}
}
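
For context, here is a hedged usage sketch of the escape hatch discussed above. `with_parquet_schema` is the method added in this diff; `ArrowSchemaConverter` and `ArrowWriter::try_new_with_options` are existing public APIs, and the rest is illustrative.

```rust
use arrow_array::RecordBatch;
use parquet::arrow::arrow_writer::{ArrowWriter, ArrowWriterOptions};
use parquet::arrow::ArrowSchemaConverter;

fn write_with_explicit_parquet_schema(batch: &RecordBatch) -> parquet::errors::Result<Vec<u8>> {
    // Start from the inferred schema. A caller could instead construct a
    // SchemaDescriptor directly, e.g. to request a Geometry logical type
    // that the default inference would not produce.
    let parquet_schema = ArrowSchemaConverter::new().convert(batch.schema().as_ref())?;

    let options = ArrowWriterOptions::new().with_parquet_schema(parquet_schema);
    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new_with_options(&mut buf, batch.schema(), options)?;
    writer.write(batch)?;
    writer.close()?;
    Ok(buf)
}
```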

/// A single column chunk produced by [`ArrowColumnWriter`]
57 changes: 50 additions & 7 deletions parquet/src/column/writer/encoder.rs
@@ -28,6 +28,10 @@
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::geospatial::accumulator::{
DefaultGeoStatsAccumulatorFactory, GeoStatsAccumulator, GeoStatsAccumulatorFactory,
};
use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

/// A collection of [`ParquetValueType`] encoded by a [`ColumnValueEncoder`]
@@ -121,6 +125,8 @@
pub trait ColumnValueEncoder {
/// will *not* be tracked by the bloom filter as it is empty since. This should be called once
/// near the end of encoding.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;

/// Flushes any accumulated geospatial statistics
///
/// Returns `None` unless this encoder is tracking statistics for a column
/// with a Geometry or Geography logical type. Like `flush_bloom_filter`,
/// this should be called once near the end of encoding.
fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>>;
}

pub struct ColumnValueEncoderImpl<T: DataType> {
@@ -133,6 +139,7 @@
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -145,16 +152,20 @@

fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
if self.statistics_enabled != EnabledStatistics::None
// INTERVAL has undefined sort order, so don't write min/max stats for it
// INTERVAL, Geometry, and Geography have undefined sort order, so don't write min/max stats for them
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
if let Some((min, max)) = self.min_max(slice, None) {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}
if let Some(accumulator) = self.geo_stats_accumulator.as_mut() {
update_geo_stats_accumulator(accumulator.as_mut(), slice.iter());
} else {
if let Some((min, max)) = self.min_max(slice, None) {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}

if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
*self.variable_length_bytes.get_or_insert(0) += var_bytes;
if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
*self.variable_length_bytes.get_or_insert(0) += var_bytes;
}
Contributor: I think this should execute regardless of whether geo stats are enabled. The variable_length_bytes are ultimately written to the SizeStatistics which are useful even without min/max statistics.

Member Author: Done!

}
}

@@ -201,6 +212,15 @@
.map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
.transpose()?;

let geo_stats_accumulator = if matches!(
descr.logical_type(),
Some(LogicalType::Geometry) | Some(LogicalType::Geography)
) {
Some(DefaultGeoStatsAccumulatorFactory::default().new_accumulator(descr))
} else {
None
};

Ok(Self {
encoder,
dict_encoder,
Expand All @@ -211,6 +231,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
min_value: None,
max_value: None,
variable_length_bytes: None,
geo_stats_accumulator,
})
}

@@ -307,6 +328,14 @@
variable_length_bytes: self.variable_length_bytes.take(),
})
}

fn flush_geospatial_statistics(&mut self) -> Option<Box<GeospatialStatistics>> {
if let Some(accumulator) = self.geo_stats_accumulator.as_mut() {
accumulator.finish()
} else {
None
}
}
}

fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
@@ -367,3 +396,17 @@
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
_ => val.clone(),
}
}

fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
where
T: ParquetValueType + 'a,
I: Iterator<Item = &'a T>,
{
if !bounder.is_valid() {
return;
}

for val in iter {
bounder.update_wkb(val.as_bytes());
}
Contributor: This could go inside if bounder.is_valid() instead. 🤷

}
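
The restructure suggested in that comment would look like this (behavior unchanged):

```rust
fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
where
    T: ParquetValueType + 'a,
    I: Iterator<Item = &'a T>,
{
    // The validity check guards the loop rather than returning early.
    if bounder.is_valid() {
        for val in iter {
            bounder.update_wkb(val.as_bytes());
        }
    }
}
```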
4 changes: 4 additions & 0 deletions parquet/src/column/writer/mod.rs
@@ -1219,6 +1219,10 @@
impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.set_definition_level_histogram(
self.column_metrics.definition_level_histogram.take(),
);

if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
builder = builder.set_geo_statistics(geo_stats);
}
}

builder = self.set_column_chunk_encryption_properties(builder);
3 changes: 3 additions & 0 deletions parquet/src/file/writer.rs
@@ -723,6 +723,9 @@
impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
if let Some(statistics) = metadata.statistics() {
builder = builder.set_statistics(statistics.clone())
}
if let Some(geo_statistics) = metadata.geo_statistics() {
builder = builder.set_geo_statistics(Box::new(geo_statistics.clone()))
}
if let Some(page_encoding_stats) = metadata.page_encoding_stats() {
builder = builder.set_page_encoding_stats(page_encoding_stats.clone())
}
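
After a write, the propagated statistics should be observable on the column chunk metadata. A small sketch of checking for them, using the `geo_statistics` accessor that appears in this diff (the rest is standard metadata traversal):

```rust
use parquet::file::metadata::ParquetMetaData;

// Sketch: report which columns of a written file carry geospatial statistics.
fn report_geo_stats(metadata: &ParquetMetaData) {
    for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
        for col in rg.columns() {
            if col.geo_statistics().is_some() {
                println!("row group {rg_idx}: geo stats on column {}", col.column_path());
            }
        }
    }
}
```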