Skip to content

Commit df1e0b7

Browse files
authored
Disable optimized spatial join and GeoParquet data pruning for geography types (#57)
As mentioned in #39, our GeoParquet data pruning optimization and spatial join optimization does not handle geography types correctly, so we'd better disable them to avoid generating incorrect query results.
1 parent 79cea80 commit df1e0b7

File tree

4 files changed

+322
-10
lines changed

4 files changed

+322
-10
lines changed

python/sedonadb/tests/test_sjoin.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,65 @@ def test_spatial_join(join_type, on):
7676
sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
7777
assert len(sedonadb_results) > 0
7878
eng_postgis.assert_query_result(sql, sedonadb_results)
79+
80+
81+
@pytest.mark.parametrize(
82+
"join_type", ["INNER JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN"]
83+
)
84+
@pytest.mark.parametrize(
85+
"on",
86+
[
87+
"ST_Intersects(sjoin_geog1.geog, sjoin_geog2.geog)",
88+
"ST_Distance(sjoin_geog1.geog, sjoin_geog2.geog) < 100000",
89+
],
90+
)
91+
def test_spatial_join_geography(join_type, on):
92+
eng_sedonadb = SedonaDB.create_or_skip()
93+
eng_postgis = PostGIS.create_or_skip()
94+
95+
# Select two sets of bounding boxes that cross the antimeridian,
96+
# which would be disjoint on a Euclidean plane. A geography join will produce non-empty results,
97+
# whereas a geometry join would not.
98+
west_most_bound = [-190, -10, -170, 10]
99+
east_most_bound = [170, -10, 190, 10]
100+
options = json.dumps(
101+
{
102+
"geom_type": "Point",
103+
"num_parts_range": [2, 10],
104+
"vertices_per_linestring_range": [2, 10],
105+
"bounds": west_most_bound,
106+
"size_range": [0.1, 5],
107+
"seed": 42,
108+
}
109+
)
110+
df_point = eng_sedonadb.execute_and_collect(
111+
f"SELECT id, ST_SetSRID(ST_GeogFromWKB(ST_AsBinary(geometry)), 4326) geog, dist FROM sd_random_geometry('{options}') LIMIT 100"
112+
)
113+
options = json.dumps(
114+
{
115+
"geom_type": "Polygon",
116+
"polygon_hole_rate": 0.5,
117+
"num_parts_range": [2, 10],
118+
"vertices_per_linestring_range": [2, 10],
119+
"bounds": east_most_bound,
120+
"size_range": [0.1, 5],
121+
"seed": 43,
122+
}
123+
)
124+
df_polygon = eng_sedonadb.execute_and_collect(
125+
f"SELECT id, ST_SetSRID(ST_GeogFromWKB(ST_AsBinary(geometry)), 4326) geog, dist FROM sd_random_geometry('{options}') LIMIT 100"
126+
)
127+
eng_sedonadb.create_table_arrow("sjoin_geog1", df_point)
128+
eng_sedonadb.create_table_arrow("sjoin_geog2", df_polygon)
129+
eng_postgis.create_table_arrow("sjoin_geog1", df_point)
130+
eng_postgis.create_table_arrow("sjoin_geog2", df_polygon)
131+
132+
sql = f"""
133+
SELECT sjoin_geog1.id id0, sjoin_geog2.id id1
134+
FROM sjoin_geog1 {join_type} sjoin_geog2
135+
ON {on}
136+
ORDER BY id0, id1
137+
"""
138+
139+
sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
140+
eng_postgis.assert_query_result(sql, sedonadb_results)

rust/sedona-expr/src/spatial_filter.rs

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion_physical_expr::{
2626
use geo_traits::Dimensions;
2727
use sedona_common::sedona_internal_err;
2828
use sedona_geometry::{bounding_box::BoundingBox, bounds::wkb_bounds_xy, interval::IntervalTrait};
29-
use sedona_schema::datatypes::SedonaType;
29+
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
3030

3131
use crate::{
3232
statistics::GeoStatistics,
@@ -185,6 +185,9 @@ impl SpatialFilter {
185185
match (&args[0], &args[1]) {
186186
(ArgRef::Col(column), ArgRef::Lit(literal))
187187
| (ArgRef::Lit(literal), ArgRef::Col(column)) => {
188+
if !is_prunable_geospatial_literal(literal) {
189+
return Ok(Some(Self::Unknown));
190+
}
188191
match literal_bounds(literal) {
189192
Ok(literal_bounds) => {
190193
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
@@ -204,6 +207,9 @@ impl SpatialFilter {
204207
match (&args[0], &args[1]) {
205208
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
206209
// column within/covered_by literal -> Intersects filter
210+
if !is_prunable_geospatial_literal(literal) {
211+
return Ok(Some(Self::Unknown));
212+
}
207213
match literal_bounds(literal) {
208214
Ok(literal_bounds) => {
209215
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
@@ -213,6 +219,9 @@ impl SpatialFilter {
213219
}
214220
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
215221
// literal within/covered_by column -> Covers filter
222+
if !is_prunable_geospatial_literal(literal) {
223+
return Ok(Some(Self::Unknown));
224+
}
216225
match literal_bounds(literal) {
217226
Ok(literal_bounds) => {
218227
Ok(Some(Self::Covers(column.clone(), literal_bounds)))
@@ -233,6 +242,9 @@ impl SpatialFilter {
233242
(ArgRef::Col(column), ArgRef::Lit(literal)) => {
234243
// column contains/covers literal -> Covers filter
235244
// (column's bbox must fully cover literal's bbox)
245+
if !is_prunable_geospatial_literal(literal) {
246+
return Ok(Some(Self::Unknown));
247+
}
236248
match literal_bounds(literal) {
237249
Ok(literal_bounds) => {
238250
Ok(Some(Self::Covers(column.clone(), literal_bounds)))
@@ -243,6 +255,9 @@ impl SpatialFilter {
243255
(ArgRef::Lit(literal), ArgRef::Col(column)) => {
244256
// literal contains/covers column -> Intersects filter
245257
// (if literal contains column, they must at least intersect)
258+
if !is_prunable_geospatial_literal(literal) {
259+
return Ok(Some(Self::Unknown));
260+
}
246261
match literal_bounds(literal) {
247262
Ok(literal_bounds) => {
248263
Ok(Some(Self::Intersects(column.clone(), literal_bounds)))
@@ -284,6 +299,9 @@ impl SpatialFilter {
284299
match (&args[0], &args[1], &args[2]) {
285300
(ArgRef::Col(column), ArgRef::Lit(literal), ArgRef::Lit(distance))
286301
| (ArgRef::Lit(literal), ArgRef::Col(column), ArgRef::Lit(distance)) => {
302+
if !is_prunable_geospatial_literal(literal) {
303+
return Ok(Some(Self::Unknown));
304+
}
287305
match (
288306
literal_bounds(literal),
289307
distance.value().cast_to(&DataType::Float64)?,
@@ -314,6 +332,19 @@ enum ArgRef<'a> {
314332
Other,
315333
}
316334

335+
/// Our current spatial data pruning implementation does not correctly handle geography data.
336+
/// We therefore only consider geometry data type for pruning.
337+
fn is_prunable_geospatial_literal(literal: &Literal) -> bool {
338+
let Ok(literal_field) = literal.return_field(&Schema::empty()) else {
339+
return false;
340+
};
341+
let Ok(sedona_type) = SedonaType::from_storage_field(&literal_field) else {
342+
return false;
343+
};
344+
let matcher = ArgMatcher::is_geometry();
345+
matcher.match_type(&sedona_type)
346+
}
347+
317348
fn literal_bounds(literal: &Literal) -> Result<BoundingBox> {
318349
let literal_field = literal.return_field(&Schema::empty())?;
319350
let sedona_type = SedonaType::from_storage_field(&literal_field)?;
@@ -348,12 +379,11 @@ fn parse_args(args: &[Arc<dyn PhysicalExpr>]) -> Vec<ArgRef<'_>> {
348379

349380
#[cfg(test)]
350381
mod test {
351-
352382
use arrow_schema::{DataType, Field};
353383
use datafusion_expr::{ScalarUDF, Signature, SimpleScalarUDF, Volatility};
354384
use rstest::rstest;
355385
use sedona_geometry::{bounding_box::BoundingBox, interval::Interval};
356-
use sedona_schema::datatypes::WKB_GEOMETRY;
386+
use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY};
357387
use sedona_testing::create::create_scalar;
358388

359389
use super::*;
@@ -806,6 +836,111 @@ mod test {
806836
));
807837
}
808838

839+
#[rstest]
840+
fn range_predicate_involving_geography_should_be_transformed_to_unknown(
841+
#[values(
842+
"st_intersects",
843+
"st_equals",
844+
"st_touches",
845+
"st_contains",
846+
"st_covers",
847+
"st_within",
848+
"st_covered_by",
849+
"st_coveredby"
850+
)]
851+
func_name: &str,
852+
) {
853+
let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
854+
let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap();
855+
let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
856+
create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOGRAPHY),
857+
Some(storage_field.metadata().into()),
858+
));
859+
860+
let func = create_dummy_spatial_function(func_name, 2);
861+
let expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
862+
func_name,
863+
Arc::new(func.clone()),
864+
vec![column.clone(), literal.clone()],
865+
Arc::new(Field::new("", DataType::Boolean, true)),
866+
));
867+
let predicate = SpatialFilter::try_from_expr(&expr).unwrap();
868+
assert!(
869+
matches!(predicate, SpatialFilter::Unknown),
870+
"Function {func_name} involving geography should produce Unknown filter"
871+
);
872+
}
873+
874+
#[test]
875+
fn distance_predicate_involving_geography_should_be_transformed_to_unknown() {
876+
let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));
877+
let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap();
878+
let literal: Arc<dyn PhysicalExpr> = Arc::new(Literal::new_with_metadata(
879+
create_scalar(Some("POINT (1 2)"), &WKB_GEOGRAPHY),
880+
Some(storage_field.metadata().into()),
881+
));
882+
let distance_literal: Arc<dyn PhysicalExpr> =
883+
Arc::new(Literal::new(ScalarValue::Float64(Some(100.0))));
884+
885+
// Test ST_DWithin function
886+
let st_dwithin = create_dummy_spatial_function("st_dwithin", 3);
887+
let dwithin_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
888+
"st_dwithin",
889+
Arc::new(st_dwithin.clone()),
890+
vec![column.clone(), literal.clone(), distance_literal.clone()],
891+
Arc::new(Field::new("", DataType::Boolean, true)),
892+
));
893+
let predicate = SpatialFilter::try_from_expr(&dwithin_expr).unwrap();
894+
assert!(
895+
matches!(predicate, SpatialFilter::Unknown),
896+
"ST_DWithin involving geography should produce Unknown filter"
897+
);
898+
899+
// Test ST_DWithin with reversed geometry arguments
900+
let dwithin_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
901+
"st_dwithin",
902+
Arc::new(st_dwithin),
903+
vec![literal.clone(), column.clone(), distance_literal.clone()],
904+
Arc::new(Field::new("", DataType::Boolean, true)),
905+
));
906+
let predicate_reversed = SpatialFilter::try_from_expr(&dwithin_expr_reversed).unwrap();
907+
assert!(
908+
matches!(predicate_reversed, SpatialFilter::Unknown),
909+
"ST_DWithin involving geography should produce Unknown filter"
910+
);
911+
912+
// Test ST_Distance <= threshold
913+
let st_distance = create_dummy_spatial_function("st_distance", 2);
914+
let distance_expr: Arc<dyn PhysicalExpr> = Arc::new(ScalarFunctionExpr::new(
915+
"st_distance",
916+
Arc::new(st_distance.clone()),
917+
vec![column.clone(), literal.clone()],
918+
Arc::new(Field::new("", DataType::Boolean, true)),
919+
));
920+
let comparison_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
921+
distance_expr.clone(),
922+
Operator::LtEq,
923+
distance_literal.clone(),
924+
));
925+
let predicate = SpatialFilter::try_from_expr(&comparison_expr).unwrap();
926+
assert!(
927+
matches!(predicate, SpatialFilter::Unknown),
928+
"ST_Distance <= threshold involving geography should produce Unknown filter"
929+
);
930+
931+
// Test threshold >= ST_Distance
932+
let comparison_expr_reversed: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
933+
distance_literal.clone(),
934+
Operator::GtEq,
935+
distance_expr.clone(),
936+
));
937+
let predicate_reversed = SpatialFilter::try_from_expr(&comparison_expr_reversed).unwrap();
938+
assert!(
939+
matches!(predicate_reversed, SpatialFilter::Unknown),
940+
"threshold >= ST_Distance involving geography should produce Unknown filter"
941+
);
942+
}
943+
809944
#[test]
810945
fn predicate_from_expr_has_z() {
811946
let column: Arc<dyn PhysicalExpr> = Arc::new(Column::new("geometry", 0));

rust/sedona-spatial-join/src/exec.rs

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,7 @@ mod tests {
633633
use geo_types::{Coord, Rect};
634634
use rstest::rstest;
635635
use sedona_geometry::types::GeometryTypeId;
636-
use sedona_schema::datatypes::WKB_GEOMETRY;
636+
use sedona_schema::datatypes::{SedonaType, WKB_GEOGRAPHY, WKB_GEOMETRY};
637637
use sedona_testing::datagen::RandomPartitionedDataBuilder;
638638
use tokio::sync::OnceCell;
639639

@@ -649,12 +649,13 @@ mod tests {
649649

650650
/// Creates standard test data with left (Polygon) and right (Point) partitions
651651
fn create_default_test_data() -> Result<(TestPartitions, TestPartitions)> {
652-
create_test_data_with_size_range((1.0, 10.0))
652+
create_test_data_with_size_range((1.0, 10.0), WKB_GEOMETRY)
653653
}
654654

655655
/// Creates test data with custom size range
656656
fn create_test_data_with_size_range(
657657
size_range: (f64, f64),
658+
sedona_type: SedonaType,
658659
) -> Result<(TestPartitions, TestPartitions)> {
659660
let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });
660661

@@ -664,7 +665,7 @@ mod tests {
664665
.batches_per_partition(2)
665666
.rows_per_batch(30)
666667
.geometry_type(GeometryTypeId::Polygon)
667-
.sedona_type(WKB_GEOMETRY)
668+
.sedona_type(sedona_type.clone())
668669
.bounds(bounds)
669670
.size_range(size_range)
670671
.null_rate(0.1)
@@ -676,7 +677,7 @@ mod tests {
676677
.batches_per_partition(4)
677678
.rows_per_batch(30)
678679
.geometry_type(GeometryTypeId::Point)
679-
.sedona_type(WKB_GEOMETRY)
680+
.sedona_type(sedona_type)
680681
.bounds(bounds)
681682
.size_range(size_range)
682683
.null_rate(0.1)
@@ -928,7 +929,7 @@ mod tests {
928929
#[tokio::test]
929930
async fn test_spatial_join_with_filter() -> Result<()> {
930931
let ((left_schema, left_partitions), (right_schema, right_partitions)) =
931-
create_test_data_with_size_range((0.1, 10.0))?;
932+
create_test_data_with_size_range((0.1, 10.0), WKB_GEOMETRY)?;
932933

933934
for max_batch_size in [10, 30, 100] {
934935
let options = SpatialJoinOptions {
@@ -996,6 +997,37 @@ mod tests {
996997
Ok(())
997998
}
998999

1000+
#[tokio::test]
1001+
async fn test_geography_join_is_not_optimized() -> Result<()> {
1002+
let options = SpatialJoinOptions::default();
1003+
let ctx = setup_context(Some(options), 10)?;
1004+
1005+
// Prepare geography tables
1006+
let ((left_schema, left_partitions), (right_schema, right_partitions)) =
1007+
create_test_data_with_size_range((0.1, 10.0), WKB_GEOGRAPHY)?;
1008+
let mem_table_left: Arc<dyn TableProvider> =
1009+
Arc::new(MemTable::try_new(left_schema, left_partitions)?);
1010+
let mem_table_right: Arc<dyn TableProvider> =
1011+
Arc::new(MemTable::try_new(right_schema, right_partitions)?);
1012+
ctx.register_table("L", mem_table_left)?;
1013+
ctx.register_table("R", mem_table_right)?;
1014+
1015+
// Execute geography join query
1016+
let df = ctx
1017+
.sql("SELECT * FROM L JOIN R ON ST_Intersects(L.geometry, R.geometry)")
1018+
.await?;
1019+
let plan = df.create_physical_plan().await?;
1020+
1021+
// Verify that no SpatialJoinExec is present (geography join should not be optimized)
1022+
let spatial_joins = collect_spatial_join_exec(&plan)?;
1023+
assert!(
1024+
spatial_joins.is_empty(),
1025+
"Geography joins should not be optimized to SpatialJoinExec"
1026+
);
1027+
1028+
Ok(())
1029+
}
1030+
9991031
async fn test_with_join_types(join_type: JoinType) -> Result<RecordBatch> {
10001032
let ((left_schema, left_partitions), (right_schema, right_partitions)) =
10011033
create_test_data_with_empty_partitions()?;

0 commit comments

Comments
 (0)