Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions python/sedonadb/tests/test_sjoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.

import json
import re
import warnings

import geopandas as gpd
Expand Down Expand Up @@ -68,6 +69,96 @@ def test_spatial_join(join_type, on):
eng_postgis.assert_query_result(sql, sedonadb_results)


def _plan_text(df):
query_plan = df.to_pandas()
return "\n".join(query_plan.iloc[:, 1].astype(str).tolist())


def _spatial_join_side_file_names(plan_text):
Comment on lines +72 to +77
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #734 so that we can hopefully make this easier some day.

"""Extract the left/right parquet file names used by `SpatialJoinExec`.

Example input:
SpatialJoinExec: join_type=Inner, on=ST_intersects(geo_right@0, geo_left@0)
ProjectionExec: expr=[geometry@0 as geo_right]
DataSourceExec: file_groups={1 group: [[.../natural-earth_countries_geo.parquet]]}, projection=[geometry], file_type=parquet
ProbeShuffleExec: partitioning=RoundRobinBatch(1)
ProjectionExec: expr=[geometry@0 as geo_left]
DataSourceExec: file_groups={1 group: [[.../natural-earth_cities_geo.parquet]]}, projection=[geometry], file_type=parquet

Example output:
["natural-earth_countries_geo", "natural-earth_cities_geo"]
"""
spatial_join_idx = plan_text.find("SpatialJoinExec:")
assert spatial_join_idx != -1, plan_text

file_names = re.findall(
r"DataSourceExec:.*?/([^/\]]+)\.parquet", plan_text[spatial_join_idx:]
)
assert len(file_names) >= 2, plan_text
return file_names[:2]


def test_spatial_join_reordering_can_be_disabled_e2e(geoarrow_data):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One consideration is the test’s running time: the two Parquet files are around 10 KB, and this test runs in about 0.1 s on my machine.

I couldn’t find another way to perform the e2e test. The sd_random_geometry() table function does not seem to produce statistics, so spatial join reordering cannot occur.

Hope this is okay.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely ok! (Parquet tests use some of the larger files to ensure everything works with realistic input)

path_left = (
geoarrow_data / "natural-earth" / "files" / "natural-earth_cities_geo.parquet"
)
path_right = (
geoarrow_data
/ "natural-earth"
/ "files"
/ "natural-earth_countries_geo.parquet"
)
assert path_left.exists(), f"Missing test asset: {path_left}"
assert path_right.exists(), f"Missing test asset: {path_right}"
Comment on lines +111 to +112
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have sedonadb.testing.skip_if_not_exists() for this (so the tests can run without the submodule)


with SedonaDB.create_or_skip() as eng_sedonadb:
sql = f"""
SELECT t1.name
FROM '{path_left}' AS t1
JOIN '{path_right}' AS t2
ON ST_Intersects(t1.geometry, t2.geometry)
"""

# Test 1: regular run swaps the join order
plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
print(f"Plan with reordering enabled:\n{plan_text}")
assert _spatial_join_side_file_names(plan_text) == [
"natural-earth_countries_geo",
"natural-earth_cities_geo",
], plan_text

result_with_reordering = (
eng_sedonadb.execute_and_collect(sql)
.to_pandas()
.sort_values("name")
.reset_index(drop=True)
)
assert len(result_with_reordering) > 0

# Test 2: with config disabled, join won't reorder
eng_sedonadb.con.sql(
"SET sedona.spatial_join.spatial_join_reordering TO false"
).execute()

plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
print(f"Plan with reordering disabled:\n{plan_text}")
assert _spatial_join_side_file_names(plan_text) == [
"natural-earth_cities_geo",
"natural-earth_countries_geo",
], plan_text

result_without_reordering = (
eng_sedonadb.execute_and_collect(sql)
.to_pandas()
.sort_values("name")
.reset_index(drop=True)
)
pd.testing.assert_frame_equal(
result_without_reordering,
result_with_reordering,
)


