Skip to content

Commit 7e1f182

Browse files
committed
Using datafusion-proto for execution plan properties
1 parent 8daac65 commit 7e1f182

File tree

2 files changed

+108
-58
lines changed

2 files changed

+108
-58
lines changed

src/ffi/execution_plan.rs

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,21 @@ use std::{
77
use datafusion::error::Result;
88
use datafusion::{
99
error::DataFusionError,
10-
parquet::file::properties,
11-
physical_expr::{EquivalenceProperties, LexOrdering},
12-
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties},
10+
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties},
1311
};
1412

1513
#[repr(C)]
1614
#[derive(Debug)]
1715
#[allow(missing_docs)]
1816
#[allow(non_camel_case_types)]
19-
pub struct FFI_ExecutionPlan {
17+
pub struct FFIExecutionPlan {
2018
pub properties:
21-
Option<unsafe extern "C" fn(plan: *const FFI_ExecutionPlan) -> FFI_PlanProperties>,
19+
Option<unsafe extern "C" fn(plan: *const FFIExecutionPlan) -> FFIPlanProperties>,
2220
pub children: Option<
2321
unsafe extern "C" fn(
24-
plan: *const FFI_ExecutionPlan,
22+
plan: *const FFIExecutionPlan,
2523
num_children: &mut usize,
26-
out: &mut *const FFI_ExecutionPlan,
24+
out: &mut *const FFIExecutionPlan,
2725
) -> i32,
2826
>,
2927

@@ -35,24 +33,24 @@ pub struct ExecutionPlanPrivateData {
3533
pub last_error: Option<CString>,
3634
}
3735

38-
unsafe extern "C" fn properties_fn_wrapper(plan: *const FFI_ExecutionPlan) -> FFI_PlanProperties {
36+
unsafe extern "C" fn properties_fn_wrapper(plan: *const FFIExecutionPlan) -> FFIPlanProperties {
3937
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
4038
let properties = (*private_data).plan.properties();
4139
properties.into()
4240
}
4341

4442
unsafe extern "C" fn children_fn_wrapper(
45-
plan: *const FFI_ExecutionPlan,
43+
plan: *const FFIExecutionPlan,
4644
num_children: &mut usize,
47-
out: &mut *const FFI_ExecutionPlan,
45+
out: &mut *const FFIExecutionPlan,
4846
) -> i32 {
4947
let private_data = (*plan).private_data as *const ExecutionPlanPrivateData;
5048

5149
let children = (*private_data).plan.children();
5250
*num_children = children.len();
53-
let children: Vec<FFI_ExecutionPlan> = children
51+
let children: Vec<FFIExecutionPlan> = children
5452
.into_iter()
55-
.map(|child| FFI_ExecutionPlan::new(child.clone()))
53+
.map(|child| FFIExecutionPlan::new(child.clone()))
5654
.collect();
5755
*out = children.as_ptr();
5856

@@ -64,7 +62,7 @@ unsafe extern "C" fn children_fn_wrapper(
6462
// in the provider's side.
6563
#[derive(Debug)]
6664
pub struct ExportedExecutionPlan {
67-
plan: *const FFI_ExecutionPlan,
65+
plan: *const FFIExecutionPlan,
6866
properties: PlanProperties,
6967
children: Vec<Arc<dyn ExecutionPlan>>,
7068
}
@@ -75,14 +73,18 @@ unsafe impl Sync for ExportedExecutionPlan {}
7573
impl DisplayAs for ExportedExecutionPlan {
7674
fn fmt_as(
7775
&self,
78-
t: datafusion::physical_plan::DisplayFormatType,
76+
_t: datafusion::physical_plan::DisplayFormatType,
7977
f: &mut std::fmt::Formatter,
8078
) -> std::fmt::Result {
81-
todo!()
79+
write!(
80+
f,
81+
"FFIExecutionPlan(number_of_children={})",
82+
self.children.len(),
83+
)
8284
}
8385
}
8486

85-
impl FFI_ExecutionPlan {
87+
impl FFIExecutionPlan {
8688
pub fn new(plan: Arc<dyn ExecutionPlan + Send>) -> Self {
8789
let private_data = Box::new(ExecutionPlanPrivateData {
8890
plan,
@@ -106,24 +108,30 @@ impl FFI_ExecutionPlan {
106108
}
107109

108110
impl ExportedExecutionPlan {
109-
pub fn new(plan: *const FFI_ExecutionPlan) -> Result<Self> {
111+
/// Wrap a FFI Execution Plan
112+
///
113+
/// # Safety
114+
///
115+
/// The caller must ensure the pointer provided points to a valid implementation
116+
/// of FFIExecutionPlan
117+
pub unsafe fn new(plan: *const FFIExecutionPlan) -> Result<Self> {
110118
let properties = unsafe {
111119
let properties_fn = (*plan).properties.ok_or(DataFusionError::NotImplemented(
112-
"properties not implemented on FFI_ExecutionPlan".to_string(),
120+
"properties not implemented on FFIExecutionPlan".to_string(),
113121
))?;
114122
properties_fn(plan).into()
115123
};
116124

117125
let children = unsafe {
118126
let children_fn = (*plan).children.ok_or(DataFusionError::NotImplemented(
119-
"children not implemented on FFI_ExecutionPlan".to_string(),
127+
"children not implemented on FFIExecutionPlan".to_string(),
120128
))?;
121129
let mut num_children = 0;
122-
let mut children_ptr: *const FFI_ExecutionPlan = null();
130+
let mut children_ptr: *const FFIExecutionPlan = null();
123131

124132
if children_fn(plan, &mut num_children, &mut children_ptr) != 0 {
125133
return Err(DataFusionError::Plan(
126-
"Error getting children for FFI_ExecutionPlan".to_string(),
134+
"Error getting children for FFIExecutionPlan".to_string(),
127135
));
128136
}
129137

@@ -179,7 +187,7 @@ impl ExecutionPlan for ExportedExecutionPlan {
179187
}
180188
}
181189

182-
impl DisplayAs for FFI_ExecutionPlan {
190+
impl DisplayAs for FFIExecutionPlan {
183191
fn fmt_as(
184192
&self,
185193
t: datafusion::physical_plan::DisplayFormatType,
@@ -192,26 +200,69 @@ impl DisplayAs for FFI_ExecutionPlan {
192200
#[repr(C)]
193201
#[derive(Debug)]
194202
#[allow(missing_docs)]
195-
#[allow(non_camel_case_types)]
196-
pub struct FFI_PlanProperties {
197-
/// See [ExecutionPlanProperties::equivalence_properties]
198-
pub eq_properties: EquivalenceProperties,
199-
/// See [ExecutionPlanProperties::output_partitioning]
200-
pub partitioning: Partitioning,
201-
/// See [ExecutionPlanProperties::execution_mode]
202-
pub execution_mode: ExecutionMode,
203-
/// See [ExecutionPlanProperties::output_ordering]
204-
output_ordering: Option<LexOrdering>,
205-
}
206-
207-
impl From<&PlanProperties> for FFI_PlanProperties {
203+
pub struct FFIPlanProperties {
204+
// We will build equivalence properties from teh schema and ordersing (new_with_orderings). This is how we do ti in dataset_exec
205+
// pub eq_properties: Option<unsafe extern "C" fn(plan: *const FFIPlanProperties) -> EquivalenceProperties>,
206+
207+
// Returns protobuf serialized bytes of the partitioning
208+
pub output_partitioning: Option<
209+
unsafe extern "C" fn(
210+
plan: *const FFIPlanProperties,
211+
buffer_size: &mut usize,
212+
buffer_bytes: &mut *mut u8,
213+
) -> i32,
214+
>,
215+
216+
pub execution_mode:
217+
Option<unsafe extern "C" fn(plan: *const FFIPlanProperties) -> FFIExecutionMode>,
218+
219+
// PhysicalSortExprNodeCollection proto
220+
pub output_ordering: Option<
221+
unsafe extern "C" fn(
222+
plan: *const FFIPlanProperties,
223+
buffer_size: &mut usize,
224+
buffer_bytes: &mut *mut u8,
225+
) -> i32,
226+
>,
227+
}
228+
229+
impl From<&PlanProperties> for FFIPlanProperties {
208230
fn from(value: &PlanProperties) -> Self {
209231
todo!()
210232
}
211233
}
212234

213-
impl From<FFI_PlanProperties> for PlanProperties {
214-
fn from(value: FFI_PlanProperties) -> Self {
235+
impl From<FFIPlanProperties> for PlanProperties {
236+
fn from(value: FFIPlanProperties) -> Self {
215237
todo!()
216238
}
217239
}
240+
241+
#[repr(C)]
242+
pub enum FFIExecutionMode {
243+
Bounded,
244+
245+
Unbounded,
246+
247+
PipelineBreaking,
248+
}
249+
250+
impl From<ExecutionMode> for FFIExecutionMode {
251+
fn from(value: ExecutionMode) -> Self {
252+
match value {
253+
ExecutionMode::Bounded => FFIExecutionMode::Bounded,
254+
ExecutionMode::Unbounded => FFIExecutionMode::Unbounded,
255+
ExecutionMode::PipelineBreaking => FFIExecutionMode::PipelineBreaking,
256+
}
257+
}
258+
}
259+
260+
impl From<FFIExecutionMode> for ExecutionMode {
261+
fn from(value: FFIExecutionMode) -> Self {
262+
match value {
263+
FFIExecutionMode::Bounded => ExecutionMode::Bounded,
264+
FFIExecutionMode::Unbounded => ExecutionMode::Unbounded,
265+
FFIExecutionMode::PipelineBreaking => ExecutionMode::PipelineBreaking,
266+
}
267+
}
268+
}

src/ffi/table_provider.rs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use datafusion::{
2323
use tokio::runtime::Runtime;
2424

2525
use super::{
26-
execution_plan::{ExecutionPlanPrivateData, ExportedExecutionPlan, FFI_ExecutionPlan},
26+
execution_plan::{ExecutionPlanPrivateData, ExportedExecutionPlan, FFIExecutionPlan},
2727
session_config::{FFI_SessionConfig, SessionConfigPrivateData},
2828
};
2929
use datafusion::error::Result;
@@ -44,7 +44,7 @@ pub struct FFI_TableProvider {
4444
n_filters: c_int,
4545
filters: *const *const c_char,
4646
limit: c_int,
47-
out: *mut FFI_ExecutionPlan,
47+
out: *mut FFIExecutionPlan,
4848
) -> c_int,
4949
>,
5050
pub private_data: *mut c_void,
@@ -73,7 +73,7 @@ unsafe extern "C" fn provider_scan(
7373
n_filters: c_int,
7474
filters: *const *const c_char,
7575
limit: c_int,
76-
mut out: *mut FFI_ExecutionPlan,
76+
mut out: *mut FFIExecutionPlan,
7777
) -> c_int {
7878
let config = unsafe { (*session_config).private_data as *const SessionConfigPrivateData };
7979
let session = SessionStateBuilder::new()
@@ -135,7 +135,7 @@ impl ExportedTableProvider {
135135
projections: Option<&Vec<usize>>,
136136
filters: Vec<String>,
137137
limit: Option<usize>,
138-
) -> Result<FFI_ExecutionPlan> {
138+
) -> Result<FFIExecutionPlan> {
139139
let private_data = self.get_private_data();
140140
let provider = &private_data.provider;
141141

@@ -155,10 +155,10 @@ impl ExportedTableProvider {
155155
// last_error: None,
156156
// });
157157

158-
// Ok(FFI_ExecutionPlan {
158+
// Ok(FFIExecutionPlan {
159159
// private_data: Box::into_raw(plan_ptr) as *mut c_void,
160160
// })
161-
Ok(FFI_ExecutionPlan::new(plan))
161+
Ok(FFIExecutionPlan::new(plan))
162162
}
163163
}
164164

@@ -231,7 +231,7 @@ impl TableProvider for FFI_TableProvider {
231231

232232
/// Get the type of this table for metadata/catalog purposes.
233233
fn table_type(&self) -> TableType {
234-
TableType::Base
234+
todo!()
235235
}
236236

237237
/// Create an ExecutionPlan that will scan the table.
@@ -273,10 +273,10 @@ impl TableProvider for FFI_TableProvider {
273273
None => -1,
274274
};
275275

276-
let mut out = FFI_ExecutionPlan::empty();
276+
let mut out = FFIExecutionPlan::empty();
277277

278-
let err_code = unsafe {
279-
scan_fn(
278+
let plan = unsafe {
279+
let err_code = scan_fn(
280280
self,
281281
&session_config,
282282
n_projections,
@@ -285,16 +285,17 @@ impl TableProvider for FFI_TableProvider {
285285
filters_ptr.as_ptr(),
286286
limit,
287287
&mut out,
288-
)
289-
};
288+
);
290289

291-
if 0 != err_code {
292-
return Err(datafusion::error::DataFusionError::Internal(
293-
"Unable to perform scan via FFI".to_string(),
294-
));
295-
}
290+
if 0 != err_code {
291+
return Err(datafusion::error::DataFusionError::Internal(
292+
"Unable to perform scan via FFI".to_string(),
293+
));
294+
}
295+
296+
ExportedExecutionPlan::new(&out)?
297+
};
296298

297-
let plan = ExportedExecutionPlan::new(&out)?;
298299
Ok(Arc::new(plan))
299300
}
300301

@@ -304,8 +305,6 @@ impl TableProvider for FFI_TableProvider {
304305
&self,
305306
filter: &[&Expr],
306307
) -> Result<Vec<TableProviderFilterPushDown>> {
307-
Err(datafusion::error::DataFusionError::NotImplemented(
308-
"support filter pushdown not implemented".to_string(),
309-
))
308+
todo!()
310309
}
311310
}

0 commit comments

Comments
 (0)