Skip to content

Commit 124007c

Browse files
committed
Implemented fast wkb_to_geos
1 parent 5b7fc54 commit 124007c

File tree

7 files changed

+289
-5
lines changed

7 files changed

+289
-5
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ num-traits = { version = "0.2", default-features = false, features = ["libm"] }
8989
mimalloc = { version = "0.1", default-features = false }
9090
libmimalloc-sys = { version = "0.1", default-features = false }
9191

92-
geos = { version = "10.0.0", features = ["geo"] }
92+
geos = { version = "10.0.0", features = ["geo", "v3_10_0"] }
9393

9494
geo-types = "0.7.17"
9595
geo-traits = "0.3.0"

c/sedona-geos/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ sedona-expr = { path = "../../rust/sedona-expr" }
4242
sedona-functions = { path = "../../rust/sedona-functions" }
4343
sedona-geometry = { path = "../../rust/sedona-geometry" }
4444
sedona-schema = { path = "../../rust/sedona-schema" }
45+
geo-traits = { workspace = true }
4546
wkb = { workspace = true }
47+
byteorder = { workspace = true }
4648

4749
[[bench]]
4850
harness = false

c/sedona-geos/src/executor.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
use datafusion_common::{DataFusionError, Result};
1818
use sedona_functions::executor::{GenericExecutor, GeometryFactory};
1919

20+
use crate::wkb_to_geos::GEOSWkbFactory;
21+
2022
/// A [GenericExecutor] that iterates over [geos::Geometry]
2123
pub type GeosExecutor<'a, 'b> = GenericExecutor<'a, 'b, GeosGeometryFactory, GeosGeometryFactory>;
2224

2325
/// [GeometryFactory] implementation for iterating over [geos::Geometry]
2426
#[derive(Default)]
2527
pub struct GeosGeometryFactory {
26-
inner: wkb::reader::to_geos::GEOSWkbFactory,
28+
inner: GEOSWkbFactory,
2729
}
2830

