Skip to content

Commit 42a16a1

Browse files
authored
feat(rust/sedona-geoparquet): Add GeoParquet writer for non-spatial output (#52)
1 parent 434be5c commit 42a16a1

File tree

16 files changed

+741
-69
lines changed

16 files changed

+741
-69
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ rstest = "0.24.0"
110110
serde = { version = "1" }
111111
serde_json = { version = "1" }
112112
serde_with = { version = "1" }
113+
tempfile = { version = "3"}
113114
thiserror = { version = "2" }
114115
tokio = { version = "1.44" }
115116
url = "2.5.4"

python/sedonadb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ arrow-array = { workspace = true }
3636
async-trait = { workspace = true }
3737
datafusion = { workspace = true }
3838
datafusion-common = { workspace = true }
39+
datafusion-expr = { workspace = true }
3940
datafusion-ffi = { workspace = true }
4041
futures = { workspace = true }
4142
pyo3 = { version = "0.25.1" }

python/sedonadb/python/sedonadb/dataframe.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17-
from typing import TYPE_CHECKING, Union, Optional, Any
17+
18+
from pathlib import Path
19+
from typing import TYPE_CHECKING, Union, Optional, Any, Iterable
1820

1921
from sedonadb._options import global_options
2022

@@ -263,6 +265,66 @@ def to_pandas(
263265
else:
264266
return table.to_pandas()
265267

268+
def to_parquet(
269+
self,
270+
path: Union[str, Path],
271+
*,
272+
partition_by: Optional[Union[str, Iterable[str]]] = None,
273+
sort_by: Optional[Union[str, Iterable[str]]] = None,
274+
single_file_output: Optional[bool] = None,
275+
):
276+
"""Write this DataFrame to one or more (Geo)Parquet files
277+
278+
For input that contains geometry columns, GeoParquet metadata is written
279+
such that suitable readers can recreate Geometry/Geography types when
280+
reading the output.
281+
282+
283+
Args:
284+
path: A filename or directory to which parquet file(s) should be written.
285+
partition_by: A vector of column names to partition by. If non-empty,
286+
applies hive-style partitioning to the output.
287+
sort_by: A vector of column names to sort by. Currently only ascending
288+
sort is supported.
289+
single_file_output: Use True or False to force writing a single Parquet
290+
file vs. writing one file per partition to a directory. By default,
291+
a single file is written if `partition_by` is unspecified and
292+
`path` ends with `.parquet`.
293+
294+
Examples:
295+
296+
>>> import sedonadb
297+
>>> import tempfile
298+
>>> con = sedonadb.connect()
299+
>>> td = tempfile.TemporaryDirectory()
300+
>>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
301+
>>> con.read_parquet(url).to_parquet(f"{td.name}/tmp.parquet")
302+
303+
"""
304+
305+
path = Path(path)
306+
307+
if single_file_output is None:
308+
single_file_output = partition_by is None and str(path).endswith(".parquet")
309+
310+
if isinstance(partition_by, str):
311+
partition_by = [partition_by]
312+
elif partition_by is not None:
313+
partition_by = list(partition_by)
314+
else:
315+
partition_by = []
316+
317+
if isinstance(sort_by, str):
318+
sort_by = [sort_by]
319+
elif sort_by is not None:
320+
sort_by = list(sort_by)
321+
else:
322+
sort_by = []
323+
324+
self._impl.to_parquet(
325+
self._ctx, str(path), partition_by, sort_by, single_file_output
326+
)
327+
266328
def show(
267329
self,
268330
limit: Optional[int] = 10,

python/sedonadb/src/dataframe.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
2222
use arrow_array::RecordBatchReader;
2323
use arrow_schema::Schema;
2424
use datafusion::catalog::MemTable;
25+
use datafusion::logical_expr::SortExpr;
2526
use datafusion::prelude::DataFrame;
27+
use datafusion_common::Column;
28+
use datafusion_expr::Expr;
2629
use datafusion_ffi::table_provider::FFI_TableProvider;
2730
use pyo3::prelude::*;
2831
use pyo3::types::PyCapsule;
29-
use sedona::context::SedonaDataFrame;
32+
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
3033
use sedona::show::{DisplayMode, DisplayTableOptions};
34+
use sedona_geoparquet::options::TableGeoParquetOptions;
3135
use sedona_schema::schema::SedonaSchema;
3236
use tokio::runtime::Runtime;
3337

@@ -119,6 +123,41 @@ impl InternalDataFrame {
119123
))
120124
}
121125

126+
fn to_parquet<'py>(
127+
&self,
128+
py: Python<'py>,
129+
ctx: &InternalContext,
130+
path: String,
131+
partition_by: Vec<String>,
132+
sort_by: Vec<String>,
133+
single_file_output: bool,
134+
) -> Result<(), PySedonaError> {
135+
// sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
136+
// field names (ascending), but other types of expressions aren't supported here yet.
137+
let sort_by_expr = sort_by
138+
.into_iter()
139+
.map(|name| {
140+
let column = Expr::Column(Column::new_unqualified(name));
141+
SortExpr::new(column, true, false)
142+
})
143+
.collect::<Vec<_>>();
144+
145+
let options = SedonaWriteOptions::new()
146+
.with_partition_by(partition_by)
147+
.with_sort_by(sort_by_expr)
148+
.with_single_file_output(single_file_output);
149+
let writer_options = TableGeoParquetOptions::default();
150+
151+
wait_for_future(
152+
py,
153+
&self.runtime,
154+
self.inner
155+
.clone()
156+
.write_geoparquet(&ctx.inner, &path, options, Some(writer_options)),
157+
)??;
158+
Ok(())
159+
}
160+
122161
fn show<'py>(
123162
&self,
124163
py: Python<'py>,

