@@ -2941,12 +2941,126 @@ impl PhysicalExtensionCodec for DefaultPhysicalExtensionCodec {
29412941    } 
29422942} 
29432943
2944+ /// DataEncoderTuple captures the position of the encoder 
2945+ /// in the codec list that was used to encode the data and actual encoded data 
2946+ #[ derive( Clone ,  PartialEq ,  prost:: Message ) ]  
2947+ struct  DataEncoderTuple  { 
2948+     /// The position of encoder used to encode data 
2949+ /// (to be used for decoding) 
2950+ #[ prost( uint32,  tag = 1 ) ]  
2951+     pub  encoder_position :  u32 , 
2952+ 
2953+     #[ prost( bytes,  tag = 2 ) ]  
2954+     pub  blob :  Vec < u8 > , 
2955+ } 
2956+ 
2957+ /// A PhysicalExtensionCodec that tries one of multiple inner codecs 
2958+ /// until one works 
2959+ #[ derive( Debug ) ]  
2960+ pub  struct  ComposedPhysicalExtensionCodec  { 
2961+     codecs :  Vec < Arc < dyn  PhysicalExtensionCodec > > , 
2962+ } 
2963+ 
2964+ impl  ComposedPhysicalExtensionCodec  { 
2965+     // Position in this codesc list is important as it will be used for decoding. 
2966+     // If new codec is added it should go to last position. 
2967+     pub  fn  new ( codecs :  Vec < Arc < dyn  PhysicalExtensionCodec > > )  -> Self  { 
2968+         Self  {  codecs } 
2969+     } 
2970+ 
2971+     fn  decode_protobuf < R > ( 
2972+         & self , 
2973+         buf :  & [ u8 ] , 
2974+         decode :  impl  FnOnce ( & dyn  PhysicalExtensionCodec ,  & [ u8 ] )  -> Result < R > , 
2975+     )  -> Result < R >  { 
2976+         let  proto = DataEncoderTuple :: decode ( buf) 
2977+             . map_err ( |e| DataFusionError :: Internal ( e. to_string ( ) ) ) ?; 
2978+ 
2979+         let  codec = self . codecs . get ( proto. encoder_position  as  usize ) . ok_or ( 
2980+             DataFusionError :: Internal ( 
2981+                 "Can't find required codec in codec list" . to_owned ( ) , 
2982+             ) , 
2983+         ) ?; 
2984+ 
2985+         decode ( codec. as_ref ( ) ,  & proto. blob ) 
2986+     } 
2987+ 
2988+     fn  encode_protobuf ( 
2989+         & self , 
2990+         buf :  & mut  Vec < u8 > , 
2991+         mut  encode :  impl  FnMut ( & dyn  PhysicalExtensionCodec ,  & mut  Vec < u8 > )  -> Result < ( ) > , 
2992+     )  -> Result < ( ) >  { 
2993+         let  mut  data = vec ! [ ] ; 
2994+         let  mut  last_err = None ; 
2995+         let  mut  encoder_position = None ; 
2996+ 
2997+         // find the encoder 
2998+         for  ( position,  codec)  in  self . codecs . iter ( ) . enumerate ( )  { 
2999+             match  encode ( codec. as_ref ( ) ,  & mut  data)  { 
3000+                 Ok ( _)  => { 
3001+                     encoder_position = Some ( position as  u32 ) ; 
3002+                     break ; 
3003+                 } 
3004+                 Err ( err)  => last_err = Some ( err) , 
3005+             } 
3006+         } 
3007+ 
3008+         let  encoder_position = encoder_position. ok_or_else ( || { 
3009+             last_err. unwrap_or_else ( || { 
3010+                 DataFusionError :: NotImplemented ( 
3011+                     "Empty list of composed codecs" . to_owned ( ) , 
3012+                 ) 
3013+             } ) 
3014+         } ) ?; 
3015+ 
3016+         // encode with encoder position 
3017+         let  proto = DataEncoderTuple  { 
3018+             encoder_position, 
3019+             blob :  data, 
3020+         } ; 
3021+         proto
3022+             . encode ( buf) 
3023+             . map_err ( |e| DataFusionError :: Internal ( e. to_string ( ) ) ) 
3024+     } 
3025+ } 
3026+ 
3027+ impl  PhysicalExtensionCodec  for  ComposedPhysicalExtensionCodec  { 
3028+     fn  try_decode ( 
3029+         & self , 
3030+         buf :  & [ u8 ] , 
3031+         inputs :  & [ Arc < dyn  ExecutionPlan > ] , 
3032+         registry :  & dyn  FunctionRegistry , 
3033+     )  -> Result < Arc < dyn  ExecutionPlan > >  { 
3034+         self . decode_protobuf ( buf,  |codec,  data| codec. try_decode ( data,  inputs,  registry) ) 
3035+     } 
3036+ 
3037+     fn  try_encode ( & self ,  node :  Arc < dyn  ExecutionPlan > ,  buf :  & mut  Vec < u8 > )  -> Result < ( ) >  { 
3038+         self . encode_protobuf ( buf,  |codec,  data| codec. try_encode ( Arc :: clone ( & node) ,  data) ) 
3039+     } 
3040+ 
3041+     fn  try_decode_udf ( & self ,  name :  & str ,  buf :  & [ u8 ] )  -> Result < Arc < ScalarUDF > >  { 
3042+         self . decode_protobuf ( buf,  |codec,  data| codec. try_decode_udf ( name,  data) ) 
3043+     } 
3044+ 
3045+     fn  try_encode_udf ( & self ,  node :  & ScalarUDF ,  buf :  & mut  Vec < u8 > )  -> Result < ( ) >  { 
3046+         self . encode_protobuf ( buf,  |codec,  data| codec. try_encode_udf ( node,  data) ) 
3047+     } 
3048+ 
3049+     fn  try_decode_udaf ( & self ,  name :  & str ,  buf :  & [ u8 ] )  -> Result < Arc < AggregateUDF > >  { 
3050+         self . decode_protobuf ( buf,  |codec,  data| codec. try_decode_udaf ( name,  data) ) 
3051+     } 
3052+ 
3053+     fn  try_encode_udaf ( & self ,  node :  & AggregateUDF ,  buf :  & mut  Vec < u8 > )  -> Result < ( ) >  { 
3054+         self . encode_protobuf ( buf,  |codec,  data| codec. try_encode_udaf ( node,  data) ) 
3055+     } 
3056+ } 
3057+ 
29443058fn  into_physical_plan ( 
29453059    node :  & Option < Box < protobuf:: PhysicalPlanNode > > , 
29463060    registry :  & dyn  FunctionRegistry , 
29473061    runtime :  & RuntimeEnv , 
29483062    extension_codec :  & dyn  PhysicalExtensionCodec , 
2949- )  -> Result < Arc < dyn  ExecutionPlan > ,   DataFusionError >  { 
3063+ )  -> Result < Arc < dyn  ExecutionPlan > >  { 
29503064    if  let  Some ( field)  = node { 
29513065        field. try_into_physical_plan ( registry,  runtime,  extension_codec) 
29523066    }  else  { 
0 commit comments