2931
impl GeometryFactory for GeosGeometryFactory {

c/sedona-geos/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ mod st_convexhull;
2727
mod st_dwithin;
2828
mod st_length;
2929
mod st_perimeter;
30+
pub mod wkb_to_geos;

c/sedona-geos/src/wkb_to_geos.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
use std::cell::RefCell;
2+
3+
use byteorder::{BigEndian, ByteOrder, LittleEndian};
4+
use geo_traits::*;
5+
use geos::GResult;
6+
use wkb::{reader::*, Endianness};
7+
8+
/// A factory for converting WKB to GEOS geometries.
9+
///
10+
/// This factory uses a scratch buffer to store intermediate coordinate data.
11+
/// The scratch buffer is reused for each conversion, which reduces memory allocation
12+
/// overhead.
13+
pub struct GEOSWkbFactory {
14+
scratch: RefCell<Vec<f64>>,
15+
}
16+
17+
impl Default for GEOSWkbFactory {
18+
fn default() -> Self {
19+
Self::new()
20+
}
21+
}
22+
23+
impl GEOSWkbFactory {
24+
/// Create a new GEOSWkbFactory.
25+
pub fn new() -> Self {
26+
Self {
27+
scratch: RefCell::new(Vec::new()),
28+
}
29+
}
30+
31+
/// Create a GEOS geometry from a WKB.
32+
pub fn create(&self, wkb: &Wkb) -> GResult<geos::Geometry> {
33+
let scratch = &mut self.scratch.borrow_mut();
34+
geometry_to_geos(scratch, wkb)
35+
}
36+
}
37+
38+
fn geometry_to_geos(scratch: &mut Vec<f64>, wkb: &Wkb) -> GResult<geos::Geometry> {
39+
let geom = wkb.as_type();
40+
match geom {
41+
geo_traits::GeometryType::Point(p) => point_to_geos(scratch, p),
42+
geo_traits::GeometryType::LineString(ls) => line_string_to_geos(scratch, ls),
43+
geo_traits::GeometryType::Polygon(poly) => polygon_to_geos(scratch, poly),
44+
geo_traits::GeometryType::MultiPoint(mp) => multi_point_to_geos(scratch, mp),
45+
geo_traits::GeometryType::MultiLineString(mls) => multi_line_string_to_geos(scratch, mls),
46+
geo_traits::GeometryType::MultiPolygon(mpoly) => multi_polygon_to_geos(scratch, mpoly),
47+
geo_traits::GeometryType::GeometryCollection(gc) => {
48+
geometry_collection_to_geos(scratch, gc)
49+
}
50+
_ => Err(geos::Error::ConversionError(
51+
"Unsupported geometry type".to_string(),
52+
)),
53+
}
54+
}
55+
56+
fn point_to_geos(scratch: &mut Vec<f64>, p: &Point) -> GResult<geos::Geometry> {
57+
if p.is_empty() {
58+
geos::Geometry::create_empty_point()
59+
} else {
60+
let coord_seq = create_coord_sequence_from_raw_parts(
61+
p.coord_slice(),
62+
p.dimension(),
63+
p.byte_order(),
64+
1,
65+
scratch,
66+
)?;
67+
let point = geos::Geometry::create_point(coord_seq)?;
68+
Ok(point)
69+
}
70+
}
71+
72+
fn line_string_to_geos(scratch: &mut Vec<f64>, ls: &LineString) -> GResult<geos::Geometry> {
73+
let num_points = ls.num_coords();
74+
if num_points == 0 {
75+
geos::Geometry::create_empty_line_string()
76+
} else {
77+
let coord_seq = create_coord_sequence_from_raw_parts(
78+
ls.coords_slice(),
79+
ls.dimension(),
80+
ls.byte_order(),
81+
num_points,
82+
scratch,
83+
)?;
84+
geos::Geometry::create_line_string(coord_seq)
85+
}
86+
}
87+
88+
fn polygon_to_geos(scratch: &mut Vec<f64>, poly: &Polygon) -> GResult<geos::Geometry> {
89+
// Create exterior ring
90+
let exterior = if let Some(ring) = poly.exterior() {
91+
let coord_seq = create_coord_sequence_from_raw_parts(
92+
ring.coords_slice(),
93+
ring.dimension(),
94+
ring.byte_order(),
95+
ring.num_coords(),
96+
scratch,
97+
)?;
98+
geos::Geometry::create_linear_ring(coord_seq)?
99+
} else {
100+
return geos::Geometry::create_empty_polygon();
101+
};
102+
103+
// Create interior rings
104+
let num_interiors = poly.num_interiors();
105+
let mut interior_rings = Vec::with_capacity(num_interiors);
106+
for i in 0..num_interiors {
107+
let ring = poly.interior(i).unwrap();
108+
let coord_seq = create_coord_sequence_from_raw_parts(
109+
ring.coords_slice(),
110+
ring.dimension(),
111+
ring.byte_order(),
112+
ring.num_coords(),
113+
scratch,
114+
)?;
115+
let interior_ring = geos::Geometry::create_linear_ring(coord_seq)?;
116+
interior_rings.push(interior_ring);
117+
}
118+
119+
geos::Geometry::create_polygon(exterior, interior_rings)
120+
}
121+
122+
fn multi_point_to_geos(scratch: &mut Vec<f64>, mp: &MultiPoint) -> GResult<geos::Geometry> {
123+
let num_points = mp.num_points();
124+
if num_points == 0 {
125+
// Create an empty multi-point by creating a geometry collection with no geometries
126+
geos::Geometry::create_empty_collection(geos::GeometryTypes::MultiPoint)
127+
} else {
128+
let mut points = Vec::with_capacity(num_points);
129+
for i in 0..num_points {
130+
let point = unsafe { mp.point_unchecked(i) };
131+
let geos_point = point_to_geos(scratch, &point)?;
132+
points.push(geos_point);
133+
}
134+
geos::Geometry::create_multipoint(points)
135+
}
136+
}
137+
138+
fn multi_line_string_to_geos(
139+
scratch: &mut Vec<f64>,
140+
mls: &MultiLineString,
141+
) -> GResult<geos::Geometry> {
142+
let num_line_strings = mls.num_line_strings();
143+
if num_line_strings == 0 {
144+
geos::Geometry::create_empty_collection(geos::GeometryTypes::MultiLineString)
145+
} else {
146+
let mut line_strings = Vec::with_capacity(num_line_strings);
147+
for i in 0..num_line_strings {
148+
let ls = unsafe { mls.line_string_unchecked(i) };
149+
let geos_line_string = line_string_to_geos(scratch, ls)?;
150+
line_strings.push(geos_line_string);
151+
}
152+
geos::Geometry::create_multiline_string(line_strings)
153+
}
154+
}
155+
156+
fn multi_polygon_to_geos(scratch: &mut Vec<f64>, mpoly: &MultiPolygon) -> GResult<geos::Geometry> {
157+
let num_polygons = mpoly.num_polygons();
158+
if num_polygons == 0 {
159+
geos::Geometry::create_empty_collection(geos::GeometryTypes::MultiPolygon)
160+
} else {
161+
let mut polygons = Vec::with_capacity(num_polygons);
162+
for i in 0..num_polygons {
163+
let poly = unsafe { mpoly.polygon_unchecked(i) };
164+
let geos_polygon = polygon_to_geos(scratch, poly)?;
165+
polygons.push(geos_polygon);
166+
}
167+
geos::Geometry::create_multipolygon(polygons)
168+
}
169+
}
170+
171+
fn geometry_collection_to_geos(
172+
scratch: &mut Vec<f64>,
173+
gc: &GeometryCollection,
174+
) -> GResult<geos::Geometry> {
175+
if gc.num_geometries() == 0 {
176+
geos::Geometry::create_empty_collection(geos::GeometryTypes::GeometryCollection)
177+
} else {
178+
let num_geometries = gc.num_geometries();
179+
let mut geometries = Vec::with_capacity(num_geometries);
180+
for i in 0..num_geometries {
181+
let geom = gc.geometry(i).unwrap();
182+
let geos_geom = geometry_to_geos(scratch, geom)?;
183+
geometries.push(geos_geom);
184+
}
185+
geos::Geometry::create_geometry_collection(geometries)
186+
}
187+
}
188+
189+
const NATIVE_ENDIANNESS: Endianness = if cfg!(target_endian = "big") {
190+
Endianness::BigEndian
191+
} else {
192+
Endianness::LittleEndian
193+
};
194+
195+
fn create_coord_sequence_from_raw_parts(
196+
buf: &[u8],
197+
dim: Dimension,
198+
byte_order: Endianness,
199+
num_coords: usize,
200+
scratch: &mut Vec<f64>,
201+
) -> GResult<geos::CoordSeq> {
202+
let (has_z, has_m, dim_size) = match dim {
203+
Dimension::Xy => (false, false, 2),
204+
Dimension::Xyz => (true, false, 3),
205+
Dimension::Xym => (false, true, 3),
206+
Dimension::Xyzm => (true, true, 4),
207+
};
208+
let num_ordinates = dim_size * num_coords;
209+
210+
// If the byte order matches native endianness, we can potentially use zero-copy
211+
if byte_order == NATIVE_ENDIANNESS {
212+
let ptr = buf.as_ptr();
213+
214+
// On platforms with unaligned memory access support, we can construct the coord seq
215+
// directly from the raw parts without copying to the scratch buffer.
216+
#[cfg(any(target_arch = "aarch64", target_arch = "x86_64"))]
217+
{
218+
let coords_f64 =
219+
unsafe { &*core::ptr::slice_from_raw_parts(ptr as *const f64, num_ordinates) };
220+
geos::CoordSeq::new_from_buffer(coords_f64, num_coords, has_z, has_m)
221+
}
222+
223+
// On platforms without unaligned memory access support, we need to copy the data to the
224+
// scratch buffer to make sure the data is aligned.
225+
#[cfg(not(any(target_arch = "aarch64", target_arch = "x86_64")))]
226+
{
227+
unsafe {
228+
scratch.clear();
229+
scratch.reserve(num_ordinates);
230+
scratch.set_len(num_ordinates);
231+
std::ptr::copy_nonoverlapping(
232+
ptr,
233+
scratch.as_mut_ptr() as *mut u8,
234+
num_ordinates * std::mem::size_of::<f64>(),
235+
);
236+
geos::CoordSeq::new_from_buffer(scratch.as_slice(), num_coords, has_z, has_m)
237+
}
238+
}
239+
} else {
240+
// Need to convert byte order
241+
match byte_order {
242+
Endianness::BigEndian => {
243+
save_f64_to_scratch::<BigEndian>(scratch, buf, num_ordinates);
244+
}
245+
Endianness::LittleEndian => {
246+
save_f64_to_scratch::<LittleEndian>(scratch, buf, num_ordinates);
247+
}
248+
}
249+
geos::CoordSeq::new_from_buffer(scratch.as_slice(), num_coords, has_z, has_m)
250+
}
251+
}
252+
253+
fn save_f64_to_scratch<B: ByteOrder>(scratch: &mut Vec<f64>, buf: &[u8], num_ordinates: usize) {
254+
scratch.clear();
255+
scratch.reserve(num_ordinates);
256+
// Safety: we have already reserved the capacity, so we can set the length safely.
257+
// Justification: rewriting the loop to not use Vec::push makes it many times faster,
258+
// since it eliminates several memory loads and stores for vector's length and capacity,
259+
// and it enables the compiler to generate vectorized code.
260+
#[allow(clippy::uninit_vec)]
261+
unsafe {
262+
scratch.set_len(num_ordinates);
263+
}
264+
assert!(num_ordinates * 8 <= buf.len());
265+
for (i, tgt) in scratch.iter_mut().enumerate().take(num_ordinates) {
266+
let offset = i * 8;
267+
let value = B::read_f64(&buf[offset..]);
268+
*tgt = value;
269+
}
270+
}

rust/sedona-functions/src/st_geomfromwkt.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ use sedona_schema::{
2929
datatypes::{SedonaType, WKB_GEOGRAPHY, WKB_GEOMETRY},
3030
matchers::ArgMatcher,
3131
};
32-
use wkb::writer::write_geometry;
32+
use wkb::writer::{write_geometry, WriteOptions};
33+
use wkb::Endianness;
3334
use wkt::Wkt;
3435

3536
use crate::executor::WkbExecutor;
@@ -128,8 +129,14 @@ fn invoke_scalar(wkt_bytes: &str, builder: &mut BinaryBuilder) -> Result<()> {
128129
let geometry: Wkt<f64> = Wkt::from_str(wkt_bytes)
129130
.map_err(|err| DataFusionError::Internal(format!("WKT parse error: {err}")))?;
130131

131-
write_geometry(builder, &geometry, wkb::Endianness::LittleEndian)
132-
.map_err(|err| DataFusionError::Internal(format!("WKB write error: {err}")))
132+
write_geometry(
133+
builder,
134+
&geometry,
135+
&WriteOptions {
136+
endianness: Endianness::LittleEndian,
137+
},
138+
)
139+
.map_err(|err| DataFusionError::Internal(format!("WKB write error: {err}")))
133140
}
134141

135142
#[cfg(test)]

0 commit comments

Comments
 (0)