python/sedonadb/tests/io/test_parquet.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import tempfile
2020
import shapely
2121
import geopandas
22+
import geopandas.testing
2223
from pyarrow import parquet
2324
from pathlib import Path
2425
from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
@@ -238,3 +239,39 @@ def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
238239
"""
239240
)
240241
eng.assert_result(result, gdf)
242+
243+
244+
@pytest.mark.parametrize("name", ["water-junc", "water-point"])
245+
def test_write_geoparquet_geometry(con, geoarrow_data, name):
246+
# Checks a read and write of some non-trivial files and ensures we match GeoPandas
247+
path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
248+
skip_if_not_exists(path)
249+
250+
gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
251+
252+
with tempfile.TemporaryDirectory() as td:
253+
tmp_parquet = Path(td) / "tmp.parquet"
254+
con.create_data_frame(gdf).to_parquet(tmp_parquet)
255+
256+
gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
257+
geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
258+
259+
260+
def test_write_geoparquet_geography(con, geoarrow_data):
261+
# Checks a read and write of geography (rounctrip, since nobody else can read/write)
262+
path = (
263+
geoarrow_data
264+
/ "natural-earth"
265+
/ "files"
266+
/ "natural-earth_countries-geography_geo.parquet"
267+
)
268+
skip_if_not_exists(path)
269+
270+
table = con.read_parquet(path).to_arrow_table()
271+
272+
with tempfile.TemporaryDirectory() as td:
273+
tmp_parquet = Path(td) / "tmp.parquet"
274+
con.create_data_frame(table).to_parquet(tmp_parquet)
275+
276+
table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
277+
assert table_roundtrip == table

python/sedonadb/tests/test_dataframe.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import geoarrow.types as gat
1919
import geopandas.testing
2020
import pandas as pd
21+
from pathlib import Path
2122
import pyarrow as pa
2223
import pytest
2324
import sedonadb
25+
import tempfile
2426

2527

2628
def test_dataframe_from_dataframe(con):
@@ -281,6 +283,55 @@ def test_dataframe_to_pandas(con):
281283
)
282284

283285

286+
def test_dataframe_to_parquet(con):
287+
df = con.sql(
288+
"SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a, b)"
289+
)
290+
291+
with tempfile.TemporaryDirectory() as td:
292+
# Defaults with a path that ends with .parquet (single file)
293+
tmp_parquet_file = Path(td) / "tmp.parquet"
294+
df.to_parquet(tmp_parquet_file)
295+
296+
assert tmp_parquet_file.exists()
297+
assert tmp_parquet_file.is_file()
298+
pd.testing.assert_frame_equal(
299+
pd.read_parquet(tmp_parquet_file),
300+
pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
301+
)
302+
303+
# Defaults with a path that doesn't end in .parquet (directory)
304+
tmp_parquet_dir = Path(td) / "tmp"
305+
df.to_parquet(tmp_parquet_dir)
306+
307+
assert tmp_parquet_dir.exists()
308+
assert tmp_parquet_dir.is_dir()
309+
pd.testing.assert_frame_equal(
310+
pd.read_parquet(tmp_parquet_dir),
311+
pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
312+
)
313+
314+
# With partition_by
315+
tmp_parquet_dir = Path(td) / "tmp_partitioned"
316+
df.to_parquet(tmp_parquet_dir, partition_by=["a"])
317+
assert tmp_parquet_dir.exists()
318+
assert tmp_parquet_dir.is_dir()
319+
pd.testing.assert_frame_equal(
320+
pd.read_parquet(tmp_parquet_dir).sort_values("b").reset_index(drop=True),
321+
pd.DataFrame(
322+
{"b": [1, 2, 3], "a": pd.Categorical(["one", "two", "three"])}
323+
),
324+
)
325+
326+
# With order_by
327+
tmp_parquet = Path(td) / "tmp_ordered.parquet"
328+
df.to_parquet(tmp_parquet, sort_by=["a"])
329+
pd.testing.assert_frame_equal(
330+
pd.read_parquet(tmp_parquet),
331+
pd.DataFrame({"a": ["one", "three", "two"], "b": [1, 3, 2]}),
332+
)
333+
334+
284335
def test_show(con, capsys):
285336
con.sql("SELECT 1 as one").show()
286337
expected = """

rust/sedona-geoparquet/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ default = []
3434
sedona-testing = { path = "../sedona-testing" }
3535
url = { workspace = true }
3636
rstest = { workspace = true }
37+
tempfile = { workspace = true }
38+
tokio = { workspace = true }
3739

3840
[dependencies]
3941
async-trait = { workspace = true }
@@ -59,4 +61,3 @@ sedona-schema = { path = "../sedona-schema" }
5961
serde = { workspace = true }
6062
serde_json = { workspace = true }
6163
serde_with = { workspace = true }
62-
tokio = { workspace = true }

0 commit comments

Comments
 (0)