Skip to content

Commit c876761

Browse files
authored
feat(r/sedonadb): Add FFI support for ScalarUDF and TableProvider (#214)
1 parent 44cd170 commit c876761

File tree

15 files changed

+316
-25
lines changed

15 files changed

+316
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/sedonadb/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ S3method("[[<-",savvy_sedonadb__sealed)
55
S3method(as.data.frame,sedonadb_dataframe)
66
S3method(as_nanoarrow_array_stream,sedonadb_dataframe)
77
S3method(as_sedonadb_dataframe,data.frame)
8+
S3method(as_sedonadb_dataframe,datafusion_table_provider)
89
S3method(as_sedonadb_dataframe,nanoarrow_array)
910
S3method(as_sedonadb_dataframe,nanoarrow_array_stream)
1011
S3method(as_sedonadb_dataframe,sedonadb_dataframe)
@@ -24,6 +25,7 @@ export(sd_count)
2425
export(sd_drop_view)
2526
export(sd_preview)
2627
export(sd_read_parquet)
28+
export(sd_register_udf)
2729
export(sd_sql)
2830
export(sd_to_view)
2931
export(sd_view)

r/sedonadb/R/000-wrappers.R

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ NULL
7777
}
7878
}
7979

80+
`InternalContext_data_frame_from_table_provider` <- function(self) {
81+
function(`provider_xptr`) {
82+
.savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_data_frame_from_table_provider__impl, `self`, `provider_xptr`))
83+
}
84+
}
85+
8086
`InternalContext_deregister_table` <- function(self) {
8187
function(`table_ref`) {
8288
invisible(.Call(savvy_InternalContext_deregister_table__impl, `self`, `table_ref`))
@@ -89,6 +95,18 @@ NULL
8995
}
9096
}
9197

