Skip to content

Commit 7d42f73

Browse files
committed
Adding record batch stream ffi representation
1 parent 0d23d55 commit 7d42f73

File tree

4 files changed

+486
-34
lines changed

4 files changed

+486
-34
lines changed

src/ffi/execution_plan.rs

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,35 @@
11
use std::{
2-
ffi::{c_void, CString},
3-
num,
2+
ffi::{c_char, c_void, CString},
43
ptr::{null, null_mut},
54
slice,
65
sync::Arc,
76
};
87

9-
use arrow::{datatypes::Schema, ffi::FFI_ArrowSchema};
8+
use arrow::{
9+
array::RecordBatchReader,
10+
datatypes::Schema,
11+
ffi::{FFI_ArrowArray, FFI_ArrowSchema},
12+
ffi_stream::FFI_ArrowArrayStream,
13+
};
1014
use datafusion::{
1115
error::DataFusionError,
16+
execution::TaskContext,
1217
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties},
1318
};
1419
use datafusion::{error::Result, physical_expr::EquivalenceProperties, prelude::SessionContext};
1520
use datafusion_proto::{
1621
physical_plan::{
1722
from_proto::{parse_physical_sort_exprs, parse_protobuf_partitioning},
18-
to_proto::{
19-
serialize_partitioning, serialize_physical_exprs, serialize_physical_sort_exprs,
20-
},
23+
to_proto::{serialize_partitioning, serialize_physical_sort_exprs},
2124
DefaultPhysicalExtensionCodec,
2225
},
23-
protobuf::{partitioning, Partitioning, PhysicalSortExprNodeCollection},
26+
protobuf::{Partitioning, PhysicalSortExprNodeCollection},
2427
};
25-
use prost::{DecodeError, Message};
28+
use futures::{StreamExt, TryStreamExt};
29+
use prost::Message;
30+
use tokio::runtime::Runtime;
31+
32+
use super::record_batch_stream::record_batch_to_arrow_stream;
2633

2734
#[repr(C)]
2835
#[derive(Debug)]
@@ -38,7 +45,13 @@ pub struct FFI_ExecutionPlan {
3845
err_code: &mut i32,
3946
) -> *mut *const FFI_ExecutionPlan,
4047
>,
41-
pub name: unsafe extern "C" fn(plan: *const FFI_ExecutionPlan) -> CString,
48+
pub name: unsafe extern "C" fn(plan: *const FFI_ExecutionPlan) -> *const c_char,
49+
50+
pub execute: unsafe extern "C" fn(
51+
plan: *const FFI_ExecutionPlan,
52+
partition: usize,
53+
err_code: &mut i32,
54+
) -> *const FFI_ArrowArrayStream,
4255

4356
pub private_data: *mut c_void,
4457
}
@@ -47,6 +60,7 @@ pub struct ExecutionPlanPrivateData {
4760
pub plan: Arc<dyn ExecutionPlan + Send>,
4861
pub last_error: Option<CString>,
4962
pub children: Vec<*const FFI_ExecutionPlan>,
63+
pub context: Arc<TaskContext>,
5064
}
5165

