Skip to content

Commit 8a7f88a

Browse files
ThearasStanding-Man
authored andcommitted
fix: ComposedPhysicalExtensionCodec does not use the same codec as encoding when decoding (apache#16986)
* fix(example): `ComposedPhysicalExtensionCodec` does not use the same codec as encoding when decoding Change-Id: I4e92abd9b49ee6f1fcdfd2d915fce8dc85c3b6a6 * naming and move `ComposedPhysicalExtensionCodec` to datafusion Change-Id: I780e217f061a92cd7fe5fe9b91953e81d42d05c8 * rename Change-Id: I6f3860053743b81a5598023becb2e18855904292
1 parent 8a21b5d commit 8a7f88a

File tree

2 files changed

+125
-67
lines changed

2 files changed

+125
-67
lines changed

datafusion-examples/examples/composed_extension_codec.rs

Lines changed: 10 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ use std::fmt::Debug;
3535
use std::ops::Deref;
3636
use std::sync::Arc;
3737

38+
use datafusion::common::internal_err;
3839
use datafusion::common::Result;
39-
use datafusion::common::{internal_err, DataFusionError};
4040
use datafusion::logical_expr::registry::FunctionRegistry;
41-
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
4241
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4342
use datafusion::prelude::SessionContext;
44-
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
43+
use datafusion_proto::physical_plan::{
44+
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
45+
};
4546
use datafusion_proto::protobuf;
4647

4748
#[tokio::main]
@@ -54,12 +55,12 @@ async fn main() {
5455
});
5556
let ctx = SessionContext::new();
5657

57-
let composed_codec = ComposedPhysicalExtensionCodec {
58-
codecs: vec![
59-
Arc::new(ParentPhysicalExtensionCodec {}),
60-
Arc::new(ChildPhysicalExtensionCodec {}),
61-
],
62-
};
58+
// Position in this list is important as it will be used for decoding.
59+
// If new codec is added it should go to last position.
60+
let composed_codec = ComposedPhysicalExtensionCodec::new(vec![
61+
Arc::new(ParentPhysicalExtensionCodec {}),
62+
Arc::new(ChildPhysicalExtensionCodec {}),
63+
]);
6364

6465
// serialize execution plan to proto
6566
let proto: protobuf::PhysicalPlanNode =
@@ -232,60 +233,3 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
232233
}
233234
}
234235
}
235-
236-
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
237-
/// until one works
238-
#[derive(Debug)]
239-
struct ComposedPhysicalExtensionCodec {
240-
codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
241-
}
242-
243-
impl ComposedPhysicalExtensionCodec {
244-
fn try_any<T>(
245-
&self,
246-
mut f: impl FnMut(&dyn PhysicalExtensionCodec) -> Result<T>,
247-
) -> Result<T> {
248-
let mut last_err = None;
249-
for codec in &self.codecs {
250-
match f(codec.as_ref()) {
251-
Ok(node) => return Ok(node),
252-
Err(err) => last_err = Some(err),
253-
}
254-
}
255-
256-
Err(last_err.unwrap_or_else(|| {
257-
DataFusionError::NotImplemented("Empty list of composed codecs".to_owned())
258-
}))
259-
}
260-
}
261-
262-
impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
263-
fn try_decode(
264-
&self,
265-
buf: &[u8],
266-
inputs: &[Arc<dyn ExecutionPlan>],
267-
registry: &dyn FunctionRegistry,
268-
) -> Result<Arc<dyn ExecutionPlan>> {
269-
self.try_any(|codec| codec.try_decode(buf, inputs, registry))
270-
}
271-
272-
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
273-
self.try_any(|codec| codec.try_encode(node.clone(), buf))
274-
}
275-
276-
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
277-
self.try_any(|codec| codec.try_decode_udf(name, buf))
278-
}
279-
280-
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
281-
self.try_any(|codec| codec.try_encode_udf(node, buf))
282-
}
283-
284-
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
285-
self.try_any(|codec| codec.try_decode_udaf(name, buf))
286-
}
287-
288-
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
289-
self.try_any(|codec| codec.try_encode_udaf(node, buf))
290-
}
291-
}

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2941,12 +2941,126 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
29412941
}
29422942
}
29432943

