Skip to content

Commit b320337

Browse files
authored
feat: Add metrics in DataSourceExec related to spatial predicate pruning (#173)
1 parent 11e2993 commit b320337

File tree

4 files changed

+221
-4
lines changed

4 files changed

+221
-4
lines changed

rust/sedona-geoparquet/src/file_opener.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion::datasource::{
2424
};
2525
use datafusion_common::Result;
2626
use datafusion_physical_expr::PhysicalExpr;
27+
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
2728
use object_store::ObjectStore;
2829
use parquet::file::{
2930
metadata::{ParquetMetaData, RowGroupMetaData},
@@ -35,6 +36,40 @@ use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
3536

3637
use crate::metadata::GeoParquetMetadata;
3738

39+
#[derive(Clone)]
40+
struct GeoParquetFileOpenerMetrics {
41+
/// How many file ranges are pruned by [`SpatialFilter`]
42+
///
43+
/// Note on "file range": an opener may read only part of a file rather than the
44+
/// entire file; that portion is referred to as the "file range". See [`PartitionedFile`]
45+
/// for details.
46+
files_ranges_spatial_pruned: Count,
47+
/// How many file ranges are matched by [`SpatialFilter`]
48+
files_ranges_spatial_matched: Count,
49+
/// How many row groups are pruned by [`SpatialFilter`]
50+
///
51+
/// Note: row groups skipped during the file-level pruning step are not counted
52+
/// again here.
53+
row_groups_spatial_pruned: Count,
54+
/// How many row groups are matched by [`SpatialFilter`]
55+
row_groups_spatial_matched: Count,
56+
}
57+
58+
impl GeoParquetFileOpenerMetrics {
59+
fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
60+
Self {
61+
files_ranges_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
62+
.global_counter("files_ranges_spatial_pruned"),
63+
files_ranges_spatial_matched: MetricBuilder::new(execution_plan_global_metrics)
64+
.global_counter("files_ranges_spatial_matched"),
65+
row_groups_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
66+
.global_counter("row_groups_spatial_pruned"),
67+
row_groups_spatial_matched: MetricBuilder::new(execution_plan_global_metrics)
68+
.global_counter("row_groups_spatial_matched"),
69+
}
70+
}
71+
}
72+
3873
/// Geo-aware [FileOpener] implementing file and row group pruning
3974
///
4075
/// Pruning happens (for Parquet) in the [FileOpener], so we implement
@@ -47,6 +82,7 @@ pub struct GeoParquetFileOpener {
4782
predicate: Arc<dyn PhysicalExpr>,
4883
file_schema: SchemaRef,
4984
enable_pruning: bool,
85+
metrics: GeoParquetFileOpenerMetrics,
5086
}
5187

5288
impl GeoParquetFileOpener {
@@ -58,6 +94,7 @@ impl GeoParquetFileOpener {
5894
predicate: Arc<dyn PhysicalExpr>,
5995
file_schema: SchemaRef,
6096
enable_pruning: bool,
97+
execution_plan_global_metrics: &ExecutionPlanMetricsSet,
6198
) -> Self {
6299
Self {
63100
inner,
@@ -66,6 +103,7 @@ impl GeoParquetFileOpener {
66103
predicate,
67104
file_schema,
68105
enable_pruning,
106+
metrics: GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics),
69107
}
70108
}
71109
}
@@ -96,6 +134,7 @@ impl FileOpener for GeoParquetFileOpener {
96134
&mut access_plan,
97135
&spatial_filter,
98136
&geoparquet_metadata,
137+
&self_clone.metrics,
99138
)?;
100139

101140
filter_access_plan_using_geoparquet_covering(
@@ -104,6 +143,7 @@ impl FileOpener for GeoParquetFileOpener {
104143
&spatial_filter,
105144
&geoparquet_metadata,
106145
&parquet_metadata,
146+
&self_clone.metrics,
107147
)?;
108148
}
109149
}
@@ -135,12 +175,16 @@ fn filter_access_plan_using_geoparquet_file_metadata(
135175
access_plan: &mut ParquetAccessPlan,
136176
spatial_filter: &SpatialFilter,
137177
metadata: &GeoParquetMetadata,
178+
metrics: &GeoParquetFileOpenerMetrics,
138179
) -> Result<()> {
139180
let table_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?;
140181
if !spatial_filter.evaluate(&table_geo_stats) {
182+
metrics.files_ranges_spatial_pruned.add(1);
141183
for i in access_plan.row_group_indexes() {
142184
access_plan.skip(i);
143185
}
186+
} else {
187+
metrics.files_ranges_spatial_matched.add(1);
144188
}
145189

146190
Ok(())
@@ -156,6 +200,7 @@ fn filter_access_plan_using_geoparquet_covering(
156200
spatial_filter: &SpatialFilter,
157201
metadata: &GeoParquetMetadata,
158202
parquet_metadata: &ParquetMetaData,
203+
metrics: &GeoParquetFileOpenerMetrics,
159204
) -> Result<()> {
160205
let row_group_indices_to_scan = access_plan.row_group_indexes();
161206

@@ -176,7 +221,10 @@ fn filter_access_plan_using_geoparquet_covering(
176221

177222
// Evaluate predicate!
178223
if !spatial_filter.evaluate(&row_group_geo_stats) {
224+
metrics.row_groups_spatial_pruned.add(1);
179225
access_plan.skip(i);
226+
} else {
227+
metrics.row_groups_spatial_matched.add(1);
180228
}
181229
}
182230

rust/sedona-geoparquet/src/format.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,21 @@ impl GeoParquetFileSource {
364364
predicate: Option<Arc<dyn PhysicalExpr>>,
365365
) -> Result<Self> {
366366
if let Some(parquet_source) = inner.as_any().downcast_ref::<ParquetSource>() {
367-
let mut parquet_source = parquet_source.clone();
367+
let parquet_source = parquet_source.clone();
368368
// Extract the predicate from the existing source if it exists so we can keep a copy of it
369369
let new_predicate = match (parquet_source.predicate().cloned(), predicate) {
370370
(None, None) => None,
371371
(None, Some(specified_predicate)) => Some(specified_predicate),
372372
(Some(inner_predicate), None) => Some(inner_predicate),
373-
(Some(_), Some(specified_predicate)) => {
374-
parquet_source = parquet_source.with_predicate(specified_predicate.clone());
375-
Some(specified_predicate)
373+
(Some(inner_predicate), Some(specified_predicate)) => {
374+
// Sanity check: predicate in `GeoParquetFileSource` is init
375+
// from its inner ParquetSource's predicate, they should be
376+
// equivalent.
377+
if Arc::ptr_eq(&inner_predicate, &specified_predicate) {
378+
Some(inner_predicate)
379+
} else {
380+
return sedona_internal_err!("Inner predicate should be equivalent to the predicate in `GeoParquetFileSource`");
381+
}
376382
}
377383
};
378384

@@ -452,6 +458,9 @@ impl FileSource for GeoParquetFileSource {
452458
self.predicate.clone().unwrap(),
453459
base_config.file_schema.clone(),
454460
self.inner.table_parquet_options().global.pruning,
461+
// HACK: Since there is no public API to set inner's metrics, so we use
462+
// inner's metrics as the ExecutionPlan-global metrics
463+
self.inner.metrics(),
455464
))
456465
}
457466

rust/sedona-testing/src/data.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,47 @@ pub fn geoarrow_data_dir() -> Result<String> {
8484
)
8585
}
8686

87+
/// Find the most likely path to the sedona-testing directory if it exists
88+
///
89+
/// This mirrors [`geoarrow_data_dir`] but for the sedona-testing submodule.
90+
/// It checks the `SEDONA_TESTING_DIR` environment variable first, then
91+
/// falls back to the typical repository-relative locations.
92+
pub fn sedona_testing_dir() -> Result<String> {
93+
if let Ok(from_env) = env::var("SEDONA_TESTING_DIR") {
94+
if fs::exists(&from_env)? {
95+
return Ok(from_env);
96+
} else {
97+
return sedona_internal_err!(
98+
"{}\n{}{}{}",
99+
"Can't resolve sedona-testing directory because",
100+
"the value of the SEDONA_TESTING_DIR (",
101+
from_env,
102+
") does not exist"
103+
);
104+
}
105+
}
106+
107+
let likely_possibilities = [
108+
"../../submodules/sedona-testing".to_string(),
109+
"submodules/sedona-testing".to_string(),
110+
];
111+
112+
for possibility in likely_possibilities.into_iter().rev() {
113+
if let Ok(exists) = fs::exists(&possibility) {
114+
if exists {
115+
return Ok(possibility);
116+
}
117+
}
118+
}
119+
120+
sedona_internal_err!(
121+
"{}\n{}\n{}",
122+
"Can't resolve sedona-testing directory from the current working directory",
123+
"You may need to run `git submodule init && git submodule update --recursive` or",
124+
"set the SEDONA_TESTING_DIR environment variable"
125+
)
126+
}
127+
87128
#[cfg(test)]
88129
mod test {
89130
use super::*;
@@ -113,4 +154,22 @@ mod test {
113154
env::remove_var("SEDONA_GEOARROW_DATA_DIR");
114155
assert!(maybe_file.is_ok());
115156
}
157+
158+
#[test]
159+
fn sedona_testing_dir_resolves() {
160+
assert!(sedona_testing_dir().is_ok());
161+
162+
env::set_var("SEDONA_TESTING_DIR", "this_directory_does_not_exist");
163+
let err = sedona_testing_dir();
164+
env::remove_var("SEDONA_TESTING_DIR");
165+
assert!(err
166+
.unwrap_err()
167+
.message()
168+
.contains("the value of the SEDONA_TESTING_DIR"));
169+
170+
env::set_var("SEDONA_TESTING_DIR", sedona_testing_dir().unwrap());
171+
let maybe_dir = sedona_testing_dir();
172+
env::remove_var("SEDONA_TESTING_DIR");
173+
assert!(maybe_dir.is_ok());
174+
}
116175
}

rust/sedona/tests/metrics.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
use datafusion::arrow::util::pretty::pretty_format_batches;
18+
use sedona::context::SedonaContext;
19+
use sedona_testing::data::sedona_testing_dir;
20+
21+
#[tokio::test]
22+
async fn geo_parquet_metrics() {
23+
// Setup and register test table
24+
// -----------------------------
25+
let ctx = SedonaContext::new_local_interactive()
26+
.await
27+
.expect("interactive context should initialize");
28+
29+
let geo_parquet_path = format!(
30+
"{}/data/parquet/geoparquet-1.1.0.parquet",
31+
sedona_testing_dir().expect("sedona-testing directory should resolve")
32+
);
33+
let create_table_sql =
34+
format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{geo_parquet_path}'");
35+
36+
ctx.sql(&create_table_sql)
37+
.await
38+
.expect("create table should succeed")
39+
.collect()
40+
.await
41+
.expect("collecting create table result should succeed");
42+
43+
// Test 1: query with spatial predicate that pruned the entire file
44+
// ----------------------------------------------------------------
45+
let prune_query = r#"
46+
EXPLAIN ANALYZE
47+
SELECT *
48+
FROM test
49+
WHERE ST_Intersects(
50+
geometry,
51+
ST_SetSRID(
52+
ST_GeomFromText('POLYGON((-10 84, -10 88, 10 88, 10 84, -10 84))'),
53+
4326
54+
)
55+
)
56+
"#;
57+
58+
let prune_plan = run_and_format(&ctx, prune_query).await;
59+
assert!(prune_plan.contains("files_ranges_spatial_pruned=1"));
60+
assert!(prune_plan.contains("files_ranges_spatial_matched=0"));
61+
assert!(prune_plan.contains("row_groups_spatial_pruned=0"));
62+
assert!(prune_plan.contains("row_groups_spatial_matched=0"));
63+
64+
// Test 2: query with spatial filter that can't skip any file or row group
65+
// -----------------------------------------------------------------------
66+
let match_query = r#"
67+
EXPLAIN ANALYZE
68+
SELECT *
69+
FROM test
70+
WHERE ST_Intersects(
71+
geometry,
72+
ST_SetSRID(
73+
ST_GeomFromText(
74+
'POLYGON((-180 -18.28799, -180 83.23324, 180 83.23324, 180 -18.28799, -180 -18.28799))'
75+
),
76+
4326
77+
)
78+
)
79+
"#;
80+
81+
let match_plan = run_and_format(&ctx, match_query).await;
82+
assert!(match_plan.contains("files_ranges_spatial_pruned=0"));
83+
assert!(match_plan.contains("files_ranges_spatial_matched=1"));
84+
assert!(match_plan.contains("row_groups_spatial_pruned=0"));
85+
assert!(match_plan.contains("row_groups_spatial_matched=1"));
86+
}
87+
88+
async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String {
89+
let df = ctx
90+
.sql(sql.trim())
91+
.await
92+
.expect("explain analyze query should succeed");
93+
let batches = df
94+
.collect()
95+
.await
96+
.expect("collecting explain analyze result should succeed");
97+
format!(
98+
"{}",
99+
pretty_format_batches(&batches).expect("formatting plan should succeed")
100+
)
101+
}

0 commit comments

Comments
 (0)