11use std:: {
2- ffi:: { c_void, CString } ,
3- num,
2+ ffi:: { c_char, c_void, CString } ,
43 ptr:: { null, null_mut} ,
54 slice,
65 sync:: Arc ,
76} ;
87
9- use arrow:: { datatypes:: Schema , ffi:: FFI_ArrowSchema } ;
8+ use arrow:: {
9+ array:: RecordBatchReader ,
10+ datatypes:: Schema ,
11+ ffi:: { FFI_ArrowArray , FFI_ArrowSchema } ,
12+ ffi_stream:: FFI_ArrowArrayStream ,
13+ } ;
1014use datafusion:: {
1115 error:: DataFusionError ,
16+ execution:: TaskContext ,
1217 physical_plan:: { DisplayAs , ExecutionMode , ExecutionPlan , PlanProperties } ,
1318} ;
1419use datafusion:: { error:: Result , physical_expr:: EquivalenceProperties , prelude:: SessionContext } ;
1520use datafusion_proto:: {
1621 physical_plan:: {
1722 from_proto:: { parse_physical_sort_exprs, parse_protobuf_partitioning} ,
18- to_proto:: {
19- serialize_partitioning, serialize_physical_exprs, serialize_physical_sort_exprs,
20- } ,
23+ to_proto:: { serialize_partitioning, serialize_physical_sort_exprs} ,
2124 DefaultPhysicalExtensionCodec ,
2225 } ,
23- protobuf:: { partitioning , Partitioning , PhysicalSortExprNodeCollection } ,
26+ protobuf:: { Partitioning , PhysicalSortExprNodeCollection } ,
2427} ;
25- use prost:: { DecodeError , Message } ;
28+ use futures:: { StreamExt , TryStreamExt } ;
29+ use prost:: Message ;
30+ use tokio:: runtime:: Runtime ;
31+
32+ use super :: record_batch_stream:: record_batch_to_arrow_stream;
2633
2734#[ repr( C ) ]
2835#[ derive( Debug ) ]
@@ -38,7 +45,13 @@ pub struct FFI_ExecutionPlan {
3845 err_code : & mut i32 ,
3946 ) -> * mut * const FFI_ExecutionPlan ,
4047 > ,
41- pub name : unsafe extern "C" fn ( plan : * const FFI_ExecutionPlan ) -> CString ,
48+ pub name : unsafe extern "C" fn ( plan : * const FFI_ExecutionPlan ) -> * const c_char ,
49+
50+ pub execute : unsafe extern "C" fn (
51+ plan : * const FFI_ExecutionPlan ,
52+ partition : usize ,
53+ err_code : & mut i32 ,
54+ ) -> * const FFI_ArrowArrayStream ,
4255
4356 pub private_data : * mut c_void ,
4457}
@@ -47,6 +60,7 @@ pub struct ExecutionPlanPrivateData {
4760 pub plan : Arc < dyn ExecutionPlan + Send > ,
4861 pub last_error : Option < CString > ,
4962 pub children : Vec < * const FFI_ExecutionPlan > ,
63+ pub context : Arc < TaskContext > ,
5064}
5165
5266unsafe extern "C" fn properties_fn_wrapper ( plan : * const FFI_ExecutionPlan ) -> FFI_PlanProperties {
@@ -63,11 +77,6 @@ unsafe extern "C" fn children_fn_wrapper(
6377 let private_data = ( * plan) . private_data as * const ExecutionPlanPrivateData ;
6478
6579 * num_children = ( * private_data) . children . len ( ) ;
66- // let children: Vec<FFI_ExecutionPlan> = children
67- // .into_iter()
68- // .map(|child| FFI_ExecutionPlan::new(child.clone()))
69- // .collect();
70-
7180 * err_code = 0 ;
7281
7382 let mut children: Vec < _ > = ( * private_data) . children . to_owned ( ) ;
@@ -78,12 +87,36 @@ unsafe extern "C" fn children_fn_wrapper(
7887 children_ptr
7988}
8089
81- unsafe extern "C" fn name_fn_wrapper ( plan : * const FFI_ExecutionPlan ) -> CString {
90+ unsafe extern "C" fn execute_fn_wrapper (
91+ plan : * const FFI_ExecutionPlan ,
92+ partition : usize ,
93+ err_code : & mut i32 ,
94+ ) -> * const FFI_ArrowArrayStream {
95+ let private_data = ( * plan) . private_data as * const ExecutionPlanPrivateData ;
96+
97+ let mut record_batch_stream = match ( * private_data)
98+ . plan
99+ . execute ( partition, ( * private_data) . context . clone ( ) )
100+ {
101+ Ok ( rbs) => rbs,
102+ Err ( _e) => {
103+ * err_code = 1 ;
104+ return null ( ) ;
105+ }
106+ } ;
107+
108+ let ffi_stream = Box :: new ( record_batch_to_arrow_stream ( record_batch_stream) ) ;
109+
110+ Box :: into_raw ( ffi_stream)
111+ }
112+ unsafe extern "C" fn name_fn_wrapper ( plan : * const FFI_ExecutionPlan ) -> * const c_char {
82113 let private_data = ( * plan) . private_data as * const ExecutionPlanPrivateData ;
83114
84115 let name = ( * private_data) . plan . name ( ) ;
85116
86- CString :: new ( name) . unwrap_or ( CString :: new ( "unable to parse execution plan name" ) . unwrap ( ) )
117+ CString :: new ( name)
118+ . unwrap_or ( CString :: new ( "unable to parse execution plan name" ) . unwrap ( ) )
119+ . into_raw ( )
87120}
88121
89122// Since the trait ExecutionPlan requires borrowed values, we wrap our FFI.
@@ -116,18 +149,19 @@ impl DisplayAs for ExportedExecutionPlan {
116149
117150impl FFI_ExecutionPlan {
118151 /// This function is called on the provider's side.
119- pub fn new ( plan : Arc < dyn ExecutionPlan + Send > ) -> Self {
152+ pub fn new ( plan : Arc < dyn ExecutionPlan + Send > , context : Arc < TaskContext > ) -> Self {
120153 let children = plan
121154 . children ( )
122155 . into_iter ( )
123- . map ( |child| Box :: new ( FFI_ExecutionPlan :: new ( child. clone ( ) ) ) )
156+ . map ( |child| Box :: new ( FFI_ExecutionPlan :: new ( child. clone ( ) , context . clone ( ) ) ) )
124157 . map ( |child| Box :: into_raw ( child) as * const FFI_ExecutionPlan )
125158 . collect ( ) ;
126159 println ! ( "children collected" ) ;
127160
128161 let private_data = Box :: new ( ExecutionPlanPrivateData {
129162 plan,
130163 children,
164+ context,
131165 last_error : None ,
132166 } ) ;
133167 println ! ( "generated private data, ready to return" ) ;
@@ -136,6 +170,7 @@ impl FFI_ExecutionPlan {
136170 properties : Some ( properties_fn_wrapper) ,
137171 children : Some ( children_fn_wrapper) ,
138172 name : name_fn_wrapper,
173+ execute : execute_fn_wrapper,
139174 private_data : Box :: into_raw ( private_data) as * mut c_void ,
140175 }
141176 }
@@ -159,9 +194,10 @@ impl ExportedExecutionPlan {
159194 pub unsafe fn new ( plan : * const FFI_ExecutionPlan ) -> Result < Self > {
160195 let name_fn = ( * plan) . name ;
161196 let name_cstr = name_fn ( plan) ;
162- let name = name_cstr
163- . into_string ( )
164- . unwrap_or ( "Unable to parse FFI_ExecutionPlan name" . to_string ( ) ) ;
197+ let name = CString :: from_raw ( name_cstr as * mut c_char )
198+ . to_str ( )
199+ . unwrap_or ( "Unable to parse FFI_ExecutionPlan name" )
200+ . to_string ( ) ;
165201
166202 println ! ( "entered ExportedExecutionPlan::new" ) ;
167203 let properties = unsafe {
@@ -511,6 +547,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
511547}
512548
513549#[ repr( C ) ]
550+ #[ allow( non_camel_case_types) ]
514551pub enum FFI_ExecutionMode {
515552 Bounded ,
516553
0 commit comments