98+
`InternalContext_register_scalar_udf` <- function(self) {
99+
function(`scalar_udf_xptr`) {
100+
invisible(.Call(savvy_InternalContext_register_scalar_udf__impl, `self`, `scalar_udf_xptr`))
101+
}
102+
}
103+
104+
`InternalContext_scalar_udf_xptr` <- function(self) {
105+
function(`name`) {
106+
.Call(savvy_InternalContext_scalar_udf_xptr__impl, `self`, `name`)
107+
}
108+
}
109+
92110
`InternalContext_sql` <- function(self) {
93111
function(`query`) {
94112
.savvy_wrap_InternalDataFrame(.Call(savvy_InternalContext_sql__impl, `self`, `query`))
@@ -105,8 +123,11 @@ NULL
105123
e <- new.env(parent = emptyenv())
106124
e$.ptr <- ptr
107125
e$`data_frame_from_array_stream` <- `InternalContext_data_frame_from_array_stream`(ptr)
126+
e$`data_frame_from_table_provider` <- `InternalContext_data_frame_from_table_provider`(ptr)
108127
e$`deregister_table` <- `InternalContext_deregister_table`(ptr)
109128
e$`read_parquet` <- `InternalContext_read_parquet`(ptr)
129+
e$`register_scalar_udf` <- `InternalContext_register_scalar_udf`(ptr)
130+
e$`scalar_udf_xptr` <- `InternalContext_scalar_udf_xptr`(ptr)
110131
e$`sql` <- `InternalContext_sql`(ptr)
111132
e$`view` <- `InternalContext_view`(ptr)
112133

@@ -179,8 +200,8 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
179200
}
180201

181202
`InternalDataFrame_to_arrow_stream` <- function(self) {
182-
function(`out`) {
183-
invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`, `out`))
203+
function(`out`, `requested_schema_xptr`) {
204+
invisible(.Call(savvy_InternalDataFrame_to_arrow_stream__impl, `self`, `out`, `requested_schema_xptr`))
184205
}
185206
}
186207

@@ -191,6 +212,12 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
191212
}
192213
}
193214

215+
`InternalDataFrame_to_provider` <- function(self) {
216+
function() {
217+
.Call(savvy_InternalDataFrame_to_provider__impl, `self`)
218+
}
219+
}
220+
194221
`InternalDataFrame_to_view` <- function(self) {
195222
function(`ctx`, `table_ref`, `overwrite`) {
196223
`ctx` <- .savvy_extract_ptr(`ctx`, "InternalContext")
@@ -210,6 +237,7 @@ class(`InternalContext`) <- c("InternalContext__bundle", "savvy_sedonadb__sealed
210237
e$`to_arrow_schema` <- `InternalDataFrame_to_arrow_schema`(ptr)
211238
e$`to_arrow_stream` <- `InternalDataFrame_to_arrow_stream`(ptr)
212239
e$`to_parquet` <- `InternalDataFrame_to_parquet`(ptr)
240+
e$`to_provider` <- `InternalDataFrame_to_provider`(ptr)
213241
e$`to_view` <- `InternalDataFrame_to_view`(ptr)
214242

215243
class(e) <- c("InternalDataFrame", "savvy_sedonadb__sealed")

r/sedonadb/R/context.R

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,23 @@ sd_view <- function(table_ref) {
8080
new_sedonadb_dataframe(ctx, df)
8181
}
8282

83+
#' Register a user-defined function
84+
#'
85+
#' Several types of user-defined functions can be registered into a session
86+
#' context. Currently, the only implemented variety is an external pointer
87+
#' to a Rust `FFI_ScalarUDF`, an example of which is available from the
88+
#' [DataFusion Python documentation](https://github.com/apache/datafusion-python/blob/6f3b1cab75cfaa0cdf914f9b6fa023cb9afccd7d/examples/datafusion-ffi-example/src/scalar_udf.rs).
89+
#'
90+
#' @param udf An object of class 'datafusion_scalar_udf'
91+
#'
92+
#' @returns NULL, invisibly
93+
#' @export
94+
#'
95+
sd_register_udf <- function(udf) {
96+
ctx <- ctx()
97+
ctx$register_scalar_udf(udf)
98+
}
99+
83100
# We use just one context for now. In theory we could support multiple
84101
# contexts with a shared runtime, which would scope the registration
85102
# of various components more cleanly from the runtime.

r/sedonadb/R/dataframe.R

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ as_sedonadb_dataframe.nanoarrow_array_stream <- function(x, ..., schema = NULL,
6767
as_sedonadb_dataframe(new_sedonadb_dataframe(ctx, df), schema = schema)
6868
}
6969

70+
#' @export
71+
as_sedonadb_dataframe.datafusion_table_provider <- function(x, ..., schema = NULL) {
72+
ctx <- ctx()
73+
df <- ctx$data_frame_from_table_provider(x)
74+
new_sedonadb_dataframe(ctx, df)
75+
}
76+
7077
#' Count rows in a DataFrame
7178
#'
7279
#' @param .data A sedonadb_dataframe
@@ -297,9 +304,13 @@ infer_nanoarrow_schema.sedonadb_dataframe <- function(x, ...) {
297304

298305
#' @importFrom nanoarrow as_nanoarrow_array_stream
299306
#' @export
300-
as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ...) {
307+
as_nanoarrow_array_stream.sedonadb_dataframe <- function(x, ..., schema = NULL) {
308+
if (!is.null(schema)) {
309+
schema <- nanoarrow::as_nanoarrow_schema(schema)
310+
}
311+
301312
stream <- nanoarrow::nanoarrow_allocate_array_stream()
302-
x$df$to_arrow_stream(stream)
313+
x$df$to_arrow_stream(stream, schema)
303314
stream
304315
}
305316

r/sedonadb/man/sd_register_udf.Rd

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

r/sedonadb/src/init.c

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include <Rinternals.h>
1918
#include <stdint.h>
2019

20+
#include <Rinternals.h>
21+
2122
#include <R_ext/Parse.h>
2223

2324
#include "rust/api.h"
@@ -83,6 +84,13 @@ SEXP savvy_InternalContext_data_frame_from_array_stream__impl(
8384
return handle_result(res);
8485
}
8586

87+
SEXP savvy_InternalContext_data_frame_from_table_provider__impl(
88+
SEXP self__, SEXP c_arg__provider_xptr) {
89+
SEXP res = savvy_InternalContext_data_frame_from_table_provider__ffi(
90+
self__, c_arg__provider_xptr);
91+
return handle_result(res);
92+
}
93+
8694
SEXP savvy_InternalContext_deregister_table__impl(SEXP self__,
8795
SEXP c_arg__table_ref) {
8896
SEXP res =
@@ -100,6 +108,19 @@ SEXP savvy_InternalContext_read_parquet__impl(SEXP self__, SEXP c_arg__paths) {
100108
return handle_result(res);
101109
}
102110

111+
SEXP savvy_InternalContext_register_scalar_udf__impl(
112+
SEXP self__, SEXP c_arg__scalar_udf_xptr) {
113+
SEXP res = savvy_InternalContext_register_scalar_udf__ffi(
114+
self__, c_arg__scalar_udf_xptr);
115+
return handle_result(res);
116+
}
117+
118+
SEXP savvy_InternalContext_scalar_udf_xptr__impl(SEXP self__,
119+
SEXP c_arg__name) {
120+
SEXP res = savvy_InternalContext_scalar_udf_xptr__ffi(self__, c_arg__name);
121+
return handle_result(res);
122+
}
123+
103124
SEXP savvy_InternalContext_sql__impl(SEXP self__, SEXP c_arg__query) {
104125
SEXP res = savvy_InternalContext_sql__ffi(self__, c_arg__query);
105126
return handle_result(res);
@@ -149,9 +170,10 @@ SEXP savvy_InternalDataFrame_to_arrow_schema__impl(SEXP self__,
149170
return handle_result(res);
150171
}
151172

152-
SEXP savvy_InternalDataFrame_to_arrow_stream__impl(SEXP self__,
153-
SEXP c_arg__out) {
154-
SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(self__, c_arg__out);
173+
SEXP savvy_InternalDataFrame_to_arrow_stream__impl(
174+
SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr) {
175+
SEXP res = savvy_InternalDataFrame_to_arrow_stream__ffi(
176+
self__, c_arg__out, c_arg__requested_schema_xptr);
155177
return handle_result(res);
156178
}
157179

@@ -166,6 +188,11 @@ SEXP savvy_InternalDataFrame_to_parquet__impl(
166188
return handle_result(res);
167189
}
168190

191+
SEXP savvy_InternalDataFrame_to_provider__impl(SEXP self__) {
192+
SEXP res = savvy_InternalDataFrame_to_provider__ffi(self__);
193+
return handle_result(res);
194+
}
195+
169196
SEXP savvy_InternalDataFrame_to_view__impl(SEXP self__, SEXP c_arg__ctx,
170197
SEXP c_arg__table_ref,
171198
SEXP c_arg__overwrite) {
@@ -183,12 +210,18 @@ static const R_CallMethodDef CallEntries[] = {
183210
(DL_FUNC)&savvy_sedonadb_adbc_init_func__impl, 0},
184211
{"savvy_InternalContext_data_frame_from_array_stream__impl",
185212
(DL_FUNC)&savvy_InternalContext_data_frame_from_array_stream__impl, 3},
213+
{"savvy_InternalContext_data_frame_from_table_provider__impl",
214+
(DL_FUNC)&savvy_InternalContext_data_frame_from_table_provider__impl, 2},
186215
{"savvy_InternalContext_deregister_table__impl",
187216
(DL_FUNC)&savvy_InternalContext_deregister_table__impl, 2},
188217
{"savvy_InternalContext_new__impl",
189218
(DL_FUNC)&savvy_InternalContext_new__impl, 0},
190219
{"savvy_InternalContext_read_parquet__impl",
191220
(DL_FUNC)&savvy_InternalContext_read_parquet__impl, 2},
221+
{"savvy_InternalContext_register_scalar_udf__impl",
222+
(DL_FUNC)&savvy_InternalContext_register_scalar_udf__impl, 2},
223+
{"savvy_InternalContext_scalar_udf_xptr__impl",
224+
(DL_FUNC)&savvy_InternalContext_scalar_udf_xptr__impl, 2},
192225
{"savvy_InternalContext_sql__impl",
193226
(DL_FUNC)&savvy_InternalContext_sql__impl, 2},
194227
{"savvy_InternalContext_view__impl",
@@ -208,9 +241,11 @@ static const R_CallMethodDef CallEntries[] = {
208241
{"savvy_InternalDataFrame_to_arrow_schema__impl",
209242
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_schema__impl, 2},
210243
{"savvy_InternalDataFrame_to_arrow_stream__impl",
211-
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 2},
244+
(DL_FUNC)&savvy_InternalDataFrame_to_arrow_stream__impl, 3},
212245
{"savvy_InternalDataFrame_to_parquet__impl",
213246
(DL_FUNC)&savvy_InternalDataFrame_to_parquet__impl, 8},
247+
{"savvy_InternalDataFrame_to_provider__impl",
248+
(DL_FUNC)&savvy_InternalDataFrame_to_provider__impl, 1},
214249
{"savvy_InternalDataFrame_to_view__impl",
215250
(DL_FUNC)&savvy_InternalDataFrame_to_view__impl, 4},
216251
{NULL, NULL, 0}};

r/sedonadb/src/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ arrow-array = { workspace = true }
2929
datafusion = { workspace = true }
3030
datafusion-common = { workspace = true }
3131
datafusion-expr = { workspace = true }
32+
datafusion-ffi = { workspace = true }
3233
savvy = "*"
3334
savvy-ffi = "*"
3435
sedona = { path = "../../../../rust/sedona" }

r/sedonadb/src/rust/api.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,15 @@ SEXP savvy_sedonadb_adbc_init_func__ffi(void);
2626
// methods and associated functions for InternalContext
2727
SEXP savvy_InternalContext_data_frame_from_array_stream__ffi(
2828
SEXP self__, SEXP c_arg__stream_xptr, SEXP c_arg__collect_now);
29+
SEXP savvy_InternalContext_data_frame_from_table_provider__ffi(
30+
SEXP self__, SEXP c_arg__provider_xptr);
2931
SEXP savvy_InternalContext_deregister_table__ffi(SEXP self__,
3032
SEXP c_arg__table_ref);
3133
SEXP savvy_InternalContext_new__ffi(void);
3234
SEXP savvy_InternalContext_read_parquet__ffi(SEXP self__, SEXP c_arg__paths);
35+
SEXP savvy_InternalContext_register_scalar_udf__ffi(
36+
SEXP self__, SEXP c_arg__scalar_udf_xptr);
37+
SEXP savvy_InternalContext_scalar_udf_xptr__ffi(SEXP self__, SEXP c_arg__name);
3338
SEXP savvy_InternalContext_sql__ffi(SEXP self__, SEXP c_arg__query);
3439
SEXP savvy_InternalContext_view__ffi(SEXP self__, SEXP c_arg__table_ref);
3540

@@ -43,11 +48,13 @@ SEXP savvy_InternalDataFrame_show__ffi(SEXP self__, SEXP c_arg__ctx,
4348
SEXP c_arg__width_chars,
4449
SEXP c_arg__ascii, SEXP c_arg__limit);
4550
SEXP savvy_InternalDataFrame_to_arrow_schema__ffi(SEXP self__, SEXP c_arg__out);
46-
SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(SEXP self__, SEXP c_arg__out);
51+
SEXP savvy_InternalDataFrame_to_arrow_stream__ffi(
52+
SEXP self__, SEXP c_arg__out, SEXP c_arg__requested_schema_xptr);
4753
SEXP savvy_InternalDataFrame_to_parquet__ffi(
4854
SEXP self__, SEXP c_arg__ctx, SEXP c_arg__path, SEXP c_arg__partition_by,
4955
SEXP c_arg__sort_by, SEXP c_arg__single_file_output,
5056
SEXP c_arg__overwrite_bbox_columns, SEXP c_arg__geoparquet_version);
57+
SEXP savvy_InternalDataFrame_to_provider__ffi(SEXP self__);
5158
SEXP savvy_InternalDataFrame_to_view__ffi(SEXP self__, SEXP c_arg__ctx,
5259
SEXP c_arg__table_ref,
5360
SEXP c_arg__overwrite);

r/sedonadb/src/rust/src/context.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,19 @@
1616
// under the License.
1717
use std::sync::Arc;
1818

19-
use arrow_array::{
20-
ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream},
21-
RecordBatchReader,
22-
};
19+
use arrow_array::RecordBatchReader;
2320
use arrow_schema::ArrowError;
2421
use datafusion::catalog::{MemTable, TableProvider};
25-
use savvy::{savvy, savvy_err, Result};
22+
use datafusion_ffi::udf::FFI_ScalarUDF;
23+
use savvy::{savvy, savvy_err, IntoExtPtrSexp, Result};
2624

2725
use sedona::{context::SedonaContext, record_batch_reader_provider::RecordBatchReaderProvider};
2826
use sedona_geoparquet::provider::GeoParquetReadOptions;
2927
use tokio::runtime::Runtime;
3028

3129
use crate::{
3230
dataframe::{new_data_frame, InternalDataFrame},
31+
ffi::{import_array_stream, import_scalar_udf, import_table_provider, FFIScalarUdfR},
3332
runtime::wait_for_future_captured_r,
3433
};
3534

@@ -94,14 +93,7 @@ impl InternalContext {
9493
stream_xptr: savvy::Sexp,
9594
collect_now: bool,
9695
) -> savvy::Result<InternalDataFrame> {
97-
let ffi_stream =
98-
unsafe { savvy_ffi::R_ExternalPtrAddr(stream_xptr.0) as *mut FFI_ArrowArrayStream };
99-
if ffi_stream.is_null() {
100-
return Err(savvy_err!("external pointer to null in to_arrow_schema()"));
101-
}
102-
103-
let stream = unsafe { FFI_ArrowArrayStream::from_raw(ffi_stream as _) };
104-
let stream_reader = ArrowArrayStreamReader::try_new(stream)?;
96+
let stream_reader = import_array_stream(stream_xptr)?;
10597

10698
// Some readers are sensitive to being collected on the R thread or not, so
10799
// provide the option to collect everything immediately.
@@ -117,8 +109,37 @@ impl InternalContext {
117109
Ok(new_data_frame(inner, self.runtime.clone()))
118110
}
119111

112+
pub fn data_frame_from_table_provider(
113+
&self,
114+
provider_xptr: savvy::Sexp,
115+
) -> Result<InternalDataFrame> {
116+
let provider = import_table_provider(provider_xptr)?;
117+
let inner = self.inner.ctx.read_table(provider)?;
118+
Ok(new_data_frame(inner, self.runtime.clone()))
119+
}
120+
120121
pub fn deregister_table(&self, table_ref: &str) -> savvy::Result<()> {
121122
self.inner.ctx.deregister_table(table_ref)?;
122123
Ok(())
123124
}
125+
126+
pub fn scalar_udf_xptr(&self, name: &str) -> savvy::Result<savvy::Sexp> {
127+
if let Some(udf) = self.inner.ctx.state().scalar_functions().get(name) {
128+
let ffi_scalar_udf: FFI_ScalarUDF = udf.clone().into();
129+
let mut ffi_xptr = FFIScalarUdfR(ffi_scalar_udf).into_external_pointer();
130+
unsafe { savvy_ffi::Rf_protect(ffi_xptr.0) };
131+
ffi_xptr.set_class(vec!["datafusion_scalar_udf"])?;
132+
unsafe { savvy_ffi::Rf_unprotect(1) };
133+
134+
Ok(ffi_xptr)
135+
} else {
136+
Err(savvy_err!("Scalar UDF '{name}' was not found"))
137+
}
138+
}
139+
140+
pub fn register_scalar_udf(&self, scalar_udf_xptr: savvy::Sexp) -> savvy::Result<()> {
141+
let scalar_udf = import_scalar_udf(scalar_udf_xptr)?;
142+
self.inner.ctx.register_udf(scalar_udf);
143+
Ok(())
144+
}
124145
}

0 commit comments

Comments
 (0)