Skip to content

Commit 0b5bb1a

Browse files
committed
Adding plan properties parsing from ffi
1 parent 7e1f182 commit 0b5bb1a

File tree

1 file changed

+94
-6
lines changed

1 file changed

+94
-6
lines changed

src/ffi/execution_plan.rs

Lines changed: 94 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
use std::{
22
ffi::{c_void, CString},
3-
ptr::null,
3+
ptr::{null, null_mut},
4+
slice,
45
sync::Arc,
56
};
67

7-
use datafusion::error::Result;
8+
use arrow::{datatypes::Schema, ffi::FFI_ArrowSchema};
89
use datafusion::{
910
error::DataFusionError,
1011
physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties},
1112
};
13+
use datafusion::{error::Result, physical_expr::EquivalenceProperties, prelude::SessionContext};
14+
use datafusion_proto::{
15+
physical_plan::{
16+
from_proto::{parse_physical_sort_exprs, parse_protobuf_partitioning},
17+
DefaultPhysicalExtensionCodec,
18+
},
19+
protobuf::{partitioning, Partitioning, PhysicalSortExprNodeCollection},
20+
};
21+
use prost::{DecodeError, Message};
1222

1323
#[repr(C)]
1424
#[derive(Debug)]
@@ -119,7 +129,7 @@ impl ExportedExecutionPlan {
119129
let properties_fn = (*plan).properties.ok_or(DataFusionError::NotImplemented(
120130
"properties not implemented on FFIExecutionPlan".to_string(),
121131
))?;
122-
properties_fn(plan).into()
132+
properties_fn(plan).try_into()?
123133
};
124134

125135
let children = unsafe {
@@ -224,6 +234,8 @@ pub struct FFIPlanProperties {
224234
buffer_bytes: &mut *mut u8,
225235
) -> i32,
226236
>,
237+
238+
pub schema: Option<unsafe extern "C" fn(plan: *const FFIPlanProperties) -> FFI_ArrowSchema>,
227239
}
228240

229241
impl From<&PlanProperties> for FFIPlanProperties {
@@ -232,10 +244,86 @@ impl From<&PlanProperties> for FFIPlanProperties {
232244
}
233245
}
234246

235-
impl From<FFIPlanProperties> for PlanProperties {
236-
fn from(value: FFIPlanProperties) -> Self {
237-
todo!()
247+
impl TryFrom<FFIPlanProperties> for PlanProperties {
248+
type Error = DataFusionError;
249+
250+
fn try_from(value: FFIPlanProperties) -> std::result::Result<Self, Self::Error> {
251+
unsafe {
252+
let schema_fn = value.schema.ok_or(DataFusionError::NotImplemented(
253+
"schema() not implemented on FFIPlanProperties".to_string(),
254+
))?;
255+
let ffi_schema = schema_fn(&value);
256+
let schema: Schema = (&ffi_schema).try_into()?;
257+
258+
let ordering_fn = value
259+
.output_ordering
260+
.ok_or(DataFusionError::NotImplemented(
261+
"output_ordering() not implemented on FFIPlanProperties".to_string(),
262+
))?;
263+
let mut buff_size = 0;
264+
let mut buff = null_mut();
265+
if ordering_fn(&value, &mut buff_size, &mut buff) != 0 {
266+
return Err(DataFusionError::Plan(
267+
"Error occurred during FFI call to output_ordering in FFIPlanProperties"
268+
.to_string(),
269+
));
270+
}
271+
let data = slice::from_raw_parts(buff, buff_size);
272+
273+
let proto_output_ordering = PhysicalSortExprNodeCollection::decode(data)
274+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
275+
276+
// TODO we will need to get these, but unsure if it happesn on the provider or consumer right now.
277+
let default_ctx = SessionContext::new();
278+
let codex = DefaultPhysicalExtensionCodec {};
279+
let orderings = parse_physical_sort_exprs(
280+
&proto_output_ordering.physical_sort_expr_nodes,
281+
&default_ctx,
282+
&schema,
283+
&codex,
284+
)?;
285+
286+
let partitioning_fn =
287+
value
288+
.output_partitioning
289+
.ok_or(DataFusionError::NotImplemented(
290+
"output_partitioning() not implemented on FFIPlanProperties".to_string(),
291+
))?;
292+
if partitioning_fn(&value, &mut buff_size, &mut buff) != 0 {
293+
return Err(DataFusionError::Plan(
294+
"Error occurred during FFI call to output_partitioning in FFIPlanProperties"
295+
.to_string(),
296+
));
297+
}
298+
let data = slice::from_raw_parts(buff, buff_size);
299+
300+
let proto_partitioning =
301+
Partitioning::decode(data).map_err(|e| DataFusionError::External(Box::new(e)))?;
302+
// TODO: Validate this unwrap is safe.
303+
let partitioning = parse_protobuf_partitioning(
304+
Some(&proto_partitioning),
305+
&default_ctx,
306+
&schema,
307+
&codex,
308+
)?
309+
.unwrap();
310+
311+
let execution_mode_fn = value.execution_mode.ok_or(DataFusionError::NotImplemented(
312+
"execution_mode() not implemented on FFIPlanProperties".to_string(),
313+
))?;
314+
let execution_mode = execution_mode_fn(&value).into();
315+
316+
let eq_properties =
317+
EquivalenceProperties::new_with_orderings(Arc::new(schema), &[orderings]);
318+
319+
Ok(Self::new(eq_properties, partitioning, execution_mode))
320+
}
238321
}
322+
// fn from(value: FFIPlanProperties) -> Self {
323+
// let schema = self.schema()
324+
325+
// let equiv_prop = EquivalenceProperties::new_with_orderings(schema, orderings);
326+
// }
239327
}
240328

241329
#[repr(C)]

0 commit comments

Comments
 (0)