Skip to content

Commit 2d7bae9

Browse files
Erigara and Roman Shanin authored
fix(parquet_writer): correctly update upper bound (#1520)
## Which issue does this PR close? It fixes issue with not correct upper bound in case of multiple row groups in parquet file. Closes #1519 ## What changes are included in this PR? - change `>` to `<` for updating upper bound - adjust test to write multiple row groups per file ## Are these changes tested? I've changed a test so that this code path is activated. Co-authored-by: Roman Shanin <[email protected]>
1 parent b8bd132 commit 2d7bae9

File tree

1 file changed

+40
-2
lines changed

1 file changed

+40
-2
lines changed

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ impl MinMaxColAggregator {
268268
self.upper_bounds
269269
.entry(field_id)
270270
.and_modify(|e| {
271-
if *e > datum {
271+
if *e < datum {
272272
*e = datum.clone()
273273
}
274274
})
@@ -673,6 +673,7 @@ mod tests {
673673
use arrow_schema::{DataType, Field, Fields, SchemaRef as ArrowSchemaRef};
674674
use arrow_select::concat::concat_batches;
675675
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
676+
use parquet::file::statistics::ValueStatistics;
676677
use rust_decimal::Decimal;
677678
use tempfile::TempDir;
678679
use uuid::Uuid;
@@ -862,7 +863,9 @@ mod tests {
862863

863864
// write data
864865
let mut pw = ParquetWriterBuilder::new(
865-
WriterProperties::builder().build(),
866+
WriterProperties::builder()
867+
.set_max_row_group_size(128)
868+
.build(),
866869
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
867870
file_io.clone(),
868871
location_gen,
@@ -2293,4 +2296,39 @@ mod tests {
22932296
// Check that file should have been deleted.
22942297
assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
22952298
}
2299+
2300+
#[test]
2301+
fn test_min_max_aggregator() {
2302+
let schema = Arc::new(
2303+
Schema::builder()
2304+
.with_schema_id(1)
2305+
.with_fields(vec![
2306+
NestedField::required(0, "col", Type::Primitive(PrimitiveType::Int))
2307+
.with_id(0)
2308+
.into(),
2309+
])
2310+
.build()
2311+
.expect("Failed to create schema"),
2312+
);
2313+
let mut min_max_agg = MinMaxColAggregator::new(schema);
2314+
let create_statistics =
2315+
|min, max| Statistics::Int32(ValueStatistics::new(min, max, None, None, false));
2316+
min_max_agg
2317+
.update(0, create_statistics(None, Some(42)))
2318+
.unwrap();
2319+
min_max_agg
2320+
.update(0, create_statistics(Some(0), Some(i32::MAX)))
2321+
.unwrap();
2322+
min_max_agg
2323+
.update(0, create_statistics(Some(i32::MIN), None))
2324+
.unwrap();
2325+
min_max_agg
2326+
.update(0, create_statistics(None, None))
2327+
.unwrap();
2328+
2329+
let (lower_bounds, upper_bounds) = min_max_agg.produce();
2330+
2331+
assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))]));
2332+
assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))]));
2333+
}
22962334
}

0 commit comments

Comments (0)