Skip to content

Commit 3e9b7a9

Browse files
authored
Merge pull request #54 from Kontinuation/geoparquet-respecting-pruning-opt
bug: GeoParquet should respect parquet pruning option
2 parents 8aa365f + 75fa29e commit 3e9b7a9

File tree

2 files changed

+72
-30
lines changed

2 files changed

+72
-30
lines changed

rust/sedona-geoparquet/src/file_opener.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub struct GeoParquetFileOpener {
4646
metadata_size_hint: Option<usize>,
4747
predicate: Arc<dyn PhysicalExpr>,
4848
file_schema: SchemaRef,
49+
enable_pruning: bool,
4950
}
5051

5152
impl GeoParquetFileOpener {
@@ -56,19 +57,21 @@ impl GeoParquetFileOpener {
5657
metadata_size_hint: Option<usize>,
5758
predicate: Arc<dyn PhysicalExpr>,
5859
file_schema: SchemaRef,
60+
enable_pruning: bool,
5961
) -> Self {
6062
Self {
6163
inner,
6264
object_store,
6365
metadata_size_hint,
6466
predicate,
6567
file_schema,
68+
enable_pruning,
6669
}
6770
}
6871
}
6972

7073
impl FileOpener for GeoParquetFileOpener {
71-
fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
74+
fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture> {
7275
let self_clone = self.clone();
7376

7477
Ok(Box::pin(async move {
@@ -81,25 +84,28 @@ impl FileOpener for GeoParquetFileOpener {
8184
.await?;
8285

8386
let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups());
84-
let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
85-
86-
if let Some(geoparquet_metadata) =
87-
GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
88-
{
89-
filter_access_plan_using_geoparquet_file_metadata(
90-
&self_clone.file_schema,
91-
&mut access_plan,
92-
&spatial_filter,
93-
&geoparquet_metadata,
94-
)?;
95-
96-
filter_access_plan_using_geoparquet_covering(
97-
&self_clone.file_schema,
98-
&mut access_plan,
99-
&spatial_filter,
100-
&geoparquet_metadata,
101-
&parquet_metadata,
102-
)?;
87+
88+
if self_clone.enable_pruning {
89+
let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
90+
91+
if let Some(geoparquet_metadata) =
92+
GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
93+
{
94+
filter_access_plan_using_geoparquet_file_metadata(
95+
&self_clone.file_schema,
96+
&mut access_plan,
97+
&spatial_filter,
98+
&geoparquet_metadata,
99+
)?;
100+
101+
filter_access_plan_using_geoparquet_covering(
102+
&self_clone.file_schema,
103+
&mut access_plan,
104+
&spatial_filter,
105+
&geoparquet_metadata,
106+
&parquet_metadata,
107+
)?;
108+
}
103109
}
104110

105111
// When we have built-in GEOMETRY/GEOGRAPHY types, we can filter the access plan
@@ -115,7 +121,7 @@ impl FileOpener for GeoParquetFileOpener {
115121
metadata_size_hint: self_clone.metadata_size_hint,
116122
};
117123

118-
self_clone.inner.open(file_meta, _file)?.await
124+
self_clone.inner.open(file_meta, file)?.await
119125
}))
120126
}
121127
}

rust/sedona-geoparquet/src/format.rs

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ impl FileSource for GeoParquetFileSource {
438438
self.metadata_size_hint,
439439
self.predicate.clone().unwrap(),
440440
base_config.file_schema.clone(),
441+
self.inner.table_parquet_options().global.pruning,
441442
))
442443
}
443444

@@ -678,7 +679,7 @@ mod test {
678679

679680
#[rstest]
680681
#[tokio::test]
681-
async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_within")] udf_name: &str) {
682+
async fn pruning_geoparquet_metadata(#[values("st_intersects", "st_contains")] udf_name: &str) {
682683
let data_dir = geoarrow_data_dir().unwrap();
683684
let ctx = setup_context();
684685

@@ -690,10 +691,8 @@ mod test {
690691
)
691692
.into();
692693

693-
let definitely_non_intersecting_scalar = create_scalar(
694-
Some("POLYGON ((100 200), (100 300), (200 300), (100 200))"),
695-
&WKB_GEOMETRY,
696-
);
694+
let definitely_non_intersecting_scalar =
695+
create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY);
697696
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
698697

699698
let df = ctx
@@ -712,10 +711,7 @@ mod test {
712711
let batches_out = df.collect().await.unwrap();
713712
assert!(batches_out.is_empty());
714713

715-
let definitely_intersecting_scalar = create_scalar(
716-
Some("POLYGON ((30 10), (30 20), (40 20), (40 10), (30 10))"),
717-
&WKB_GEOMETRY,
718-
);
714+
let definitely_intersecting_scalar = create_scalar(Some("POINT (30 10)"), &WKB_GEOMETRY);
719715
let df = ctx
720716
.table(format!("{data_dir}/example/files/*_geo.parquet"))
721717
.await
@@ -733,6 +729,46 @@ mod test {
733729
assert!(!batches_out.is_empty());
734730
}
735731

732+
#[tokio::test]
733+
async fn should_not_prune_geoparquet_metadata_after_disabling_pruning() {
734+
let data_dir = geoarrow_data_dir().unwrap();
735+
let ctx = setup_context();
736+
ctx.sql("SET datafusion.execution.parquet.pruning TO false")
737+
.await
738+
.expect("Disabling parquet pruning failed");
739+
740+
let udf: ScalarUDF = SimpleScalarUDF::new_with_signature(
741+
"st_intersects",
742+
Signature::any(2, Volatility::Immutable),
743+
DataType::Boolean,
744+
Arc::new(|_args| Ok(ScalarValue::Boolean(Some(true)).into())),
745+
)
746+
.into();
747+
748+
let definitely_non_intersecting_scalar =
749+
create_scalar(Some("POINT (100 200)"), &WKB_GEOMETRY);
750+
let storage_field = WKB_GEOMETRY.to_storage_field("", true).unwrap();
751+
752+
let df = ctx
753+
.table(format!("{data_dir}/example/files/*_geo.parquet"))
754+
.await
755+
.unwrap()
756+
.filter(udf.call(vec![
757+
col("geometry"),
758+
Expr::Literal(
759+
definitely_non_intersecting_scalar,
760+
Some(storage_field.metadata().into()),
761+
),
762+
]))
763+
.unwrap();
764+
765+
// Even if the query window does not intersect with the data, we should not prune
766+
// any files because pruning has been disabled. We can retrieve the data here
767+
// because the dummy UDF always returns true.
768+
let batches_out = df.collect().await.unwrap();
769+
assert!(!batches_out.is_empty());
770+
}
771+
736772
#[tokio::test]
737773
async fn geoparquet_format_factory() {
738774
let ctx = SessionContext::new();

0 commit comments

Comments
 (0)