5266
unsafe extern "C" fn properties_fn_wrapper(plan: *const FFI_ExecutionPlan) -> FFI_PlanProperties {
@@ -63,11 +77,6 @@ unsafe extern "C" fn children_fn_wrapper(
6377
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
6478

6579
*num_children = (*private_data).children.len();
66-
// let children: Vec<FFI_ExecutionPlan> = children
67-
// .into_iter()
68-
// .map(|child| FFI_ExecutionPlan::new(child.clone()))
69-
// .collect();
70-
7180
*err_code = 0;
7281

7382
let mut children: Vec<_> = (*private_data).children.to_owned();
@@ -78,12 +87,36 @@ unsafe extern "C" fn children_fn_wrapper(
7887
children_ptr
7988
}
8089

81-
unsafe extern "C" fn name_fn_wrapper(plan: *const FFI_ExecutionPlan) -> CString {
90+
unsafe extern "C" fn execute_fn_wrapper(
91+
plan: *const FFI_ExecutionPlan,
92+
partition: usize,
93+
err_code: &mut i32,
94+
) -> *const FFI_ArrowArrayStream {
95+
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
96+
97+
let mut record_batch_stream = match (*private_data)
98+
.plan
99+
.execute(partition, (*private_data).context.clone())
100+
{
101+
Ok(rbs) => rbs,
102+
Err(_e) => {
103+
*err_code = 1;
104+
return null();
105+
}
106+
};
107+
108+
let ffi_stream = Box::new(record_batch_to_arrow_stream(record_batch_stream));
109+
110+
Box::into_raw(ffi_stream)
111+
}
112+
unsafe extern "C" fn name_fn_wrapper(plan: *const FFI_ExecutionPlan) -> *const c_char {
82113
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
83114

84115
let name = (*private_data).plan.name();
85116

86-
CString::new(name).unwrap_or(CString::new("unable to parse execution plan name").unwrap())
117+
CString::new(name)
118+
.unwrap_or(CString::new("unable to parse execution plan name").unwrap())
119+
.into_raw()
87120
}
88121

89122
// Since the trait ExecutionPlan requires borrowed values, we wrap our FFI.
@@ -116,18 +149,19 @@ impl DisplayAs for ExportedExecutionPlan {
116149

117150
impl FFI_ExecutionPlan {
118151
/// This function is called on the provider's side.
119-
pub fn new(plan: Arc<dyn ExecutionPlan + Send>) -> Self {
152+
pub fn new(plan: Arc<dyn ExecutionPlan + Send>, context: Arc<TaskContext>) -> Self {
120153
let children = plan
121154
.children()
122155
.into_iter()
123-
.map(|child| Box::new(FFI_ExecutionPlan::new(child.clone())))
156+
.map(|child| Box::new(FFI_ExecutionPlan::new(child.clone(), context.clone())))
124157
.map(|child| Box::into_raw(child) as *const FFI_ExecutionPlan)
125158
.collect();
126159
println!("children collected");
127160

128161
let private_data = Box::new(ExecutionPlanPrivateData {
129162
plan,
130163
children,
164+
context,
131165
last_error: None,
132166
});
133167
println!("generated private data, ready to return");
@@ -136,6 +170,7 @@ impl FFI_ExecutionPlan {
136170
properties: Some(properties_fn_wrapper),
137171
children: Some(children_fn_wrapper),
138172
name: name_fn_wrapper,
173+
execute: execute_fn_wrapper,
139174
private_data: Box::into_raw(private_data) as *mut c_void,
140175
}
141176
}
@@ -159,9 +194,10 @@ impl ExportedExecutionPlan {
159194
pub unsafe fn new(plan: *const FFI_ExecutionPlan) -> Result<Self> {
160195
let name_fn = (*plan).name;
161196
let name_cstr = name_fn(plan);
162-
let name = name_cstr
163-
.into_string()
164-
.unwrap_or("Unable to parse FFI_ExecutionPlan name".to_string());
197+
let name = CString::from_raw(name_cstr as *mut c_char)
198+
.to_str()
199+
.unwrap_or("Unable to parse FFI_ExecutionPlan name")
200+
.to_string();
165201

166202
println!("entered ExportedExecutionPlan::new");
167203
let properties = unsafe {
@@ -511,6 +547,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
511547
}
512548

513549
#[repr(C)]
550+
#[allow(non_camel_case_types)]
514551
pub enum FFI_ExecutionMode {
515552
Bounded,
516553

src/ffi/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub mod execution_plan;
2+
pub mod record_batch_stream;
23
pub mod session_config;
34
pub mod table_provider;
45

0 commit comments

Comments
 (0)