Skip to content

Commit 85ce98d

Browse files
paleolimbot and Copilot authored
feat(r/sedonadb): Add sd_write_parquet() to R bindings (#210)
Co-authored-by: Copilot <[email protected]>
1 parent a661772 commit 85ce98d

File tree

10 files changed

+373
-3
lines changed

10 files changed

+373
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/sedonadb/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export(sd_read_parquet)
2727
export(sd_sql)
2828
export(sd_to_view)
2929
export(sd_view)
30+
export(sd_write_parquet)
3031
export(sedonadb_adbc)
3132
importFrom(nanoarrow,as_nanoarrow_array_stream)
3233
importFrom(nanoarrow,infer_nanoarrow_schema)

r/sedonadb/R/000-wrappers.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
184184
}
185185
}
186186

187+
# Builds the `to_parquet` method bound to one InternalDataFrame pointer.
# The returned closure unwraps the context pointer, forwards all arguments to
# the native routine and returns the result invisibly.
`InternalDataFrame_to_parquet` <- function(self) {
  function(`ctx`, `path`, `partition_by`, `sort_by`, `single_file_output`, `overwrite_bbox_columns`, `geoparquet_version` = NULL) {
    # Replace the R-level context wrapper with the pointer the native call expects
    ctx_ptr <- .savvy_extract_ptr(`ctx`, "InternalContext")

    result <- .Call(
      savvy_InternalDataFrame_to_parquet__impl,
      `self`,
      ctx_ptr,
      `path`,
      `partition_by`,
      `sort_by`,
      `single_file_output`,
      `overwrite_bbox_columns`,
      `geoparquet_version`
    )

    invisible(result)
  }
}
193+
187194
`InternalDataFrame_to_view` <- function(self) {
188195
function(`ctx`, `table_ref`, `overwrite`) {
189196
`ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext")
@@ -202,6 +209,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
202209
e$`show` <- `InternalDataFrame_show`(ptr)
203210
e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr)
204211
e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr)
212+
e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr)
205213
e$`to_view` <- `InternalDataFrame_to_view`(ptr)
206214

207215
class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed")

r/sedonadb/R/dataframe.R

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,83 @@ sd_preview <- function(.data, n = NULL, ascii = NULL, width = NULL) {
182182
invisible(.data)
183183
}
184184

185+
#' Write DataFrame to (Geo)Parquet files
#'
#' Write this DataFrame to one or more (Geo)Parquet files. For input that contains
#' geometry columns, GeoParquet metadata is written such that suitable readers can
#' recreate Geometry/Geography types when reading the output and potentially read
#' fewer row groups when only a subset of the file is needed for a given query.
#'
#' @inheritParams sd_count
#' @param path A filename or directory to which parquet file(s) should be written
#' @param partition_by A character vector of column names to partition by. If non-empty,
#'   applies hive-style partitioning to the output
#' @param sort_by A character vector of column names to sort by. Currently only
#'   ascending sort is supported
#' @param single_file_output Use TRUE or FALSE to force writing a single Parquet
#'   file vs. writing one file per partition to a directory. By default,
#'   a single file is written if `partition_by` is unspecified and
#'   `path` ends with `.parquet`
#' @param geoparquet_version GeoParquet metadata version to write if output contains
#'   one or more geometry columns. The default ("1.0") is the most widely
#'   supported and will result in geometry columns being recognized in many
#'   readers; however, only includes statistics at the file level.
#'   Use "1.1" to compute an additional bounding box column
#'   for every geometry column in the output: some readers can use these columns
#'   to prune row groups when files contain an effective spatial ordering.
#'   The extra columns will appear just before their geometry column and
#'   will be named "[geom_col_name]_bbox" for all geometry columns except
#'   "geometry", whose bounding box column name is just "bbox"
#' @param overwrite_bbox_columns Use TRUE to overwrite any bounding box columns
#'   that already exist in the input. This is useful in a read -> modify
#'   -> write scenario to ensure these columns are up-to-date. If FALSE
#'   (the default), an error will be raised if a bbox column already exists
#'
#' @returns The input, invisibly
#' @export
#'
#' @examples
#' tmp_parquet <- tempfile(fileext = ".parquet")
#'
#' sd_sql("SELECT ST_SetSRID(ST_Point(1, 2), 4326) as geom") |>
#'   sd_write_parquet(tmp_parquet)
#'
#' sd_read_parquet(tmp_parquet)
#' unlink(tmp_parquet)
#'
sd_write_parquet <- function(.data,
                             path,
                             partition_by = character(0),
                             sort_by = character(0),
                             single_file_output = NULL,
                             geoparquet_version = "1.0",
                             overwrite_bbox_columns = FALSE) {
  # Check path before it is used: grepl() below would return a vector (or NA)
  # for anything other than a single string, and the native method takes a
  # single string as well.
  if (!is.character(path) || length(path) != 1L || is.na(path)) {
    stop("path must be a single string")
  }

  # Determine single_file_output default based on path and partition_by
  if (is.null(single_file_output)) {
    single_file_output <- length(partition_by) == 0 && grepl("\\.parquet$", path)
  }

  # Validate geoparquet_version. NULL is passed through untouched. Require a
  # character scalar explicitly: `%in%` alone coerces types (so numeric 1.1
  # would slip through) and yields a non-scalar condition for vectors.
  if (!is.null(geoparquet_version)) {
    version_ok <- is.character(geoparquet_version) &&
      length(geoparquet_version) == 1L &&
      !is.na(geoparquet_version) &&
      geoparquet_version %in% c("1.0", "1.1")

    if (!version_ok) {
      stop("geoparquet_version must be '1.0' or '1.1'")
    }
  }

  # Call the underlying Rust method
  .data$df$to_parquet(
    ctx = .data$ctx,
    path = path,
    partition_by = partition_by,
    sort_by = sort_by,
    single_file_output = single_file_output,
    overwrite_bbox_columns = overwrite_bbox_columns,
    geoparquet_version = geoparquet_version
  )

  invisible(.data)
}
185262

186263
new_sedonadb_dataframe <- function(ctx, internal_df) {
187264
structure(list(ctx = ctx, df = internal_df), class = "sedonadb_dataframe")

r/sedonadb/man/sd_write_parquet.Rd

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/sedonadb/src/init.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
// under the License.
1717

1818
#include <Rinternals.h>
19+
#include <stdint.h>
1920

2021
#include <R_ext/Parse.h>
21-
#include <stdint.h>
2222

2323
#include "rust/api.h"
2424

@@ -155,6 +155,17 @@ SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__,
155155
return handle_result(res);
156156
}
157157

158+
// .Call entry point for the InternalDataFrame to_parquet method: forwards all
// eight arguments unchanged to the Rust FFI function and passes the SEXP it
// returns through handle_result().
SEXP savvy_InternalDataFrame_to_parquet__impl(
    SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
    SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
    SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version) {
  return handle_result(savvy_InternalDataFrame_to_parquet__ffi(
      self__, c_arg__ctx, c_arg__path, c_arg__partition_by, c_arg__sort_by,
      c_arg__single_file_output, c_arg__overwrite_bbox_columns,
      c_arg__geoparquet_version));
}
168+
158169
SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx,
159170
SEXP c_arg__table_ref,
160171
SEXP c_arg__overwrite) {
@@ -198,6 +209,8 @@ static const R_CallMethodDef CallEntries[] = {
198209
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2},
199210
{"savvy_InternalDataFrame_to_arrow_stream__impl",
200211
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2},
212+
{"savvy_InternalDataFrame_to_parquet__impl",
213+
(DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8},
201214
{"savvy_InternalDataFrame_to_view__impl",
202215
(DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4},
203216
{NULL, NULL, 0}};

r/sedonadb/src/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ arrow-schema = { workspace = true }
2828
arrow-array = { workspace = true }
2929
datafusion = { workspace = true }
3030
datafusion-common = { workspace = true }
31+
datafusion-expr = { workspace = true }
3132
savvy = "*"
3233
savvy-ffi = "*"
3334
sedona = { path = "../../../../rust/sedona" }

r/sedonadb/src/rust/api.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx,
4444
SEXP c_arg__ascii, SEXP c_arg__limit);
4545
SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP c_arg__out);
4646
SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP c_arg__out);
47+
SEXP savvy_InternalDataFrame_to_parquet__ffi(
48+
SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
49+
SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
50+
SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
4751
SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
4852
SEXP c_arg__table_ref,
4953
SEXP c_arg__overwrite);

r/sedonadb/src/rust/src/dataframe.rs

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ use arrow_array::ffi::FFI_ArrowSchema;
2121
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
2222
use arrow_array::{RecordBatchIterator, RecordBatchReader};
2323
use datafusion::catalog::MemTable;
24-
use datafusion::prelude::DataFrame;
24+
use datafusion::{logical_expr::SortExpr, prelude::DataFrame};
25+
use datafusion_common::Column;
26+
use datafusion_expr::Expr;
2527
use savvy::{savvy, savvy_err, Result};
26-
use sedona::context::SedonaDataFrame;
28+
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
2729
use sedona::reader::SedonaStreamReader;
2830
use sedona::show::{DisplayMode, DisplayTableOptions};
31+
use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
2932
use sedona_schema::schema::SedonaSchema;
3033
use tokio::runtime::Runtime;
3134

@@ -191,4 +194,63 @@ impl InternalDataFrame {
191194

192195
savvy::Sexp::try_from(out_string)
193196
}
197+
198+
#[allow(clippy::too_many_arguments)]
199+
fn to_parquet(
200+
&self,
201+
ctx: &InternalContext,
202+
path: &str,
203+
partition_by: savvy::Sexp,
204+
sort_by: savvy::Sexp,
205+
single_file_output: bool,
206+
overwrite_bbox_columns: bool,
207+
geoparquet_version: Option<&str>,
208+
) -> savvy::Result<()> {
209+
let partition_by_strsxp = savvy::StringSexp::try_from(partition_by)?;
210+
let partition_by_vec = partition_by_strsxp
211+
.iter()
212+
.map(|s| s.to_string())
213+
.collect::<Vec<_>>();
214+
215+
let sort_by_strsxp = savvy::StringSexp::try_from(sort_by)?;
216+
let sort_by_vec = sort_by_strsxp
217+
.iter()
218+
.map(|s| s.to_string())
219+
.collect::<Vec<_>>();
220+
221+
let sort_by_expr = sort_by_vec
222+
.iter()
223+
.map(|name| {
224+
let column = Expr::Column(Column::new_unqualified(name));
225+
SortExpr::new(column, true, false)
226+
})
227+
.collect::<Vec<_>>();
228+
229+
let options = SedonaWriteOptions::new()
230+
.with_partition_by(partition_by_vec)
231+
.with_sort_by(sort_by_expr)
232+
.with_single_file_output(single_file_output);
233+
234+
let mut writer_options = TableGeoParquetOptions::new();
235+
writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
236+
if let Some(geoparquet_version) = geoparquet_version {
237+
writer_options.geoparquet_version = geoparquet_version
238+
.parse()
239+
.map_err(|e| savvy::Error::new(format!("Invalid geoparquet_version: {e}")))?;
240+
} else {
241+
writer_options.geoparquet_version = GeoParquetVersion::Omitted;
242+
}
243+
244+
let inner = self.inner.clone();
245+
let inner_context = ctx.inner.clone();
246+
let path_owned = path.to_string();
247+
248+
wait_for_future_captured_r(&self.runtime, async move {
249+
inner
250+
.write_geoparquet(&inner_context, &path_owned, options, Some(writer_options))
251+
.await
252+
})??;
253+
254+
Ok(())
255+
}
194256
}

0 commit comments

Comments
 (0)