Skip to content

Commit 7ceb6b1

Browse files
authored
perf: Cache geometries in KNN queries to eliminate redundant WKB conversions (#53)
1 parent 3e9b7a9 commit 7ceb6b1

File tree

2 files changed

+282
-18
lines changed

2 files changed

+282
-18
lines changed

benchmarks/test_knn.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import json
18+
import pytest
19+
from test_bench_base import TestBenchBase
20+
from sedonadb.testing import SedonaDB
21+
22+
23+
class TestBenchKNN(TestBenchBase):
    """Benchmarks and sanity checks for ``ST_KNN`` joins in SedonaDB.

    ``setup_class`` registers four tables:

    * ``knn_buildings`` / ``knn_buildings_small`` - random polygons used as
      the side that is searched for neighbours.
    * ``knn_trips`` / ``knn_trips_small`` - random points used as the query
      (probe) side.
    """

    def setup_class(self):
        """Create the synthetic building and trip tables used by every test."""
        self.sedonadb = SedonaDB.create_or_skip()

        # Building-like polygons (index side - fewer, larger geometries).
        # Dataset sizes are deliberately modest: very large synthetic datasets
        # have been observed to cause memory pressure or performance
        # degradation in DataFusion operations.
        building_options = {
            "geom_type": "Polygon",
            "target_rows": 2_000,  # Reasonable size for benchmarking
            "vertices_per_linestring_range": [4, 8],
            "size_range": [0.001, 0.01],
            "seed": 42,
        }

        building_query = f"""
            SELECT
                geometry as geom,
                round(random() * 1000) as building_id,
                'Building_' || cast(round(random() * 1000) as varchar) as name
            FROM sd_random_geometry('{json.dumps(building_options)}')
        """
        building_tab = self.sedonadb.execute_and_collect(building_query)
        self.sedonadb.create_table_arrow("knn_buildings", building_tab)

        # Trip pickup points (probe side - many small geometries).
        trip_options = {
            "geom_type": "Point",
            "target_rows": 10_000,
            "seed": 43,
        }

        trip_query = f"""
            SELECT
                geometry as geom,
                round(random() * 100000) as trip_id
            FROM sd_random_geometry('{json.dumps(trip_options)}')
        """
        trip_tab = self.sedonadb.execute_and_collect(trip_query)
        self.sedonadb.create_table_arrow("knn_trips", trip_tab)

        # Smaller subsets of both tables for the quick test variants.
        small_building_query = """
            SELECT * FROM knn_buildings LIMIT 1000
        """
        small_building_tab = self.sedonadb.execute_and_collect(small_building_query)
        self.sedonadb.create_table_arrow("knn_buildings_small", small_building_tab)

        small_trip_query = """
            SELECT * FROM knn_trips LIMIT 5000
        """
        small_trip_tab = self.sedonadb.execute_and_collect(small_trip_query)
        self.sedonadb.create_table_arrow("knn_trips_small", small_trip_tab)

    @pytest.mark.parametrize("k", [1, 5, 10])
    @pytest.mark.parametrize("use_spheroid", [False, True])
    @pytest.mark.parametrize("dataset_size", ["small", "large"])
    def test_knn_performance(self, benchmark, k, use_spheroid, dataset_size):
        """Benchmark KNN query performance with different parameters.

        ``dataset_size`` selects which tables are joined and how many trips
        are sampled; ``k`` and ``use_spheroid`` are passed straight to
        ``ST_KNN``. After the benchmark the row count is checked against
        ``trip_limit * k``.
        """

        if dataset_size == "small":
            trip_table = "knn_trips_small"
            building_table = "knn_buildings_small"
            trip_limit = 100  # Test with 100 trips
        else:
            # The large variant reads the full tables. It previously reused
            # knn_trips_small, which left knn_trips unused and made the two
            # variants differ only on the building side.
            trip_table = "knn_trips"
            building_table = "knn_buildings"
            trip_limit = 500

        spheroid_str = "TRUE" if use_spheroid else "FALSE"

        def run_knn_query():
            query = f"""
                WITH trip_sample AS (
                    SELECT trip_id, geom as trip_geom
                    FROM {trip_table}
                    LIMIT {trip_limit}
                ),
                building_with_geom AS (
                    SELECT building_id, name, geom as building_geom
                    FROM {building_table}
                )
                SELECT
                    t.trip_id,
                    b.building_id,
                    b.name,
                    ST_Distance(t.trip_geom, b.building_geom) as distance
                FROM trip_sample t
                JOIN building_with_geom b ON ST_KNN(t.trip_geom, b.building_geom, {k}, {spheroid_str})
                ORDER BY t.trip_id, distance
            """
            result = self.sedonadb.execute_and_collect(query)
            return len(result)  # Return result count for verification

        # Run the benchmark
        result_count = benchmark(run_knn_query)

        # Verify we got the expected number of results (trips * k)
        expected_count = trip_limit * k
        assert result_count == expected_count, (
            f"Expected {expected_count} results, got {result_count}"
        )

    @pytest.mark.parametrize("k", [1, 5, 10, 20])
    def test_knn_scalability_by_k(self, benchmark, k):
        """Test how KNN performance scales with increasing k values."""

        def run_knn_query():
            query = f"""
                WITH trip_sample AS (
                    SELECT trip_id, geom as trip_geom
                    FROM knn_trips_small
                    LIMIT 50 -- Small sample for k scaling test
                )
                SELECT
                    COUNT(*) as result_count
                FROM trip_sample t
                JOIN knn_buildings_small b ON ST_KNN(t.trip_geom, b.geom, {k}, FALSE)
            """
            result = self.sedonadb.execute_and_collect(query)
            return result.to_pandas().iloc[0]["result_count"]

        result_count = benchmark(run_knn_query)
        expected_count = 50 * k  # 50 trips * k neighbors each
        assert result_count == expected_count, (
            f"Expected {expected_count} results, got {result_count}"
        )

    def test_knn_correctness(self):
        """Sanity-check a 5-nearest-neighbour query from the origin.

        Checks the row count, that the distances (sorted by the query's own
        ``ORDER BY``) come back ascending, and that none is negative. It does
        not compare the result against a brute-force nearest-neighbour search.
        """

        # Test with a known point and verify ordering
        query = """
            WITH test_point AS (
                SELECT ST_Point(0.0, 0.0) as query_geom
            )
            SELECT
                ST_Distance(test_point.query_geom, b.geom) as distance,
                b.building_id
            FROM test_point
            JOIN knn_buildings_small b ON ST_KNN(test_point.query_geom, b.geom, 5, FALSE)
            ORDER BY distance
        """

        result = self.sedonadb.execute_and_collect(query).to_pandas()

        # Verify we got 5 results
        assert len(result) == 5, f"Expected 5 results, got {len(result)}"

        # Verify distances are in ascending order
        distances = result["distance"].tolist()
        assert distances == sorted(distances), (
            f"Results not ordered by distance: {distances}"
        )

        # Verify all distances are non-negative
        assert all(d >= 0 for d in distances), f"Found negative distances: {distances}"

    def test_knn_tie_breaking(self):
        """Test KNN behavior with tie-breaking when geometries have equal distances.

        Four points sit at distance 1.0 from the origin and a fifth at 2.0.
        Asking for the 2 nearest neighbours must return exactly two rows,
        both at distance 1.0; which two of the four is not asserted.
        """

        # Create test data with known equal distances
        setup_query = """
            WITH test_points AS (
                SELECT 1 as id, ST_Point(1.0, 0.0) as geom
                UNION ALL
                SELECT 2 as id, ST_Point(-1.0, 0.0) as geom
                UNION ALL
                SELECT 3 as id, ST_Point(0.0, 1.0) as geom
                UNION ALL
                SELECT 4 as id, ST_Point(0.0, -1.0) as geom
                UNION ALL
                SELECT 5 as id, ST_Point(2.0, 0.0) as geom
            )
            SELECT * FROM test_points
        """
        tie_test_tab = self.sedonadb.execute_and_collect(setup_query)
        self.sedonadb.create_table_arrow("knn_tie_test", tie_test_tab)

        # Query for 2 nearest neighbors from origin - should get 2 of the 4 equidistant points
        query = """
            WITH query_point AS (
                SELECT ST_Point(0.0, 0.0) as geom
            )
            SELECT
                t.id,
                ST_Distance(query_point.geom, t.geom) as distance
            FROM query_point
            JOIN knn_tie_test t ON ST_KNN(query_point.geom, t.geom, 2, FALSE)
            ORDER BY distance, t.id
        """

        result = self.sedonadb.execute_and_collect(query).to_pandas()

        # Should get exactly 2 results
        assert len(result) == 2, f"Expected 2 results, got {len(result)}"

        # Both should be at distance 1.0 (the 4 equidistant points)
        distances = result["distance"].tolist()
        assert all(abs(d - 1.0) < 1e-6 for d in distances), (
            f"Expected distances ~1.0, got {distances}"
        )

rust/sedona-spatial-join/src/index.rs

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,32 @@ impl SpatialIndexBuilder {
235235
geom_idx_vec
236236
}
237237

238+
/// Build cached geometries for KNN queries to avoid repeated WKB conversions
239+
/// Returns both geometries and total WKB size for memory estimation
240+
fn build_cached_geometries(indexed_batches: &[IndexedBatch]) -> (Vec<Geometry<f64>>, usize) {
241+
let mut geometries = Vec::new();
242+
let mut total_wkb_size = 0;
243+
244+
for indexed_batch in indexed_batches.iter() {
245+
for wkb_opt in indexed_batch.geom_array.wkbs().iter() {
246+
if let Some(wkb) = wkb_opt.as_ref() {
247+
if let Ok(geom) = item_to_geometry(wkb) {
248+
geometries.push(geom);
249+
total_wkb_size += wkb.buf().len();
250+
}
251+
}
252+
}
253+
}
254+
255+
(geometries, total_wkb_size)
256+
}
257+
258+
/// Estimate the memory usage of cached geometries based on WKB size with overhead.
///
/// Returns `wkb_size` multiplied by a fixed factor of 2: the WKB byte count is the base and
/// an equal amount is budgeted as overhead for the `geo::Geometry` objects. The
/// multiplication saturates at `usize::MAX`, so an extreme input yields the largest possible
/// estimate instead of panicking (debug builds) or wrapping to a small value (release builds).
fn estimate_geometry_memory(wkb_size: usize) -> usize {
    // Use WKB size as base + overhead for geo::Geometry objects
    const OVERHEAD_FACTOR: usize = 2;
    wkb_size.saturating_mul(OVERHEAD_FACTOR)
}
263+
238264
/// Finish building and return the completed SpatialIndex.
239265
pub fn finish(mut self, schema: SchemaRef) -> Result<SpatialIndex> {
240266
if self.indexed_batches.is_empty() {
@@ -271,6 +297,16 @@ impl SpatialIndexBuilder {
271297
ConcurrentReservation::try_new(REFINER_RESERVATION_PREALLOC_SIZE, refiner_reservation)
272298
.unwrap();
273299

300+
// Pre-compute geometries for KNN queries to avoid repeated WKB-to-geometry conversions
301+
let (cached_geometries, total_wkb_size) =
302+
Self::build_cached_geometries(&self.indexed_batches);
303+
304+
// Reserve memory for cached geometries using WKB size with overhead
305+
let geometry_memory_estimate = Self::estimate_geometry_memory(total_wkb_size);
306+
let geometry_consumer = MemoryConsumer::new("SpatialJoinGeometryCache");
307+
let mut geometry_reservation = geometry_consumer.register(&self.memory_pool);
308+
geometry_reservation.try_grow(geometry_memory_estimate)?;
309+
274310
Ok(SpatialIndex {
275311
schema,
276312
evaluator,
@@ -283,6 +319,8 @@ impl SpatialIndexBuilder {
283319
visited_left_side,
284320
probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
285321
reservation: self.reservation,
322+
cached_geometries,
323+
cached_geometry_reservation: geometry_reservation,
286324
})
287325
}
288326
}
@@ -331,6 +369,15 @@ pub(crate) struct SpatialIndex {
331369
/// Cleared on `SpatialIndex` drop
332370
#[expect(dead_code)]
333371
reservation: MemoryReservation,
372+
373+
/// Cached vector of geometries for KNN queries to avoid repeated WKB-to-geometry conversions
374+
/// This is computed once during index building for performance optimization
375+
cached_geometries: Vec<Geometry<f64>>,
376+
377+
/// Memory reservation for tracking the memory usage of cached geometries
378+
/// Cleared on `SpatialIndex` drop
379+
#[expect(dead_code)]
380+
cached_geometry_reservation: MemoryReservation,
334381
}
335382

336383
/// Indexed batch containing the original record batch and the evaluated geometry array.
@@ -385,6 +432,7 @@ impl SpatialIndex {
385432
);
386433
let refiner_reservation = reservation.split(0);
387434
let refiner_reservation = ConcurrentReservation::try_new(0, refiner_reservation).unwrap();
435+
let cached_geometry_reservation = reservation.split(0);
388436
let rtree = RTreeBuilder::<f32>::new(0).finish::<HilbertSort>();
389437
Self {
390438
schema,
@@ -398,6 +446,8 @@ impl SpatialIndex {
398446
visited_left_side: None,
399447
probe_threads_counter,
400448
reservation,
449+
cached_geometries: Vec::new(),
450+
cached_geometry_reservation,
401451
}
402452
}
403453

@@ -507,20 +557,8 @@ impl SpatialIndex {
507557
}
508558
};
509559

510-
// Create a vector of geometries for all indexed items
511-
let mut geometries: Vec<Geometry<f64>> = Vec::new();
512-
let mut geometry_to_position: Vec<(i32, i32)> = Vec::new();
513-
514-
for (batch_idx, indexed_batch) in self.indexed_batches.iter().enumerate() {
515-
for (row_idx, wkb_opt) in indexed_batch.geom_array.wkbs().iter().enumerate() {
516-
if let Some(wkb) = wkb_opt.as_ref() {
517-
if let Ok(geom) = item_to_geometry(wkb) {
518-
geometries.push(geom);
519-
geometry_to_position.push((batch_idx as i32, row_idx as i32));
520-
}
521-
}
522-
}
523-
}
560+
// Use pre-computed cached geometries for performance
561+
let geometries = &self.cached_geometries;
524562

525563
if geometries.is_empty() {
526564
return Ok(JoinResultMetrics {
@@ -544,7 +582,7 @@ impl SpatialIndex {
544582
Some(k as usize),
545583
None, // no max_distance filter
546584
distance_metric.as_ref(),
547-
&geometries,
585+
geometries,
548586
);
549587

550588
if initial_results.is_empty() {
@@ -657,10 +695,10 @@ impl SpatialIndex {
657695
}
658696
}
659697

660-
// Convert results to build_batch_positions
698+
// Convert results to build_batch_positions using existing data_id_to_batch_pos mapping
661699
for &result_idx in &final_results {
662-
if (result_idx as usize) < geometry_to_position.len() {
663-
build_batch_positions.push(geometry_to_position[result_idx as usize]);
700+
if (result_idx as usize) < self.data_id_to_batch_pos.len() {
701+
build_batch_positions.push(self.data_id_to_batch_pos[result_idx as usize]);
664702
}
665703
}
666704

0 commit comments

Comments
 (0)