Skip to content

Commit 5842e23

Browse files
timsaucerCopilot
andauthored
Implement FFI_PhysicalExpr and the structs it needs to support it. (#18916)
## Which issue does this PR close? Addresses part of #18671 but does not close it. ## Rationale for this change This PR exposes the `PhysicalExpr` trait via FFI. This will allow us to remove using protobuf for transferring *physical* expressions across the FFI boundary. We will still use protobuf for the *logical* side. The reason this is important is because it will allow us to eventually remove the `core` crate as described in #18671 but also it will enable keeping `ColumnarValue::Scalar` when using UDFs. This is important for UDF performance. Of all of the PRs I have prepared for #18671 this is the largest of the individual PRs. That is because it requires quite a few supporting structures from `datafusion-expr` in order to support it. ## What changes are included in this PR? - This PR introduces the `FFI_PhysicalExpr` trait and a variety of enums and structs that are needed to be FFI stable in order to implement it. It does _not_ replace the existing usage in the UDFs and other places with the `FFI_PhysicalExpr` yet. That comes in a later PR in order to keep the size of the individual requests to manageable. - Adds `FFIResult<T>` which is an alias for `RResult<T, RString>` - Adds FFI as variant on `DataFusionError` ## Are these changes tested? Unit tests are included. <img width="477" height="181" alt="Screenshot 2025-11-25 at 8 06 23 AM" src="https://github.com/user-attachments/assets/7e708bcb-922a-4433-9a52-afd22c3ba09e" /> ## Are there any user-facing changes? Since this is pure addition, no user facing changes in this PR. --------- Co-authored-by: Copilot <[email protected]>
1 parent 653caa1 commit 5842e23

33 files changed

+1904
-201
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/error.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ pub enum DataFusionError {
171171
/// to multiple receivers. For example, when the source of a repartition
172172
/// errors and the error is propagated to multiple consumers.
173173
Shared(Arc<DataFusionError>),
174+
/// An error that originated during a foreign function interface call.
175+
/// Transferring errors across the FFI boundary is difficult, so the original
176+
/// error will be converted to a string.
177+
Ffi(String),
174178
}
175179

176180
#[macro_export]
@@ -413,6 +417,7 @@ impl Error for DataFusionError {
413417
// can't be executed.
414418
DataFusionError::Collection(errs) => errs.first().map(|e| e as &dyn Error),
415419
DataFusionError::Shared(e) => Some(e.as_ref()),
420+
DataFusionError::Ffi(_) => None,
416421
}
417422
}
418423
}
@@ -544,6 +549,7 @@ impl DataFusionError {
544549
errs.first().expect("cannot construct DataFusionError::Collection with 0 errors, but got one such case").error_prefix()
545550
}
546551
DataFusionError::Shared(_) => "",
552+
DataFusionError::Ffi(_) => "FFI error: ",
547553
}
548554
}
549555

@@ -596,6 +602,7 @@ impl DataFusionError {
596602
.expect("cannot construct DataFusionError::Collection with 0 errors")
597603
.message(),
598604
DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
605+
DataFusionError::Ffi(ref desc) => Cow::Owned(desc.to_string()),
599606
}
600607
}
601608

@@ -964,6 +971,9 @@ make_error!(substrait_err, substrait_datafusion_err, Substrait);
964971
// Exposes a macro to create `DataFusionError::ResourcesExhausted` with optional backtrace
965972
make_error!(resources_err, resources_datafusion_err, ResourcesExhausted);
966973

