@@ -2941,12 +2941,126 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
29412941 }
29422942}
29432943
2944+ /// DataEncoderTuple captures the position of the encoder
2945+ /// in the codec list that was used to encode the data and actual encoded data
2946+ #[ derive( Clone , PartialEq , prost:: Message ) ]
2947+ struct DataEncoderTuple {
2948+ /// The position of encoder used to encode data
2949+ /// (to be used for decoding)
2950+ #[ prost( uint32, tag = 1 ) ]
2951+ pub encoder_position : u32 ,
2952+
2953+ #[ prost( bytes, tag = 2 ) ]
2954+ pub blob : Vec < u8 > ,
2955+ }
2956+
2957+ /// A PhysicalExtensionCodec that tries one of multiple inner codecs
2958+ /// until one works
2959+ #[ derive( Debug ) ]
2960+ pub struct ComposedPhysicalExtensionCodec {
2961+ codecs : Vec < Arc < dyn PhysicalExtensionCodec > > ,
2962+ }
2963+
2964+ impl ComposedPhysicalExtensionCodec {
2965+ // Position in this codesc list is important as it will be used for decoding.
2966+ // If new codec is added it should go to last position.
2967+ pub fn new ( codecs : Vec < Arc < dyn PhysicalExtensionCodec > > ) -> Self {
2968+ Self { codecs }
2969+ }
2970+
2971+ fn decode_protobuf < R > (
2972+ & self ,
2973+ buf : & [ u8 ] ,
2974+ decode : impl FnOnce ( & dyn PhysicalExtensionCodec , & [ u8 ] ) -> Result < R > ,
2975+ ) -> Result < R > {
2976+ let proto = DataEncoderTuple :: decode ( buf)
2977+ . map_err ( |e| DataFusionError :: Internal ( e. to_string ( ) ) ) ?;
2978+
2979+ let codec = self . codecs . get ( proto. encoder_position as usize ) . ok_or (
2980+ DataFusionError :: Internal (
2981+ "Can't find required codec in codec list" . to_owned ( ) ,
2982+ ) ,
2983+ ) ?;
2984+
2985+ decode ( codec. as_ref ( ) , & proto. blob )
2986+ }
2987+
2988+ fn encode_protobuf (
2989+ & self ,
2990+ buf : & mut Vec < u8 > ,
2991+ mut encode : impl FnMut ( & dyn PhysicalExtensionCodec , & mut Vec < u8 > ) -> Result < ( ) > ,
2992+ ) -> Result < ( ) > {
2993+ let mut data = vec ! [ ] ;
2994+ let mut last_err = None ;
2995+ let mut encoder_position = None ;
2996+
2997+ // find the encoder
2998+ for ( position, codec) in self . codecs . iter ( ) . enumerate ( ) {
2999+ match encode ( codec. as_ref ( ) , & mut data) {
3000+ Ok ( _) => {
3001+ encoder_position = Some ( position as u32 ) ;
3002+ break ;
3003+ }
3004+ Err ( err) => last_err = Some ( err) ,
3005+ }
3006+ }
3007+
3008+ let encoder_position = encoder_position. ok_or_else ( || {
3009+ last_err. unwrap_or_else ( || {
3010+ DataFusionError :: NotImplemented (
3011+ "Empty list of composed codecs" . to_owned ( ) ,
3012+ )
3013+ } )
3014+ } ) ?;
3015+
3016+ // encode with encoder position
3017+ let proto = DataEncoderTuple {
3018+ encoder_position,
3019+ blob : data,
3020+ } ;
3021+ proto
3022+ . encode ( buf)
3023+ . map_err ( |e| DataFusionError :: Internal ( e. to_string ( ) ) )
3024+ }
3025+ }
3026+
3027+ impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
3028+ fn try_decode (
3029+ & self ,
3030+ buf : & [ u8 ] ,
3031+ inputs : & [ Arc < dyn ExecutionPlan > ] ,
3032+ registry : & dyn FunctionRegistry ,
3033+ ) -> Result < Arc < dyn ExecutionPlan > > {
3034+ self . decode_protobuf ( buf, |codec, data| codec. try_decode ( data, inputs, registry) )
3035+ }
3036+
3037+ fn try_encode ( & self , node : Arc < dyn ExecutionPlan > , buf : & mut Vec < u8 > ) -> Result < ( ) > {
3038+ self . encode_protobuf ( buf, |codec, data| codec. try_encode ( Arc :: clone ( & node) , data) )
3039+ }
3040+
3041+ fn try_decode_udf ( & self , name : & str , buf : & [ u8 ] ) -> Result < Arc < ScalarUDF > > {
3042+ self . decode_protobuf ( buf, |codec, data| codec. try_decode_udf ( name, data) )
3043+ }
3044+
3045+ fn try_encode_udf ( & self , node : & ScalarUDF , buf : & mut Vec < u8 > ) -> Result < ( ) > {
3046+ self . encode_protobuf ( buf, |codec, data| codec. try_encode_udf ( node, data) )
3047+ }
3048+
3049+ fn try_decode_udaf ( & self , name : & str , buf : & [ u8 ] ) -> Result < Arc < AggregateUDF > > {
3050+ self . decode_protobuf ( buf, |codec, data| codec. try_decode_udaf ( name, data) )
3051+ }
3052+
3053+ fn try_encode_udaf ( & self , node : & AggregateUDF , buf : & mut Vec < u8 > ) -> Result < ( ) > {
3054+ self . encode_protobuf ( buf, |codec, data| codec. try_encode_udaf ( node, data) )
3055+ }
3056+ }
3057+
29443058fn into_physical_plan (
29453059 node : & Option < Box < protobuf:: PhysicalPlanNode > > ,
29463060 registry : & dyn FunctionRegistry ,
29473061 runtime : & RuntimeEnv ,
29483062 extension_codec : & dyn PhysicalExtensionCodec ,
2949- ) -> Result < Arc < dyn ExecutionPlan > , DataFusionError > {
3063+ ) -> Result < Arc < dyn ExecutionPlan > > {
29503064 if let Some ( field) = node {
29513065 field. try_into_physical_plan ( registry, runtime, extension_codec)
29523066 } else {
0 commit comments