Skip to content

Commit 32e6fe8

Browse files
authored
feat: update FFI TableProvider and ExecutionPlan to use FFI Session and TaskContext (#19281)
## Which issue does this PR close? Addresses part of #18671 but does not close it. ## Rationale for this change This is the major change to address the requirements of #18671. This PR combines all of the previous PRs in the issue and uses them in `FFI_TableProvider` and `FFI_ExecutionPlan`. With this change, the only remaining thing needed to close the issue is to remove the core crate. That is a large PR that mostly just changes import paths and will be a follow-up. ## What changes are included in this PR? - Update all structs in the FFI crate to use the `FFI_PhysicalExpr`, `FFI_Session`, `FFI_TaskContext`, and `FFI_LogicalExtensionCodec`. - Remove creation of `SessionContext` within the FFI crate - Update unit tests ## Are these changes tested? Unit tests are added. Coverage report: <img width="685" height="804" alt="Screenshot 2025-12-11 at 10 42 21 AM" src="https://github.com/user-attachments/assets/415822ec-909e-4abe-98de-ae32eb5ec9c3" /> ## Are there any user-facing changes? Yes. There is one major change to using the FFI crate that downstream users will need to implement. Now, when creating a table provider, catalog provider, etc., you need to provide a `TaskContextProvider` and an optional `LogicalExtensionCodec`. The upgrade guide has been updated.
1 parent eb30c19 commit 32e6fe8

File tree

28 files changed

+874
-584
lines changed

28 files changed

+874
-584
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use abi_stable::{export_root_module, prefix_type::PrefixTypeTrait};
2121
use arrow::array::RecordBatch;
2222
use arrow::datatypes::{DataType, Field, Schema};
2323
use datafusion::{common::record_batch, datasource::MemTable};
24+
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2425
use datafusion_ffi::table_provider::FFI_TableProvider;
2526
use ffi_module_interface::{TableProviderModule, TableProviderModuleRef};
2627

@@ -34,7 +35,9 @@ fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
3435

3536
/// Here we only wish to create a simple table provider as an example.
3637
/// We create an in-memory table and convert it to its FFI counterpart.
37-
extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
38+
extern "C" fn construct_simple_table_provider(
39+
codec: FFI_LogicalExtensionCodec,
40+
) -> FFI_TableProvider {
3841
let schema = Arc::new(Schema::new(vec![
3942
Field::new("a", DataType::Int32, true),
4043
Field::new("b", DataType::Float64, true),
@@ -50,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
5053

5154
let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();
5255

53-
FFI_TableProvider::new(Arc::new(table_provider), true, None)
56+
FFI_TableProvider::new_with_ffi_codec(Arc::new(table_provider), true, None, codec)
5457
}
5558

5659
#[export_root_module]

datafusion-examples/examples/ffi/ffi_module_interface/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use abi_stable::{
2121
package_version_strings,
2222
sabi_types::VersionStrings,
2323
};
24+
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2425
use datafusion_ffi::table_provider::FFI_TableProvider;
2526

2627
#[repr(C)]
@@ -33,7 +34,8 @@ use datafusion_ffi::table_provider::FFI_TableProvider;
3334
/// how a user may wish to separate these concerns.
3435
pub struct TableProviderModule {
3536
/// Constructs the table provider
36-
pub create_table: extern "C" fn() -> FFI_TableProvider,
37+
pub create_table:
38+
extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProvider,
3739
}
3840

3941
impl RootModule for TableProviderModuleRef {

datafusion-examples/examples/ffi/ffi_module_loader/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ publish = false
2424
[dependencies]
2525
abi_stable = "0.11.3"
2626
datafusion = { workspace = true }
27+
datafusion-ffi = { workspace = true }
2728
ffi_module_interface = { path = "../ffi_module_interface" }
2829
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }

datafusion-examples/examples/ffi/ffi_module_loader/src/main.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use datafusion::{
2424

2525
use abi_stable::library::{RootModule, development_utils::compute_library_path};
2626
use datafusion::datasource::TableProvider;
27+
use datafusion::execution::TaskContextProvider;
28+
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
2729
use ffi_module_interface::TableProviderModuleRef;
2830

2931
#[tokio::main]
@@ -39,21 +41,24 @@ async fn main() -> Result<()> {
3941
TableProviderModuleRef::load_from_directory(&library_path)
4042
.map_err(|e| DataFusionError::External(Box::new(e)))?;
4143

44+
let ctx = Arc::new(SessionContext::new());
45+
let codec = FFI_LogicalExtensionCodec::new_default(
46+
&(Arc::clone(&ctx) as Arc<dyn TaskContextProvider>),
47+
);
48+
4249
// By calling the code below, the table provided will be created within
4350
// the module's code.
4451
let ffi_table_provider =
4552
table_provider_module
4653
.create_table()
4754
.ok_or(DataFusionError::NotImplemented(
4855
"External table provider failed to implement create_table".to_string(),
49-
))?();
56+
))?(codec);
5057

5158
// In order to access the table provider within this executable, we need to
5259
// turn it into a `TableProvider`.
5360
let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_table_provider).into();
5461

55-
let ctx = SessionContext::new();
56-
5762
// Display the data to show the full cycle works.
5863
ctx.register_table("external_table", foreign_table_provider)?;
5964
let df = ctx.table("external_table").await?;

datafusion/ffi/src/catalog_provider.rs

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{any::Any, ffi::c_void, sync::Arc};
19-
20-
use abi_stable::{
21-
StableAbi,
22-
std_types::{ROption, RResult, RString, RVec},
18+
use std::any::Any;
19+
use std::ffi::c_void;
20+
use std::sync::Arc;
21+
22+
use abi_stable::StableAbi;
23+
use abi_stable::std_types::{ROption, RResult, RString, RVec};
24+
use datafusion_catalog::{CatalogProvider, SchemaProvider};
25+
use datafusion_common::error::Result;
26+
use datafusion_proto::logical_plan::{
27+
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
2328
};
24-
use datafusion::catalog::{CatalogProvider, SchemaProvider};
2529
use tokio::runtime::Handle;
2630

27-
use crate::{
28-
df_result, rresult_return,
29-
schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
30-
};
31-
31+
use crate::execution::FFI_TaskContextProvider;
32+
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
33+
use crate::schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider};
3234
use crate::util::FFIResult;
33-
use datafusion::error::Result;
35+
use crate::{df_result, rresult_return};
3436

3537
/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
3638
#[repr(C)]
@@ -58,6 +60,8 @@ pub struct FFI_CatalogProvider {
5860
)
5961
-> FFIResult<ROption<FFI_SchemaProvider>>,
6062

63+
pub logical_codec: FFI_LogicalExtensionCodec,
64+
6165
/// Used to create a clone on the provider of the execution plan. This should
6266
/// only need to be called by the receiver of the plan.
6367
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -118,7 +122,13 @@ unsafe extern "C" fn schema_fn_wrapper(
118122
unsafe {
119123
let maybe_schema = provider.inner().schema(name.as_str());
120124
maybe_schema
121-
.map(|schema| FFI_SchemaProvider::new(schema, provider.runtime()))
125+
.map(|schema| {
126+
FFI_SchemaProvider::new_with_ffi_codec(
127+
schema,
128+
provider.runtime(),
129+
provider.logical_codec.clone(),
130+
)
131+
})
122132
.into()
123133
}
124134
}
@@ -130,12 +140,18 @@ unsafe extern "C" fn register_schema_fn_wrapper(
130140
) -> FFIResult<ROption<FFI_SchemaProvider>> {
131141
unsafe {
132142
let runtime = provider.runtime();
133-
let provider = provider.inner();
143+
let inner_provider = provider.inner();
134144
let schema: Arc<dyn SchemaProvider + Send> = schema.into();
135145

136146
let returned_schema =
137-
rresult_return!(provider.register_schema(name.as_str(), schema))
138-
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
147+
rresult_return!(inner_provider.register_schema(name.as_str(), schema))
148+
.map(|schema| {
149+
FFI_SchemaProvider::new_with_ffi_codec(
150+
schema,
151+
runtime,
152+
provider.logical_codec.clone(),
153+
)
154+
})
139155
.into();
140156

141157
RResult::ROk(returned_schema)
@@ -149,14 +165,20 @@ unsafe extern "C" fn deregister_schema_fn_wrapper(
149165
) -> FFIResult<ROption<FFI_SchemaProvider>> {
150166
unsafe {
151167
let runtime = provider.runtime();
152-
let provider = provider.inner();
168+
let inner_provider = provider.inner();
153169

154170
let maybe_schema =
155-
rresult_return!(provider.deregister_schema(name.as_str(), cascade));
171+
rresult_return!(inner_provider.deregister_schema(name.as_str(), cascade));
156172

157173
RResult::ROk(
158174
maybe_schema
159-
.map(|schema| FFI_SchemaProvider::new(schema, runtime))
175+
.map(|schema| {
176+
FFI_SchemaProvider::new_with_ffi_codec(
177+
schema,
178+
runtime,
179+
provider.logical_codec.clone(),
180+
)
181+
})
160182
.into(),
161183
)
162184
}
@@ -189,6 +211,7 @@ unsafe extern "C" fn clone_fn_wrapper(
189211
schema: schema_fn_wrapper,
190212
register_schema: register_schema_fn_wrapper,
191213
deregister_schema: deregister_schema_fn_wrapper,
214+
logical_codec: provider.logical_codec.clone(),
192215
clone: clone_fn_wrapper,
193216
release: release_fn_wrapper,
194217
version: super::version,
@@ -209,6 +232,24 @@ impl FFI_CatalogProvider {
209232
pub fn new(
210233
provider: Arc<dyn CatalogProvider + Send>,
211234
runtime: Option<Handle>,
235+
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
236+
logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
237+
) -> Self {
238+
let task_ctx_provider = task_ctx_provider.into();
239+
let logical_codec =
240+
logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
241+
let logical_codec = FFI_LogicalExtensionCodec::new(
242+
logical_codec,
243+
runtime.clone(),
244+
task_ctx_provider.clone(),
245+
);
246+
Self::new_with_ffi_codec(provider, runtime, logical_codec)
247+
}
248+
249+
pub fn new_with_ffi_codec(
250+
provider: Arc<dyn CatalogProvider + Send>,
251+
runtime: Option<Handle>,
252+
logical_codec: FFI_LogicalExtensionCodec,
212253
) -> Self {
213254
let private_data = Box::new(ProviderPrivateData { provider, runtime });
214255

@@ -217,6 +258,7 @@ impl FFI_CatalogProvider {
217258
schema: schema_fn_wrapper,
218259
register_schema: register_schema_fn_wrapper,
219260
deregister_schema: deregister_schema_fn_wrapper,
261+
logical_codec,
220262
clone: clone_fn_wrapper,
221263
release: release_fn_wrapper,
222264
version: super::version,
@@ -286,7 +328,11 @@ impl CatalogProvider for ForeignCatalogProvider {
286328
unsafe {
287329
let schema = match schema.as_any().downcast_ref::<ForeignSchemaProvider>() {
288330
Some(s) => &s.0,
289-
None => &FFI_SchemaProvider::new(schema, None),
331+
None => &FFI_SchemaProvider::new_with_ffi_codec(
332+
schema,
333+
None,
334+
self.0.logical_codec.clone(),
335+
),
290336
};
291337
let returned_schema: Option<FFI_SchemaProvider> =
292338
df_result!((self.0.register_schema)(&self.0, name.into(), schema))?
@@ -331,8 +377,10 @@ mod tests {
331377
.unwrap()
332378
.is_none()
333379
);
380+
let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
334381

335-
let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None);
382+
let mut ffi_catalog =
383+
FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None);
336384
ffi_catalog.library_marker_id = crate::mock_foreign_marker_id;
337385

338386
let foreign_catalog: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();
@@ -375,7 +423,9 @@ mod tests {
375423
fn test_ffi_catalog_provider_local_bypass() {
376424
let catalog = Arc::new(MemoryCatalogProvider::new());
377425

378-
let mut ffi_catalog = FFI_CatalogProvider::new(catalog, None);
426+
let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
427+
let mut ffi_catalog =
428+
FFI_CatalogProvider::new(catalog, None, task_ctx_provider, None);
379429

380430
// Verify local libraries can be downcast to their original
381431
let foreign_catalog: Arc<dyn CatalogProvider + Send> = (&ffi_catalog).into();

0 commit comments

Comments
 (0)