@@ -27,6 +27,7 @@ use datafusion::{
2727 execution:: { SendableRecordBatchStream , TaskContext } ,
2828 physical_plan:: { DisplayAs , ExecutionPlan , PlanProperties } ,
2929} ;
30+ use tokio:: runtime:: Runtime ;
3031
3132use crate :: {
3233 plan_properties:: FFI_PlanProperties , record_batch_stream:: FFI_RecordBatchStream ,
@@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
7172pub struct ExecutionPlanPrivateData {
7273 pub plan : Arc < dyn ExecutionPlan > ,
7374 pub context : Arc < TaskContext > ,
75+ pub runtime : Option < Arc < Runtime > > ,
7476}
7577
7678unsafe extern "C" fn properties_fn_wrapper (
@@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper(
8890 let private_data = plan. private_data as * const ExecutionPlanPrivateData ;
8991 let plan = & ( * private_data) . plan ;
9092 let ctx = & ( * private_data) . context ;
93+ let runtime = & ( * private_data) . runtime ;
9194
9295 let children: Vec < _ > = plan
9396 . children ( )
9497 . into_iter ( )
95- . map ( |child| FFI_ExecutionPlan :: new ( Arc :: clone ( child) , Arc :: clone ( ctx) ) )
98+ . map ( |child| {
99+ FFI_ExecutionPlan :: new ( Arc :: clone ( child) , Arc :: clone ( ctx) , runtime. clone ( ) )
100+ } )
96101 . collect ( ) ;
97102
98103 children. into ( )
@@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper(
105110 let private_data = plan. private_data as * const ExecutionPlanPrivateData ;
106111 let plan = & ( * private_data) . plan ;
107112 let ctx = & ( * private_data) . context ;
113+ let runtime = ( * private_data) . runtime . as_ref ( ) . map ( Arc :: clone) ;
108114
109115 match plan. execute ( partition, Arc :: clone ( ctx) ) {
110- Ok ( rbs) => RResult :: ROk ( rbs . into ( ) ) ,
116+ Ok ( rbs) => RResult :: ROk ( FFI_RecordBatchStream :: new ( rbs , runtime ) ) ,
111117 Err ( e) => RResult :: RErr (
112118 format ! ( "Error occurred during FFI_ExecutionPlan execute: {}" , e) . into ( ) ,
113119 ) ,
@@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_Execution
129135 let private_data = plan. private_data as * const ExecutionPlanPrivateData ;
130136 let plan_data = & ( * private_data) ;
131137
132- FFI_ExecutionPlan :: new ( Arc :: clone ( & plan_data. plan ) , Arc :: clone ( & plan_data. context ) )
138+ FFI_ExecutionPlan :: new (
139+ Arc :: clone ( & plan_data. plan ) ,
140+ Arc :: clone ( & plan_data. context ) ,
141+ plan_data. runtime . clone ( ) ,
142+ )
133143}
134144
135145impl Clone for FFI_ExecutionPlan {
@@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan {
140150
141151impl FFI_ExecutionPlan {
142152 /// This function is called on the provider's side.
143- pub fn new ( plan : Arc < dyn ExecutionPlan > , context : Arc < TaskContext > ) -> Self {
144- let private_data = Box :: new ( ExecutionPlanPrivateData { plan, context } ) ;
153+ pub fn new (
154+ plan : Arc < dyn ExecutionPlan > ,
155+ context : Arc < TaskContext > ,
156+ runtime : Option < Arc < Runtime > > ,
157+ ) -> Self {
158+ let private_data = Box :: new ( ExecutionPlanPrivateData {
159+ plan,
160+ context,
161+ runtime,
162+ } ) ;
145163
146164 Self {
147165 properties : properties_fn_wrapper,
@@ -357,7 +375,7 @@ mod tests {
357375 let original_plan = Arc :: new ( EmptyExec :: new ( schema) ) ;
358376 let original_name = original_plan. name ( ) . to_string ( ) ;
359377
360- let local_plan = FFI_ExecutionPlan :: new ( original_plan, ctx. task_ctx ( ) ) ;
378+ let local_plan = FFI_ExecutionPlan :: new ( original_plan, ctx. task_ctx ( ) , None ) ;
361379
362380 let foreign_plan: ForeignExecutionPlan = ( & local_plan) . try_into ( ) ?;
363381
0 commit comments