974+
// Exposes a macro to create `DataFusionError::Ffi` with optional backtrace
975+
make_error!(ffi_err, ffi_datafusion_err, Ffi);
976+
967977
// Exposes a macro to create `DataFusionError::SQL` with optional backtrace
968978
#[macro_export]
969979
macro_rules! sql_datafusion_err {

datafusion/common/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ pub use utils::project_schema;
102102
// https://github.com/rust-lang/rust/pull/52234#issuecomment-976702997
103103
#[doc(hidden)]
104104
pub use error::{
105-
_config_datafusion_err, _exec_datafusion_err, _internal_datafusion_err,
106-
_not_impl_datafusion_err, _plan_datafusion_err, _resources_datafusion_err,
107-
_substrait_datafusion_err,
105+
_config_datafusion_err, _exec_datafusion_err, _ffi_datafusion_err,
106+
_internal_datafusion_err, _not_impl_datafusion_err, _plan_datafusion_err,
107+
_resources_datafusion_err, _substrait_datafusion_err,
108108
};
109109

110110
// The HashMap and HashSet implementations that should be used as the uniform defaults

datafusion/ffi/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ async-ffi = { version = "0.5.0", features = ["abi_stable"] }
4848
async-trait = { workspace = true }
4949
datafusion = { workspace = true, default-features = false }
5050
datafusion-common = { workspace = true }
51+
datafusion-expr = { workspace = true }
5152
datafusion-functions-aggregate-common = { workspace = true }
53+
datafusion-physical-expr = { workspace = true }
54+
datafusion-physical-expr-common = { workspace = true }
5255
datafusion-proto = { workspace = true }
5356
datafusion-proto-common = { workspace = true }
5457
futures = { workspace = true }

datafusion/ffi/src/arrow_wrappers.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use arrow::{
2424
error::ArrowError,
2525
ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
2626
};
27+
use datafusion_common::{DataFusionError, ScalarValue};
2728
use log::error;
2829

2930
/// This is a wrapper struct around FFI_ArrowSchema simply to indicate
@@ -95,3 +96,21 @@ impl TryFrom<&ArrayRef> for WrappedArray {
9596
Ok(WrappedArray { array, schema })
9697
}
9798
}
99+
100+
impl TryFrom<&ScalarValue> for WrappedArray {
101+
type Error = DataFusionError;
102+
103+
fn try_from(value: &ScalarValue) -> Result<Self, Self::Error> {
104+
let array = value.to_array()?;
105+
WrappedArray::try_from(&array).map_err(Into::into)
106+
}
107+
}
108+
109+
impl TryFrom<WrappedArray> for ScalarValue {
110+
type Error = DataFusionError;
111+
112+
fn try_from(value: WrappedArray) -> Result<Self, Self::Error> {
113+
let array: ArrayRef = value.try_into()?;
114+
ScalarValue::try_from_array(array.as_ref(), 0)
115+
}
116+
}

datafusion/ffi/src/catalog_provider.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::{
2929
schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
3030
};
3131

32+
use crate::util::FFIResult;
3233
use datafusion::error::Result;
3334

3435
/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
@@ -43,19 +44,19 @@ pub struct FFI_CatalogProvider {
4344
name: RString,
4445
) -> ROption<FFI_SchemaProvider>,
4546

46-
pub register_schema:
47-
unsafe extern "C" fn(
48-
provider: &Self,
49-
name: RString,
50-
schema: &FFI_SchemaProvider,
51-
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
47+
pub register_schema: unsafe extern "C" fn(
48+
provider: &Self,
49+
name: RString,
50+
schema: &FFI_SchemaProvider,
51+
)
52+
-> FFIResult<ROption<FFI_SchemaProvider>>,
5253

53-
pub deregister_schema:
54-
unsafe extern "C" fn(
55-
provider: &Self,
56-
name: RString,
57-
cascade: bool,
58-
) -> RResult<ROption<FFI_SchemaProvider>, RString>,
54+
pub deregister_schema: unsafe extern "C" fn(
55+
provider: &Self,
56+
name: RString,
57+
cascade: bool,
58+
)
59+
-> FFIResult<ROption<FFI_SchemaProvider>>,
5960

6061
/// Used to create a clone on the provider of the execution plan. This should
6162
/// only need to be called by the receiver of the plan.
@@ -118,7 +119,7 @@ unsafe extern "C" fn register_schema_fn_wrapper(
118119
provider: &FFI_CatalogProvider,
119120
name: RString,
120121
schema: &FFI_SchemaProvider,
121-
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
122+
) -> FFIResult<ROption<FFI_SchemaProvider>> {
122123
let runtime = provider.runtime();
123124
let provider = provider.inner();
124125
let schema: Arc<dyn SchemaProvider + Send> = schema.into();
@@ -135,7 +136,7 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
135136
provider: &FFI_CatalogProvider,
136137
name: RString,
137138
cascade: bool,
138-
) -> RResult<ROption<FFI_SchemaProvider>, RString> {
139+
) -> FFIResult<ROption<FFI_SchemaProvider>> {
139140
let runtime = provider.runtime();
140141
let provider = provider.inner();
141142

@@ -150,8 +151,10 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
150151
}
151152

152153
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) {
154+
debug_assert!(!provider.private_data.is_null());
153155
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
154156
drop(private_data);
157+
provider.private_data = std::ptr::null_mut();
155158
}
156159

157160
unsafe extern "C" fn clone_fn_wrapper(

datafusion/ffi/src/catalog_provider_list.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,10 @@ unsafe extern "C" fn catalog_fn_wrapper(
120120
}
121121

122122
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProviderList) {
123+
debug_assert!(!provider.private_data.is_null());
123124
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
124125
drop(private_data);
126+
provider.private_data = std::ptr::null_mut();
125127
}
126128