2944+
/// DataEncoderTuple captures the position of the encoder
2945+
/// in the codec list that was used to encode the data and actual encoded data
2946+
#[derive(Clone, PartialEq, prost::Message)]
2947+
struct DataEncoderTuple {
2948+
/// The position of encoder used to encode data
2949+
/// (to be used for decoding)
2950+
#[prost(uint32, tag = 1)]
2951+
pub encoder_position: u32,
2952+
2953+
#[prost(bytes, tag = 2)]
2954+
pub blob: Vec<u8>,
2955+
}
2956+
2957+
/// A PhysicalExtensionCodec that tries one of multiple inner codecs
2958+
/// until one works
2959+
#[derive(Debug)]
2960+
pub struct ComposedPhysicalExtensionCodec {
2961+
codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
2962+
}
2963+
2964+
impl ComposedPhysicalExtensionCodec {
2965+
// Position in this codesc list is important as it will be used for decoding.
2966+
// If new codec is added it should go to last position.
2967+
pub fn new(codecs: Vec<Arc<dyn PhysicalExtensionCodec>>) -> Self {
2968+
Self { codecs }
2969+
}
2970+
2971+
fn decode_protobuf<R>(
2972+
&self,
2973+
buf: &[u8],
2974+
decode: impl FnOnce(&dyn PhysicalExtensionCodec, &[u8]) -> Result<R>,
2975+
) -> Result<R> {
2976+
let proto = DataEncoderTuple::decode(buf)
2977+
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
2978+
2979+
let codec = self.codecs.get(proto.encoder_position as usize).ok_or(
2980+
DataFusionError::Internal(
2981+
"Can't find required codec in codec list".to_owned(),
2982+
),
2983+
)?;
2984+
2985+
decode(codec.as_ref(), &proto.blob)
2986+
}
2987+
2988+
fn encode_protobuf(
2989+
&self,
2990+
buf: &mut Vec<u8>,
2991+
mut encode: impl FnMut(&dyn PhysicalExtensionCodec, &mut Vec<u8>) -> Result<()>,
2992+
) -> Result<()> {
2993+
let mut data = vec![];
2994+
let mut last_err = None;
2995+
let mut encoder_position = None;
2996+
2997+
// find the encoder
2998+
for (position, codec) in self.codecs.iter().enumerate() {
2999+
match encode(codec.as_ref(), &mut data) {
3000+
Ok(_) => {
3001+
encoder_position = Some(position as u32);
3002+
break;
3003+
}
3004+
Err(err) => last_err = Some(err),
3005+
}
3006+
}
3007+
3008+
let encoder_position = encoder_position.ok_or_else(|| {
3009+
last_err.unwrap_or_else(|| {
3010+
DataFusionError::NotImplemented(
3011+
"Empty list of composed codecs".to_owned(),
3012+
)
3013+
})
3014+
})?;
3015+
3016+
// encode with encoder position
3017+
let proto = DataEncoderTuple {
3018+
encoder_position,
3019+
blob: data,
3020+
};
3021+
proto
3022+
.encode(buf)
3023+
.map_err(|e| DataFusionError::Internal(e.to_string()))
3024+
}
3025+
}
3026+
3027+
impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
3028+
fn try_decode(
3029+
&self,
3030+
buf: &[u8],
3031+
inputs: &[Arc<dyn ExecutionPlan>],
3032+
registry: &dyn FunctionRegistry,
3033+
) -> Result<Arc<dyn ExecutionPlan>> {
3034+
self.decode_protobuf(buf, |codec, data| codec.try_decode(data, inputs, registry))
3035+
}
3036+
3037+
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
3038+
self.encode_protobuf(buf, |codec, data| codec.try_encode(Arc::clone(&node), data))
3039+
}
3040+
3041+
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
3042+
self.decode_protobuf(buf, |codec, data| codec.try_decode_udf(name, data))
3043+
}
3044+
3045+
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
3046+
self.encode_protobuf(buf, |codec, data| codec.try_encode_udf(node, data))
3047+
}
3048+
3049+
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
3050+
self.decode_protobuf(buf, |codec, data| codec.try_decode_udaf(name, data))
3051+
}
3052+
3053+
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
3054+
self.encode_protobuf(buf, |codec, data| codec.try_encode_udaf(node, data))
3055+
}
3056+
}
3057+
29443058
fn into_physical_plan(
29453059
node: &Option<Box<protobuf::PhysicalPlanNode>>,
29463060
registry: &dyn FunctionRegistry,
29473061
runtime: &RuntimeEnv,
29483062
extension_codec: &dyn PhysicalExtensionCodec,
2949-
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
3063+
) -> Result<Arc<dyn ExecutionPlan>> {
29503064
if let Some(field) = node {
29513065
field.try_into_physical_plan(registry, runtime, extension_codec)
29523066
} else {

0 commit comments

Comments
 (0)