11use std:: {
22 ffi:: { c_char, c_void, CString } ,
3- ptr:: { null, null_mut} ,
3+ pin:: Pin ,
4+ ptr:: null_mut,
45 slice,
56 sync:: Arc ,
67} ;
78
8- use arrow:: {
9- array:: RecordBatchReader ,
10- datatypes:: Schema ,
11- ffi:: { FFI_ArrowArray , FFI_ArrowSchema } ,
12- ffi_stream:: FFI_ArrowArrayStream ,
13- } ;
9+ use arrow:: { datatypes:: Schema , ffi:: FFI_ArrowSchema , ffi_stream:: FFI_ArrowArrayStream } ;
1410use datafusion:: {
1511 error:: DataFusionError ,
16- execution:: TaskContext ,
12+ execution:: { SendableRecordBatchStream , TaskContext } ,
1713 physical_plan:: { DisplayAs , ExecutionMode , ExecutionPlan , PlanProperties } ,
1814} ;
1915use datafusion:: { error:: Result , physical_expr:: EquivalenceProperties , prelude:: SessionContext } ;
@@ -25,11 +21,9 @@ use datafusion_proto::{
2521 } ,
2622 protobuf:: { Partitioning , PhysicalSortExprNodeCollection } ,
2723} ;
28- use futures:: { StreamExt , TryStreamExt } ;
2924use prost:: Message ;
30- use tokio:: runtime:: Runtime ;
3125
32- use super :: record_batch_stream:: record_batch_to_arrow_stream;
26+ use super :: record_batch_stream:: { record_batch_to_arrow_stream, ConsumerRecordBatchStream } ;
3327
3428#[ repr( C ) ]
3529#[ derive( Debug ) ]
@@ -51,7 +45,7 @@ pub struct FFI_ExecutionPlan {
5145 plan : * const FFI_ExecutionPlan ,
5246 partition : usize ,
5347 err_code : & mut i32 ,
54- ) -> * const FFI_ArrowArrayStream ,
48+ ) -> FFI_ArrowArrayStream ,
5549
5650 pub private_data : * mut c_void ,
5751}
@@ -91,23 +85,21 @@ unsafe extern "C" fn execute_fn_wrapper(
9185 plan : * const FFI_ExecutionPlan ,
9286 partition : usize ,
9387 err_code : & mut i32 ,
94- ) -> * const FFI_ArrowArrayStream {
88+ ) -> FFI_ArrowArrayStream {
9589 let private_data = ( * plan) . private_data as * const ExecutionPlanPrivateData ;
9690
97- let mut record_batch_stream = match ( * private_data)
91+ let record_batch_stream = match ( * private_data)
9892 . plan
9993 . execute ( partition, ( * private_data) . context . clone ( ) )
10094 {
10195 Ok ( rbs) => rbs,
10296 Err ( _e) => {
10397 * err_code = 1 ;
104- return null ( ) ;
98+ return FFI_ArrowArrayStream :: empty ( ) ;
10599 }
106100 } ;
107101
108- let ffi_stream = Box :: new ( record_batch_to_arrow_stream ( record_batch_stream) ) ;
109-
110- Box :: into_raw ( ffi_stream)
102+ record_batch_to_arrow_stream ( record_batch_stream)
111103}
112104unsafe extern "C" fn name_fn_wrapper ( plan : * const FFI_ExecutionPlan ) -> * const c_char {
113105 let private_data = ( * plan) . private_data as * const ExecutionPlanPrivateData ;
@@ -215,7 +207,7 @@ impl ExportedExecutionPlan {
215207 ) ) ?;
216208 let mut num_children = 0 ;
217209 let mut err_code = 0 ;
218- let mut children_ptr = children_fn ( plan, & mut num_children, & mut err_code) ;
210+ let children_ptr = children_fn ( plan, & mut num_children, & mut err_code) ;
219211
220212 println ! (
221213 "We called the FFI function children so the provider told us we have {} children" ,
@@ -284,25 +276,32 @@ impl ExecutionPlan for ExportedExecutionPlan {
284276 self : Arc < Self > ,
285277 children : Vec < Arc < dyn ExecutionPlan > > ,
286278 ) -> datafusion:: error:: Result < Arc < dyn ExecutionPlan > > {
287- todo ! ( )
279+ Ok ( Arc :: new ( ExportedExecutionPlan {
280+ plan : self . plan ,
281+ name : self . name . clone ( ) ,
282+ children,
283+ properties : self . properties . clone ( ) ,
284+ } ) )
288285 }
289286
290287 fn execute (
291288 & self ,
292289 partition : usize ,
293- context : Arc < datafusion:: execution:: TaskContext > ,
290+ _context : Arc < datafusion:: execution:: TaskContext > ,
294291 ) -> datafusion:: error:: Result < datafusion:: execution:: SendableRecordBatchStream > {
295- todo ! ( )
296- }
297- }
298-
299- impl DisplayAs for FFI_ExecutionPlan {
300- fn fmt_as (
301- & self ,
302- t : datafusion:: physical_plan:: DisplayFormatType ,
303- f : & mut std:: fmt:: Formatter ,
304- ) -> std:: fmt:: Result {
305- todo ! ( )
292+ unsafe {
293+ let execute_fn = ( * self . plan ) . execute ;
294+ let mut err_code = 0 ;
295+ let arrow_stream = execute_fn ( self . plan , partition, & mut err_code) ;
296+
297+ match err_code {
298+ 0 => ConsumerRecordBatchStream :: try_from ( arrow_stream)
299+ . map ( |v| Pin :: new ( Box :: new ( v) ) as SendableRecordBatchStream ) ,
300+ _ => Err ( DataFusionError :: Execution (
301+ "Error occurred during FFI call to FFI_ExecutionPlan execute." . to_string ( ) ,
302+ ) ) ,
303+ }
304+ }
306305 }
307306}
308307
@@ -311,9 +310,6 @@ impl DisplayAs for FFI_ExecutionPlan {
311310#[ allow( missing_docs) ]
312311#[ allow( non_camel_case_types) ]
313312pub struct FFI_PlanProperties {
314- // We will build equivalence properties from teh schema and ordersing (new_with_orderings). This is how we do ti in dataset_exec
315- // pub eq_properties: Option<unsafe extern "C" fn(plan: *const FFI_PlanProperties) -> EquivalenceProperties>,
316-
317313 // Returns protobuf serialized bytes of the partitioning
318314 pub output_partitioning : Option <
319315 unsafe extern "C" fn (
0 commit comments