-
Notifications
You must be signed in to change notification settings - Fork 44
feat: Add metrics in DataSourceExec related to spatial predicate pruning
#173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
93cdcfd
cc94fb1
b143a95
5bb489d
5ef0cc1
3df3a0e
cac92f8
be45fd3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -364,15 +364,21 @@ impl GeoParquetFileSource { | |
| predicate: Option<Arc<dyn PhysicalExpr>>, | ||
| ) -> Result<Self> { | ||
| if let Some(parquet_source) = inner.as_any().downcast_ref::<ParquetSource>() { | ||
| let mut parquet_source = parquet_source.clone(); | ||
| let parquet_source = parquet_source.clone(); | ||
| // Extract the predicate from the existing source if it exists so we can keep a copy of it | ||
| let new_predicate = match (parquet_source.predicate().cloned(), predicate) { | ||
| (None, None) => None, | ||
| (None, Some(specified_predicate)) => Some(specified_predicate), | ||
| (Some(inner_predicate), None) => Some(inner_predicate), | ||
| (Some(_), Some(specified_predicate)) => { | ||
| parquet_source = parquet_source.with_predicate(specified_predicate.clone()); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DataFusion's |
||
| Some(specified_predicate) | ||
| (Some(inner_predicate), Some(specified_predicate)) => { | ||
| // Sanity check: the predicate in `GeoParquetFileSource` is | ||
| // initialized from its inner `ParquetSource`'s predicate, so the | ||
| // two should be equivalent. | ||
| if Arc::ptr_eq(&inner_predicate, &specified_predicate) { | ||
zhangfengcdt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Some(inner_predicate) | ||
| } else { | ||
| return sedona_internal_err!("Inner predicate should be equivalent to the predicate in `GeoParquetFileSource`"); | ||
| } | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -452,6 +458,9 @@ impl FileSource for GeoParquetFileSource { | |
| self.predicate.clone().unwrap(), | ||
| base_config.file_schema.clone(), | ||
| self.inner.table_parquet_options().global.pruning, | ||
| // HACK: there is no public API to set `inner`'s metrics, so we use | ||
| // `inner`'s metrics as the ExecutionPlan-global metrics | ||
| self.inner.metrics(), | ||
| )) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,101 @@ | ||||||||
| // Licensed to the Apache Software Foundation (ASF) under one | ||||||||
| // or more contributor license agreements. See the NOTICE file | ||||||||
| // distributed with this work for additional information | ||||||||
| // regarding copyright ownership. The ASF licenses this file | ||||||||
| // to you under the Apache License, Version 2.0 (the | ||||||||
| // "License"); you may not use this file except in compliance | ||||||||
| // with the License. You may obtain a copy of the License at | ||||||||
| // | ||||||||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||||||||
| // | ||||||||
| // Unless required by applicable law or agreed to in writing, | ||||||||
| // software distributed under the License is distributed on an | ||||||||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||||
| // KIND, either express or implied. See the License for the | ||||||||
| // specific language governing permissions and limitations | ||||||||
| // under the License. | ||||||||
| use datafusion::arrow::util::pretty::pretty_format_batches; | ||||||||
| use sedona::context::SedonaContext; | ||||||||
| use sedona_testing::data::sedona_testing_dir; | ||||||||
|
|
||||||||
| #[tokio::test] | ||||||||
| async fn geo_parquet_metrics() { | ||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the parquet specific test be in rust/sedona-geoparquet/tests/ instead of rust/sedona/tests/ for better organization?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's true that all of our other pruning tests are in sedona-geoparquet (for better or worse! The top level sedona crate didn't exist when I wrote them..). We don't have access to a real sedona-db/rust/sedona-geoparquet/src/format.rs Lines 684 to 686 in a15844b
I'd prefer to keep the pruning tests together in sedona-geoparquet but also happy to have some integration-y tests here if there's some technical reason they can't live there 🙂
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For technical reasons, these tests are easier to implement as e2e/integration tests. |
||||||||
| // Setup and register test table | ||||||||
| // ----------------------------- | ||||||||
| let ctx = SedonaContext::new_local_interactive() | ||||||||
| .await | ||||||||
| .expect("interactive context should initialize"); | ||||||||
|
|
||||||||
| let geo_parquet_path = format!( | ||||||||
| "{}/data/parquet/geoparquet-1.1.0.parquet", | ||||||||
| sedona_testing_dir().expect("sedona-testing directory should resolve") | ||||||||
| ); | ||||||||
| let create_table_sql = | ||||||||
| format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION '{geo_parquet_path}'"); | ||||||||
|
|
||||||||
| ctx.sql(&create_table_sql) | ||||||||
| .await | ||||||||
| .expect("create table should succeed") | ||||||||
| .collect() | ||||||||
| .await | ||||||||
| .expect("collecting create table result should succeed"); | ||||||||
|
|
||||||||
| // Test 1: query with a spatial predicate that prunes the entire file | ||||||||
| // ------------------------------------------------------------------ | ||||||||
| let prune_query = r#" | ||||||||
| EXPLAIN ANALYZE | ||||||||
| SELECT * | ||||||||
| FROM test | ||||||||
| WHERE ST_Intersects( | ||||||||
| geometry, | ||||||||
| ST_SetSRID( | ||||||||
| ST_GeomFromText('POLYGON((-10 84, -10 88, 10 88, 10 84, -10 84))'), | ||||||||
| 4326 | ||||||||
| ) | ||||||||
| ) | ||||||||
| "#; | ||||||||
|
|
||||||||
| let prune_plan = run_and_format(&ctx, prune_query).await; | ||||||||
| assert!(prune_plan.contains("files_ranges_spatial_pruned=1")); | ||||||||
| assert!(prune_plan.contains("files_ranges_spatial_matched=0")); | ||||||||
| assert!(prune_plan.contains("row_groups_spatial_pruned=0")); | ||||||||
| assert!(prune_plan.contains("row_groups_spatial_matched=0")); | ||||||||
|
|
||||||||
| // Test 2: query with spatial filter that can't skip any file or row group | ||||||||
| // ----------------------------------------------------------------------- | ||||||||
| let match_query = r#" | ||||||||
| EXPLAIN ANALYZE | ||||||||
| SELECT * | ||||||||
| FROM test | ||||||||
| WHERE ST_Intersects( | ||||||||
| geometry, | ||||||||
| ST_SetSRID( | ||||||||
| ST_GeomFromText( | ||||||||
| 'POLYGON((-180 -18.28799, -180 83.23324, 180 83.23324, 180 -18.28799, -180 -18.28799))' | ||||||||
| ), | ||||||||
| 4326 | ||||||||
| ) | ||||||||
| ) | ||||||||
| "#; | ||||||||
|
|
||||||||
| let match_plan = run_and_format(&ctx, match_query).await; | ||||||||
| assert!(match_plan.contains("files_ranges_spatial_pruned=0")); | ||||||||
| assert!(match_plan.contains("files_ranges_spatial_matched=1")); | ||||||||
| assert!(match_plan.contains("row_groups_spatial_pruned=0")); | ||||||||
| assert!(match_plan.contains("row_groups_spatial_matched=1")); | ||||||||
| } | ||||||||
|
|
||||||||
| /// Runs `sql` (with surrounding whitespace trimmed) on `ctx`, collects the | ||||||||
| /// resulting record batches, and returns them pretty-formatted as a string. | ||||||||
| /// | ||||||||
| /// Panics if running the query, collecting its result, or formatting fails. | ||||||||
| async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String { | ||||||||
| let query = sql.trim(); | ||||||||
| let batches = ctx | ||||||||
| .sql(query) | ||||||||
| .await | ||||||||
| .expect("explain analyze query should succeed") | ||||||||
| .collect() | ||||||||
| .await | ||||||||
| .expect("collecting explain analyze result should succeed"); | ||||||||
| pretty_format_batches(&batches) | ||||||||
| .expect("formatting plan should succeed") | ||||||||
| .to_string() | ||||||||
| } | ||||||||
Uh oh!
There was an error while loading. Please reload this page.