Skip to content

Commit 1b4ff25

Browse files
committed
Minimum viable product demonstrating foreign table provider
1 parent 5f9c33a commit 1b4ff25

File tree

3 files changed

+71
-328
lines changed

3 files changed

+71
-328
lines changed

src/ffi/execution_plan.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
11
use std::{
22
ffi::{c_char, c_void, CString},
3-
ptr::{null, null_mut},
3+
pin::Pin,
4+
ptr::null_mut,
45
slice,
56
sync::Arc,
67
};
78

8-
use arrow::{
9-
array::RecordBatchReader,
10-
datatypes::Schema,
11-
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
12-
ffi_stream::FFI_ArrowArrayStream,
13-
};
9+
use arrow::{datatypes::Schema, ffi::FFI_ArrowSchema, ffi_stream::FFI_ArrowArrayStream};
1410
use datafusion::{
1511
error::DataFusionError,
16-
execution::TaskContext,
12+
execution::{SendableRecordBatchStream, TaskContext},
1713
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties},
1814
};
1915
use datafusion::{error::Result, physical_expr::EquivalenceProperties, prelude::SessionContext};
@@ -25,11 +21,9 @@ use datafusion_proto::{
2521
},
2622
protobuf::{Partitioning, PhysicalSortExprNodeCollection},
2723
};
28-
use futures::{StreamExt, TryStreamExt};
2924
use prost::Message;
30-
use tokio::runtime::Runtime;
3125

32-
use super::record_batch_stream::record_batch_to_arrow_stream;
26+
use super::record_batch_stream::{record_batch_to_arrow_stream, ConsumerRecordBatchStream};
3327

3428
#[repr(C)]
3529
#[derive(Debug)]
@@ -51,7 +45,7 @@ pub struct FFI_ExecutionPlan {
5145
plan: *const FFI_ExecutionPlan,
5246
partition: usize,
5347
err_code: &mut i32,
54-
) -> *const FFI_ArrowArrayStream,
48+
) -> FFI_ArrowArrayStream,
5549

5650
pub private_data: *mut c_void,
5751
}
@@ -91,23 +85,21 @@ unsafe extern "C" fn execute_fn_wrapper(
9185
plan: *const FFI_ExecutionPlan,
9286
partition: usize,
9387
err_code: &mut i32,
94-
) -> *const FFI_ArrowArrayStream {
88+
) -> FFI_ArrowArrayStream {
9589
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
9690

97-
let mut record_batch_stream = match (*private_data)
91+
let record_batch_stream = match (*private_data)
9892
.plan
9993
.execute(partition, (*private_data).context.clone())
10094
{
10195
Ok(rbs) => rbs,
10296
Err(_e) => {
10397
*err_code = 1;
104-
return null();
98+
return FFI_ArrowArrayStream::empty();
10599
}
106100
};
107101

108-
let ffi_stream = Box::new(record_batch_to_arrow_stream(record_batch_stream));
109-
110-
Box::into_raw(ffi_stream)
102+
record_batch_to_arrow_stream(record_batch_stream)
111103
}
112104
unsafe extern "C" fn name_fn_wrapper(plan: *const FFI_ExecutionPlan) -> *const c_char {
113105
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
@@ -215,7 +207,7 @@ impl ExportedExecutionPlan {
215207
))?;
216208
let mut num_children = 0;
217209
let mut err_code = 0;
218-
let mut children_ptr = children_fn(plan, &mut num_children, &mut err_code);
210+
let children_ptr = children_fn(plan, &mut num_children, &mut err_code);
219211

220212
println!(
221213
"We called the FFI function children so the provider told us we have {} children",
@@ -284,25 +276,32 @@ impl ExecutionPlan for ExportedExecutionPlan {
284276
self: Arc<Self>,
285277
children: Vec<Arc<dyn ExecutionPlan>>,
286278
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
287-
todo!()
279+
Ok(Arc::new(ExportedExecutionPlan {
280+
plan: self.plan,
281+
name: self.name.clone(),
282+
children,
283+
properties: self.properties.clone(),
284+
}))
288285
}
289286

290287
fn execute(
291288
&self,
292289
partition: usize,
293-
context: Arc<datafusion::execution::TaskContext>,
290+
_context: Arc<datafusion::execution::TaskContext>,
294291
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
295-
todo!()
296-
}
297-
}
298-
299-
impl DisplayAs for FFI_ExecutionPlan {
300-
fn fmt_as(
301-
&self,
302-
t: datafusion::physical_plan::DisplayFormatType,
303-
f: &mut std::fmt::Formatter,
304-
) -> std::fmt::Result {
305-
todo!()
292+
unsafe {
293+
let execute_fn = (*self.plan).execute;
294+
let mut err_code = 0;
295+
let arrow_stream = execute_fn(self.plan, partition, &mut err_code);
296+
297+
match err_code {
298+
0 => ConsumerRecordBatchStream::try_from(arrow_stream)
299+
.map(|v| Pin::new(Box::new(v)) as SendableRecordBatchStream),
300+
_ => Err(DataFusionError::Execution(
301+
"Error occurred during FFI call to FFI_ExecutionPlan execute.".to_string(),
302+
)),
303+
}
304+
}
306305
}
307306
}
308307

@@ -311,9 +310,6 @@ impl DisplayAs for FFI_ExecutionPlan {
311310
#[allow(missing_docs)]
312311
#[allow(non_camel_case_types)]
313312
pub struct FFI_PlanProperties {
314-
// We will build equivalence properties from the schema and orderings (new_with_orderings). This is how we do it in dataset_exec
315-
// pub eq_properties: Option<unsafe extern "C" fn(plan: *const FFI_PlanProperties) -> EquivalenceProperties>,
316-
317313
// Returns protobuf serialized bytes of the partitioning
318314
pub output_partitioning: Option<
319315
unsafe extern "C" fn(

0 commit comments

Comments
 (0)