@pytest.mark.parametrize(
"join_type",
[
Expand Down
5 changes: 5 additions & 0 deletions rust/sedona-common/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ config_namespace! {
/// locality might cause imbalanced partitions when running out-of-core spatial join.
pub repartition_probe_side: bool, default = true

/// Reorder spatial join inputs to put the smaller input on the build side
/// when statistics are available. If set to `false`, spatial joins
/// preserve the original query order.
pub spatial_join_reordering: bool, default = true

/// Maximum number of sample bounding boxes collected from the index side for partitioning the
/// data when running out-of-core spatial join
pub max_index_side_bbox_samples: usize, default = 10000
Expand Down
29 changes: 23 additions & 6 deletions rust/sedona-spatial-join/src/planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::planner::logical_plan_node::SpatialJoinPlanNode;
use crate::planner::probe_shuffle_exec::ProbeShuffleExec;
use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported, transform_join_filter};
use crate::spatial_predicate::SpatialPredicate;
use sedona_common::option::SedonaOptions;
use sedona_common::option::{SedonaOptions, SpatialJoinOptions};

/// Registers a query planner that can produce [`SpatialJoinExec`] from a logical extension node.
pub(crate) fn register_spatial_join_planner(builder: SessionStateBuilder) -> SessionStateBuilder {
Expand Down Expand Up @@ -102,6 +102,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
else {
return sedona_internal_err!("SedonaOptions not found in session state extensions");
};
let spatial_join_options = &ext.spatial_join;

if !ext.spatial_join.enable {
return sedona_internal_err!("Spatial join is disabled in SedonaOptions");
Expand Down Expand Up @@ -151,14 +152,18 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {

let should_swap = !matches!(spatial_predicate, SpatialPredicate::KNearestNeighbors(_))
&& join_type.supports_swap()
&& should_swap_join_order(physical_left.as_ref(), physical_right.as_ref())?;
&& should_swap_join_order(
spatial_join_options,
physical_left.as_ref(),
physical_right.as_ref(),
)?;

// Repartition the probe side when enabled. This breaks spatial locality in sorted/skewed
// datasets, leading to more balanced workloads during out-of-core spatial join.
// We determine which pre-swap input will be the probe AFTER any potential swap, and
// repartition it here. swap_inputs() will then carry the RepartitionExec to the correct
// child position.
let (physical_left, physical_right) = if ext.spatial_join.repartition_probe_side {
let (physical_left, physical_right) = if spatial_join_options.repartition_probe_side {
repartition_probe_side(
physical_left,
physical_right,
Expand All @@ -176,7 +181,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
remainder,
join_type,
None,
&ext.spatial_join,
spatial_join_options,
)?;

if should_swap {
Expand All @@ -192,8 +197,20 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
/// produce a smaller and more efficient spatial index (R-tree).
/// 2. If row-count statistics are unavailable (for example, for CSV sources),
/// fall back to total input size as an estimate.
/// 3. Do not swap the join order if no relevant statistics are available.
fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> Result<bool> {
/// 3. Do not swap the join order if join reordering is disabled or no relevant
/// statistics are available.
fn should_swap_join_order(
spatial_join_options: &SpatialJoinOptions,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use SpatialJoinOptions arg instead of a boolean flag, since this function will likely require additional options in the future and this approach is more extensible.

left: &dyn ExecutionPlan,
right: &dyn ExecutionPlan,
) -> Result<bool> {
if !spatial_join_options.spatial_join_reordering {
log::info!(
"spatial join swap heuristic disabled via sedona.spatial_join.spatial_join_reordering"
);
return Ok(false);
}

let left_stats = left.partition_statistics(None)?;
let right_stats = right.partition_statistics(None)?;

Expand Down
33 changes: 31 additions & 2 deletions rust/sedona-spatial-join/tests/spatial_join_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ fn single_row_table(schema: SchemaRef, id: i32, marker: &str) -> Result<Arc<dyn
// Keep the data fixed and vary only the advertised stats so the planner swap
// decision is explained entirely by the heuristic under test.
async fn assert_build_side_from_stats(
options: SpatialJoinOptions,
left_num_rows: Option<usize>,
right_num_rows: Option<usize>,
left_total_byte_size: Option<usize>,
Expand All @@ -309,7 +310,7 @@ async fn assert_build_side_from_stats(
stats_with(right_schema.as_ref(), right_num_rows, right_total_byte_size),
));

let ctx = setup_context(Some(SpatialJoinOptions::default()), 10)?;
let ctx = setup_context(Some(options), 10)?;
ctx.register_table("L", left_provider)?;
ctx.register_table("R", right_provider)?;

Expand Down Expand Up @@ -692,6 +693,7 @@ async fn test_spatial_join_swap_inputs_produces_same_plan(
// smaller-row input on the build side even if it is larger by byte size.
async fn test_spatial_join_reordering_uses_row_count() -> Result<()> {
assert_build_side_from_stats(
SpatialJoinOptions::default(),
Some(100),
Some(10),
Some(100),
Expand All @@ -706,6 +708,7 @@ async fn test_spatial_join_reordering_uses_row_count() -> Result<()> {
// smaller-bytes input on the build side.
async fn test_spatial_join_reordering_uses_size_fallback() -> Result<()> {
assert_build_side_from_stats(
SpatialJoinOptions::default(),
None,
None,
Some(10_000),
Expand All @@ -719,7 +722,33 @@ async fn test_spatial_join_reordering_uses_size_fallback() -> Result<()> {
// When both row count and size are absent, the planner preserves the original
// join order.
async fn test_spatial_join_reordering_preserves_order_without_stats() -> Result<()> {
assert_build_side_from_stats(None, None, None, None, OriginalInputSide::Left).await
assert_build_side_from_stats(
SpatialJoinOptions::default(),
None,
None,
None,
None,
OriginalInputSide::Left,
)
.await
}

#[tokio::test]
// With `spatial_join_reordering` turned off, the original (left) input must
// stay on the build side even though the advertised statistics would normally
// trigger a swap.
async fn test_spatial_join_reordering_can_be_disabled() -> Result<()> {
    // Start from the defaults and flip only the option under test.
    let reordering_disabled = SpatialJoinOptions {
        spatial_join_reordering: false,
        ..Default::default()
    };

    assert_build_side_from_stats(
        reordering_disabled,
        Some(100),
        Some(10),
        Some(100),
        Some(10_000),
        OriginalInputSide::Left,
    )
    .await
}

#[tokio::test]
Expand Down
Loading