Skip to content

Commit 11fefbd

Browse files
paleolimbotCopilot
andauthored
feat(rust/sedona-geoparquet): GeoParquet 1.1 write support (#175)
Co-authored-by: Copilot <[email protected]>
1 parent 7f821cc commit 11fefbd

File tree

9 files changed

+732
-45
lines changed

9 files changed

+732
-45
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python/sedonadb/python/sedonadb/dataframe.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717

1818
from pathlib import Path
19-
from typing import TYPE_CHECKING, Union, Optional, Any, Iterable
19+
from typing import TYPE_CHECKING, Union, Optional, Any, Iterable, Literal
2020

2121
from sedonadb.utility import sedona # noqa: F401
2222

@@ -295,13 +295,15 @@ def to_parquet(
295295
partition_by: Optional[Union[str, Iterable[str]]] = None,
296296
sort_by: Optional[Union[str, Iterable[str]]] = None,
297297
single_file_output: Optional[bool] = None,
298+
geoparquet_version: Literal["1.0", "1.1"] = "1.0",
299+
overwrite_bbox_columns: bool = False,
298300
):
299301
"""Write this DataFrame to one or more (Geo)Parquet files
300302
301303
For input that contains geometry columns, GeoParquet metadata is written
302304
such that suitable readers can recreate Geometry/Geography types when
303-
reading the output.
304-
305+
reading the output and potentially read fewer row groups when only a
306+
subset of the file is needed for a given query.
305307
306308
Args:
307309
path: A filename or directory to which parquet file(s) should be written.
@@ -313,6 +315,21 @@ def to_parquet(
313315
file vs. writing one file per partition to a directory. By default,
314316
a single file is written if `partition_by` is unspecified and
315317
`path` ends with `.parquet`.
318+
geoparquet_version: GeoParquet metadata version to write if output contains
319+
one or more geometry columns. The default (1.0) is the most widely
320+
supported and will result in geometry columns being recognized in many
321+
readers; however, only includes statistics at the file level.
322+
323+
Use GeoParquet 1.1 to compute an additional bounding box column
324+
for every geometry column in the output: some readers can use these columns
325+
to prune row groups when files contain an effective spatial ordering.
326+
The extra columns will appear just before their geometry column and
327+
will be named "[geom_col_name]_bbox" for all geometry columns except
328+
"geometry", whose bounding box column name is just "bbox".
329+
overwrite_bbox_columns: Use `True` to overwrite any bounding box columns
330+
that already exist in the input. This is useful in a read -> modify
331+
-> write scenario to ensure these columns are up-to-date. If `False`
332+
(the default), an error will be raised if a bbox column already exists.
316333
317334
Examples:
318335
@@ -344,7 +361,13 @@ def to_parquet(
344361
sort_by = []
345362

346363
self._impl.to_parquet(
347-
self._ctx, str(path), partition_by, sort_by, single_file_output
364+
self._ctx,
365+
str(path),
366+
partition_by,
367+
sort_by,
368+
single_file_output,
369+
geoparquet_version,
370+
overwrite_bbox_columns,
348371
)
349372

350373
def show(

python/sedonadb/src/dataframe.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use pyo3::prelude::*;
3232
use pyo3::types::PyCapsule;
3333
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
3434
use sedona::show::{DisplayMode, DisplayTableOptions};
35-
use sedona_geoparquet::options::TableGeoParquetOptions;
35+
use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
3636
use sedona_schema::schema::SedonaSchema;
3737
use tokio::runtime::Runtime;
3838

@@ -139,6 +139,7 @@ impl InternalDataFrame {
139139
))
140140
}
141141

142+
#[allow(clippy::too_many_arguments)]
142143
fn to_parquet<'py>(
143144
&self,
144145
py: Python<'py>,
@@ -147,6 +148,8 @@ impl InternalDataFrame {
147148
partition_by: Vec<String>,
148149
sort_by: Vec<String>,
149150
single_file_output: bool,
151+
geoparquet_version: Option<String>,
152+
overwrite_bbox_columns: bool,
150153
) -> Result<(), PySedonaError> {
151154
// sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
152155
// field names (ascending), but other types of expressions aren't supported here yet.
@@ -162,7 +165,14 @@ impl InternalDataFrame {
162165
.with_partition_by(partition_by)
163166
.with_sort_by(sort_by_expr)
164167
.with_single_file_output(single_file_output);
165-
let writer_options = TableGeoParquetOptions::default();
168+
169+
let mut writer_options = TableGeoParquetOptions::new();
170+
writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
171+
if let Some(geoparquet_version) = geoparquet_version {
172+
writer_options.geoparquet_version = geoparquet_version.parse()?;
173+
} else {
174+
writer_options.geoparquet_version = GeoParquetVersion::Omitted;
175+
}
166176

167177
wait_for_future(
168178
py,

python/sedonadb/tests/io/test_parquet.py

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
import pytest
18+
import json
1919
import tempfile
20-
import shapely
20+
from pathlib import Path
21+
2122
import geopandas
2223
import geopandas.testing
24+
import pytest
25+
import shapely
2326
from pyarrow import parquet
24-
from pathlib import Path
25-
from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
27+
from sedonadb._lib import SedonaError
28+
from sedonadb.testing import DuckDB, SedonaDB, geom_or_null, skip_if_not_exists
2629

2730

2831
@pytest.mark.parametrize("name", ["water-junc", "water-point"])
@@ -257,6 +260,68 @@ def test_write_geoparquet_geometry(con, geoarrow_data, name):
257260
geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
258261

259262

263+
def test_write_geoparquet_1_1(con, geoarrow_data):
264+
# Checks GeoParquet 1.1 support specifically
265+
path = geoarrow_data / "ns-water" / "files" / "ns-water_water-junc_geo.parquet"
266+
skip_if_not_exists(path)
267+
268+
gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
269+
270+
with tempfile.TemporaryDirectory() as td:
271+
tmp_parquet = Path(td) / "tmp.parquet"
272+
con.create_data_frame(gdf).to_parquet(
273+
tmp_parquet, sort_by="OBJECTID", geoparquet_version="1.1"
274+
)
275+
276+
file_kv_metadata = parquet.ParquetFile(tmp_parquet).metadata.metadata
277+
assert b"geo" in file_kv_metadata
278+
geo_metadata = json.loads(file_kv_metadata[b"geo"])
279+
assert geo_metadata["version"] == "1.1.0"
280+
geo_column = geo_metadata["columns"]["geometry"]
281+
assert geo_column["covering"] == {
282+
"bbox": {
283+
"xmin": ["bbox", "xmin"],
284+
"ymin": ["bbox", "ymin"],
285+
"xmax": ["bbox", "xmax"],
286+
"ymax": ["bbox", "ymax"],
287+
}
288+
}
289+
290+
# This should still roundtrip through GeoPandas because GeoPandas removes
291+
# the bbox column on read
292+
gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
293+
assert all(gdf.columns == gdf_roundtrip.columns)
294+
geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
295+
296+
# ...but the bbox column should still be there
297+
df_roundtrip = con.read_parquet(tmp_parquet).to_pandas()
298+
assert "bbox" in df_roundtrip.columns
299+
300+
# An attempt to rewrite this should fail because it would have to overwrite
301+
# the bbox column
302+
tmp_parquet2 = Path(td) / "tmp2.parquet"
303+
with pytest.raises(
304+
SedonaError, match="Can't overwrite GeoParquet 1.1 bbox column 'bbox'"
305+
):
306+
con.read_parquet(tmp_parquet).to_parquet(
307+
tmp_parquet2, geoparquet_version="1.1"
308+
)
309+
310+
# ...unless we pass the appropriate option
311+
con.read_parquet(tmp_parquet).to_parquet(
312+
tmp_parquet2, geoparquet_version="1.1", overwrite_bbox_columns=True
313+
)
314+
df_roundtrip = con.read_parquet(tmp_parquet2).to_pandas()
315+
assert "bbox" in df_roundtrip.columns
316+
317+
318+
def test_write_geoparquet_unknown(con):
319+
with pytest.raises(SedonaError, match="Unexpected GeoParquet version string"):
320+
con.sql("SELECT 1 as one").to_parquet(
321+
"unused", geoparquet_version="not supported"
322+
)
323+
324+
260325
def test_write_geoparquet_geography(con, geoarrow_data):
261326
# Checks a read and write of geography (rounctrip, since nobody else can read/write)
262327
path = (

rust/sedona-geoparquet/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ datafusion-execution = { workspace = true }
5050
datafusion-expr = { workspace = true }
5151
datafusion-physical-expr = { workspace = true }
5252
datafusion-physical-plan = { workspace = true }
53+
float_next_after = { workspace = true }
5354
geo-traits = { workspace = true }
5455
futures = { workspace = true }
5556
object_store = { workspace = true }
5657
parquet = { workspace = true }
5758
sedona-common = { path = "../sedona-common" }
5859
sedona-expr = { path = "../sedona-expr" }
60+
sedona-functions = { path = "../sedona-functions" }
5961
sedona-geometry = { path = "../sedona-geometry" }
6062
sedona-schema = { path = "../sedona-schema" }
6163
serde = { workspace = true }

rust/sedona-geoparquet/src/format.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use sedona_schema::extension_type::ExtensionType;
4848
use crate::{
4949
file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
5050
metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
51-
options::{GeoParquetVersion, TableGeoParquetOptions},
51+
options::TableGeoParquetOptions,
5252
writer::create_geoparquet_writer_physical_plan,
5353
};
5454
use datafusion::datasource::physical_plan::ParquetSource;
@@ -91,17 +91,9 @@ impl FileFormatFactory for GeoParquetFormatFactory {
9191
) -> Result<Arc<dyn FileFormat>> {
9292
let mut options_mut = self.options.clone().unwrap_or_default();
9393
let mut format_options_mut = format_options.clone();
94-
options_mut.geoparquet_version =
95-
if let Some(version_string) = format_options_mut.remove("geoparquet_version") {
96-
match version_string.as_str() {
97-
"1.0" => GeoParquetVersion::V1_0,
98-
"1.1" => GeoParquetVersion::V1_1,
99-
"2.0" => GeoParquetVersion::V2_0,
100-
_ => GeoParquetVersion::default(),
101-
}
102-
} else {
103-
GeoParquetVersion::default()
104-
};
94+
if let Some(version_string) = format_options_mut.remove("geoparquet_version") {
95+
options_mut.geoparquet_version = version_string.parse()?;
96+
}
10597

10698
let inner_format = self.inner.create(state, &format_options_mut)?;
10799
if let Some(parquet_format) = inner_format.as_any().downcast_ref::<ParquetFormat>() {

rust/sedona-geoparquet/src/metadata.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,21 @@ pub struct GeoParquetCovering {
268268
pub bbox: GeoParquetBboxCovering,
269269
}
270270

271+
impl GeoParquetCovering {
272+
pub fn bbox_struct_xy(struct_column_name: &str) -> Self {
273+
GeoParquetCovering {
274+
bbox: GeoParquetBboxCovering {
275+
xmin: vec![struct_column_name.to_string(), "xmin".to_string()],
276+
ymin: vec![struct_column_name.to_string(), "ymin".to_string()],
277+
zmin: None,
278+
xmax: vec![struct_column_name.to_string(), "xmax".to_string()],
279+
ymax: vec![struct_column_name.to_string(), "ymax".to_string()],
280+
zmax: None,
281+
},
282+
}
283+
}
284+
}
285+
271286
/// Top-level GeoParquet file metadata
272287
#[derive(Clone, Debug, Serialize, Deserialize)]
273288
pub struct GeoParquetMetadata {

rust/sedona-geoparquet/src/options.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::str::FromStr;
19+
1820
use datafusion::config::TableParquetOptions;
21+
use datafusion_common::{plan_err, DataFusionError};
1922

2023
/// [TableParquetOptions] wrapper with GeoParquet-specific options
2124
#[derive(Debug, Default, Clone)]
@@ -24,13 +27,22 @@ pub struct TableGeoParquetOptions {
2427
pub inner: TableParquetOptions,
2528
/// [GeoParquetVersion] to use when writing GeoParquet files
2629
pub geoparquet_version: GeoParquetVersion,
30+
/// When writing [GeoParquetVersion::V1_1], use `true` to overwrite existing
31+
/// bounding box columns.
32+
pub overwrite_bbox_columns: bool,
33+
}
34+
35+
impl TableGeoParquetOptions {
36+
pub fn new() -> Self {
37+
Self::default()
38+
}
2739
}
2840

2941
impl From<TableParquetOptions> for TableGeoParquetOptions {
3042
fn from(value: TableParquetOptions) -> Self {
3143
Self {
3244
inner: value,
33-
geoparquet_version: GeoParquetVersion::default(),
45+
..Default::default()
3446
}
3547
}
3648
}
@@ -73,3 +85,19 @@ impl Default for GeoParquetVersion {
7385
Self::V1_0
7486
}
7587
}
88+
89+
impl FromStr for GeoParquetVersion {
90+
type Err = DataFusionError;
91+
92+
fn from_str(s: &str) -> Result<Self, Self::Err> {
93+
match s.to_lowercase().as_str() {
94+
"1.0" => Ok(GeoParquetVersion::V1_0),
95+
"1.1" => Ok(GeoParquetVersion::V1_1),
96+
"2.0" => Ok(GeoParquetVersion::V2_0),
97+
"none" => Ok(GeoParquetVersion::Omitted),
98+
_ => plan_err!(
99+
"Unexpected GeoParquet version string (expected '1.0', '1.1', '2.0', or 'none')"
100+
),
101+
}
102+
}
103+
}

0 commit comments

Comments
 (0)