127129
unsafe extern "C" fn clone_fn_wrapper(

datafusion/ffi/src/execution_plan.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::{ffi::c_void, pin::Pin, sync::Arc};
1919

2020
use abi_stable::{
21-
std_types::{RResult, RString, RVec},
21+
std_types::{RString, RVec},
2222
StableAbi,
2323
};
2424
use datafusion::{
@@ -29,6 +29,7 @@ use datafusion::{
2929
use datafusion::{error::Result, physical_plan::DisplayFormatType};
3030
use tokio::runtime::Handle;
3131

32+
use crate::util::FFIResult;
3233
use crate::{
3334
df_result, plan_properties::FFI_PlanProperties,
3435
record_batch_stream::FFI_RecordBatchStream, rresult,
@@ -53,7 +54,7 @@ pub struct FFI_ExecutionPlan {
5354
pub execute: unsafe extern "C" fn(
5455
plan: &Self,
5556
partition: usize,
56-
) -> RResult<FFI_RecordBatchStream, RString>,
57+
) -> FFIResult<FFI_RecordBatchStream>,
5758

5859
/// Used to create a clone on the provider of the execution plan. This should
5960
/// only need to be called by the receiver of the plan.
@@ -116,7 +117,7 @@ unsafe extern "C" fn children_fn_wrapper(
116117
unsafe extern "C" fn execute_fn_wrapper(
117118
plan: &FFI_ExecutionPlan,
118119
partition: usize,
119-
) -> RResult<FFI_RecordBatchStream, RString> {
120+
) -> FFIResult<FFI_RecordBatchStream> {
120121
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
121122
let plan = &(*private_data).plan;
122123
let ctx = &(*private_data).context;
@@ -132,8 +133,10 @@ unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
132133
}
133134

134135
unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
136+
debug_assert!(!plan.private_data.is_null());
135137
let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
136138
drop(private_data);
139+
plan.private_data = std::ptr::null_mut();
137140
}
138141

139142
unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use abi_stable::StableAbi;
19+
use datafusion_common::{DataFusionError, ScalarValue};
20+
use datafusion_expr::ColumnarValue;
21+
22+
use crate::arrow_wrappers::WrappedArray;
23+
24+
/// A stable struct for sharing [`ColumnarValue`] across FFI boundaries.
25+
/// Scalar values are passed as an Arrow array of length 1.
26+
#[repr(C)]
27+
#[derive(Debug, StableAbi)]
28+
#[allow(non_camel_case_types)]
29+
pub enum FFI_ColumnarValue {
30+
Array(WrappedArray),
31+
Scalar(WrappedArray),
32+
}
33+
34+
impl TryFrom<ColumnarValue> for FFI_ColumnarValue {
35+
type Error = DataFusionError;
36+
fn try_from(value: ColumnarValue) -> Result<Self, Self::Error> {
37+
Ok(match value {
38+
ColumnarValue::Array(v) => {
39+
FFI_ColumnarValue::Array(WrappedArray::try_from(&v)?)
40+
}
41+
ColumnarValue::Scalar(v) => {
42+
FFI_ColumnarValue::Scalar(WrappedArray::try_from(&v)?)
43+
}
44+
})
45+
}
46+
}
47+
48+
impl TryFrom<FFI_ColumnarValue> for ColumnarValue {
49+
type Error = DataFusionError;
50+
fn try_from(value: FFI_ColumnarValue) -> Result<Self, Self::Error> {
51+
Ok(match value {
52+
FFI_ColumnarValue::Array(v) => ColumnarValue::Array(v.try_into()?),
53+
FFI_ColumnarValue::Scalar(v) => {
54+
ColumnarValue::Scalar(ScalarValue::try_from(v)?)
55+
}
56+
})
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use arrow::array::create_array;
63+
use datafusion_common::{DataFusionError, ScalarValue};
64+
use datafusion_expr::ColumnarValue;
65+
66+
use crate::expr::columnar_value::FFI_ColumnarValue;
67+
68+
#[test]
69+
fn ffi_columnar_value_round_trip() -> Result<(), DataFusionError> {
70+
let array = create_array!(Int32, [1, 2, 3, 4, 5]);
71+
72+
for original in [
73+
ColumnarValue::Array(array),
74+
ColumnarValue::Scalar(ScalarValue::Int32(Some(1))),
75+
] {
76+
let ffi_variant = FFI_ColumnarValue::try_from(original.clone())?;
77+
78+
let returned_value = ColumnarValue::try_from(ffi_variant)?;
79+
80+
assert_eq!(format!("{returned_value:?}"), format!("{original:?}"));
81+
}
82+
83+
Ok(())
84+
}
85+
}

0 commit comments

Comments
 (0)