Skip to content

Commit 0161adb

Browse files
committed
feat: implement extension mechanism and point encoding
1 parent 49e45da commit 0161adb

File tree

6 files changed

+78
-18
lines changed

6 files changed

+78
-18
lines changed

.cargo/config.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +0,0 @@
1-
[target.x86_64-unknown-linux-gnu]
2-
linker = "clang"
3-
rustflags = ["-C", "link-arg=-fuse-ld=lld"]

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

arrow-pg/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,21 @@ rust-version.workspace = true
1616
default = ["arrow", "geo"]
1717
arrow = ["dep:arrow"]
1818
datafusion = ["dep:datafusion"]
19-
geo = ["postgres-types/with-geo-types-0_7", "dep:geoarrow", "dep:geoarrow-schema"]
19+
geo = ["postgres-types/with-geo-types-0_7", "dep:geoarrow", "dep:geoarrow-schema", "dep:wkb"]
2020

2121
[dependencies]
2222
arrow = { workspace = true, optional = true }
2323
arrow-schema = { workspace = true}
24-
geoarrow = { version = "0.7", optional = true }
25-
geoarrow-schema = { version = "0.7", optional = true }
2624
bytes.workspace = true
2725
chrono.workspace = true
2826
datafusion = { workspace = true, optional = true }
2927
futures.workspace = true
28+
geoarrow = { version = "0.7", optional = true }
29+
geoarrow-schema = { version = "0.7", optional = true }
3030
pgwire = { workspace = true, default-features = false, features = ["server-api", "pg-ext-types"] }
3131
postgres-types.workspace = true
3232
rust_decimal.workspace = true
33+
wkb = { version = "0.9", optional = true }
3334

3435
[dev-dependencies]
3536
async-trait = "0.1"

arrow-pg/src/encoder.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ use std::sync::Arc;
55

66
#[cfg(not(feature = "datafusion"))]
77
use arrow::{array::*, datatypes::*};
8-
#[cfg(feature = "geo")]
9-
use arrow_schema::extension::ExtensionType;
108
use bytes::BufMut;
119
use bytes::BytesMut;
1210
use chrono::NaiveTime;
@@ -22,6 +20,8 @@ use rust_decimal::Decimal;
2220
use timezone::Tz;
2321

2422
use crate::error::ToSqlError;
23+
#[cfg(feature = "geo")]
24+
use crate::geo_encoder::encode_geo;
2525
use crate::list_encoder::encode_list;
2626
use crate::struct_encoder::encode_struct;
2727

@@ -293,21 +293,21 @@ pub fn encode_value<T: Encoder>(
293293
let arrow_type = arrow_field.data_type();
294294

295295
#[cfg(feature = "geo")]
296-
if let Some(geoarrow_type) = geoarrow_schema::GeoArrowType::from_extension_field(&arrow_field)
296+
if let Some(geoarrow_type) = geoarrow_schema::GeoArrowType::from_extension_field(arrow_field)
297297
.map_err(|e| PgWireError::ApiError(Box::new(e)))?
298298
{
299-
use geoarrow::array::AsGeoArrowArray;
300-
301299
let geoarrow_array: Arc<dyn geoarrow::array::GeoArrowArray> =
302300
geoarrow::array::from_arrow_array(arr, arrow_field)
303301
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
304-
match geoarrow_type {
305-
geoarrow_schema::GeoArrowType::Point(_) => {
306-
let array: &geoarrow::array::PointArray = geoarrow_array.as_point();
307-
// encode pointarray
308-
}
309-
_ => todo!("handle other geometry types"),
310-
}
302+
303+
return encode_geo(
304+
encoder,
305+
geoarrow_type,
306+
&geoarrow_array,
307+
idx,
308+
arrow_field,
309+
pg_field,
310+
);
311311
}
312312

313313
match arrow_type {

arrow-pg/src/geo_encoder.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use bytes::BytesMut;
2+
use std::sync::Arc;
3+
4+
#[cfg(not(feature = "datafusion"))]
5+
use arrow::datatypes::*;
6+
#[cfg(feature = "datafusion")]
7+
use datafusion::arrow::datatypes::*;
8+
use geoarrow::array::{AsGeoArrowArray, GeoArrowArray, GeoArrowArrayAccessor};
9+
use geoarrow_schema::GeoArrowType;
10+
use pgwire::api::results::FieldInfo;
11+
use pgwire::error::{PgWireError, PgWireResult};
12+
use wkb::{writer::WriteOptions, Endianness};
13+
14+
use crate::encoder::{EncodedValue, Encoder};
15+
16+
pub fn encode_geo<T: Encoder>(
17+
encoder: &mut T,
18+
geoarrow_type: GeoArrowType,
19+
arr: &Arc<dyn geoarrow::array::GeoArrowArray>,
20+
idx: usize,
21+
_arrow_field: &Field,
22+
pg_field: &FieldInfo,
23+
) -> PgWireResult<()> {
24+
match geoarrow_type {
25+
geoarrow_schema::GeoArrowType::Point(_) => {
26+
let array: &geoarrow::array::PointArray = arr.as_point();
27+
encode_point(encoder, array, idx, pg_field)?;
28+
}
29+
_ => todo!("handle other geometry types"),
30+
}
31+
Ok(())
32+
}
33+
34+
fn encode_point<T: Encoder>(
35+
encoder: &mut T,
36+
array: &geoarrow::array::PointArray,
37+
idx: usize,
38+
pg_field: &FieldInfo,
39+
) -> PgWireResult<()> {
40+
if array.is_null(idx) {
41+
encoder.encode_field(&None::<EncodedValue>, pg_field)?;
42+
return Ok(());
43+
}
44+
45+
let point = array
46+
.value(idx)
47+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
48+
let mut bytes = Vec::new();
49+
let options = WriteOptions {
50+
endianness: Endianness::LittleEndian,
51+
};
52+
wkb::writer::write_point(&mut bytes, &point, &options)
53+
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
54+
55+
let encoded_value = EncodedValue {
56+
bytes: BytesMut::from(&bytes[..]),
57+
};
58+
encoder.encode_field(&encoded_value, pg_field)
59+
}

arrow-pg/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
pub mod datatypes;
77
pub mod encoder;
88
mod error;
9+
#[cfg(feature = "geo")]
10+
pub mod geo_encoder;
911
pub mod list_encoder;
1012
pub mod row_encoder;
1113
pub mod struct_encoder;

0 commit comments

Comments
 (0)