@@ -31,7 +31,21 @@ use crate::{
3131} ;
3232
3333#[ derive( Debug ) ]
34- pub struct DDCodec { }
34+ pub struct DDCodec {
35+ sub_codec : Arc < dyn PhysicalExtensionCodec > ,
36+ }
37+
38+ impl DDCodec {
39+ pub fn new ( sub_codec : Arc < dyn PhysicalExtensionCodec > ) -> Self {
40+ Self { sub_codec }
41+ }
42+ }
43+
44+ impl Default for DDCodec {
45+ fn default ( ) -> Self {
46+ Self :: new ( Arc :: new ( DefaultPhysicalExtensionCodec { } ) )
47+ }
48+ }
3549
3650impl PhysicalExtensionCodec for DDCodec {
3751 fn try_decode (
@@ -127,6 +141,13 @@ impl PhysicalExtensionCodec for DDCodec {
127141 Ok ( Arc :: new ( RecordBatchExec :: new ( batch) ) )
128142 }
129143 }
144+ } else if let Ok ( ext) = self . sub_codec . try_decode ( buf, inputs, registry) {
145+ // If the node is not a DDExecNode, we delegate to the sub codec
146+ trace ! (
147+ "Delegated decoding to sub codec for node: {}" ,
148+ displayable( ext. as_ref( ) ) . one_line( )
149+ ) ;
150+ Ok ( ext)
130151 } else {
131152 internal_err ! ( "cannot decode proto extension in distributed datafusion codec" )
132153 }
@@ -151,52 +172,58 @@ impl PhysicalExtensionCodec for DDCodec {
151172 stage_id : reader. stage_id ,
152173 } ;
153174
154- Payload :: StageReaderExec ( pb)
175+ Some ( Payload :: StageReaderExec ( pb) )
155176 } else if let Some ( pi) = node. as_any ( ) . downcast_ref :: < PartitionIsolatorExec > ( ) {
156177 let pb = PartitionIsolatorExecNode {
157178 partition_count : pi. partition_count as u64 ,
158179 } ;
159180
160- Payload :: IsolatorExec ( pb)
181+ Some ( Payload :: IsolatorExec ( pb) )
161182 } else if let Some ( max) = node. as_any ( ) . downcast_ref :: < MaxRowsExec > ( ) {
162183 let pb = MaxRowsExecNode {
163184 max_rows : max. max_rows as u64 ,
164185 } ;
165- Payload :: MaxRowsExec ( pb)
186+ Some ( Payload :: MaxRowsExec ( pb) )
166187 } else if let Some ( exec) = node. as_any ( ) . downcast_ref :: < DistributedAnalyzeExec > ( ) {
167188 let pb = DistributedAnalyzeExecNode {
168189 verbose : exec. verbose ,
169190 show_statistics : exec. show_statistics ,
170191 } ;
171- Payload :: DistributedAnalyzeExec ( pb)
192+ Some ( Payload :: DistributedAnalyzeExec ( pb) )
172193 } else if let Some ( exec) = node. as_any ( ) . downcast_ref :: < DistributedAnalyzeRootExec > ( ) {
173194 let pb = DistributedAnalyzeRootExecNode {
174195 verbose : exec. verbose ,
175196 show_statistics : exec. show_statistics ,
176197 } ;
177- Payload :: DistributedAnalyzeRootExec ( pb)
198+ Some ( Payload :: DistributedAnalyzeRootExec ( pb) )
178199 } else if let Some ( exec) = node. as_any ( ) . downcast_ref :: < RecordBatchExec > ( ) {
179200 let pb = RecordBatchExecNode {
180201 batch : batch_to_ipc ( & exec. batch ) . map_err ( |e| {
181202 internal_datafusion_err ! ( "Failed to encode RecordBatch: {:#?}" , e)
182203 } ) ?,
183204 } ;
184- Payload :: RecordBatchExec ( pb)
205+ Some ( Payload :: RecordBatchExec ( pb) )
185206 } else {
186- return internal_err ! ( "Not supported node to encode to proto" ) ;
207+ trace ! (
208+ "Node {} is not a custom DDExecNode, delegating to sub codec" ,
209+ displayable( node. as_ref( ) ) . one_line( )
210+ ) ;
211+ None
187212 } ;
188213
189- let pb = DdExecNode {
190- payload : Some ( payload) ,
191- } ;
192- pb. encode ( buf)
193- . map_err ( |e| internal_datafusion_err ! ( "Failed to encode protobuf: {}" , e) ) ?;
194-
195- trace ! (
196- "DONE encoding node: {}" ,
197- displayable( node. as_ref( ) ) . one_line( )
198- ) ;
199- Ok ( ( ) )
214+ match payload {
215+ Some ( payload) => {
216+ let pb = DdExecNode {
217+ payload : Some ( payload) ,
218+ } ;
219+ pb. encode ( buf)
220+ . map_err ( |e| internal_datafusion_err ! ( "Failed to encode protobuf: {:#?}" , e) )
221+ }
222+ None => {
223+ // If the node is not one of our custom nodes, we delegate to the sub codec
224+ self . sub_codec . try_encode ( node, buf)
225+ }
226+ }
200227 }
201228}
202229
@@ -225,7 +252,7 @@ mod test {
225252
226253 fn verify_round_trip ( exec : Arc < dyn ExecutionPlan > ) {
227254 let ctx = SessionContext :: new ( ) ;
228- let codec = DDCodec { } ;
255+ let codec = DDCodec :: new ( Arc :: new ( DefaultPhysicalExtensionCodec { } ) ) ;
229256
230257 // serialize execution plan to proto
231258 let proto: protobuf:: PhysicalPlanNode =
@@ -255,7 +282,7 @@ mod test {
255282 let schema = create_test_schema ( ) ;
256283 let part = Partitioning :: UnknownPartitioning ( 2 ) ;
257284 let exec = Arc :: new ( DDStageReaderExec :: try_new ( part, schema, 1 ) . unwrap ( ) ) ;
258- let codec = DDCodec { } ;
285+ let codec = DDCodec :: new ( Arc :: new ( DefaultPhysicalExtensionCodec { } ) ) ;
259286 let mut buf = vec ! [ ] ;
260287 codec. try_encode ( exec. clone ( ) , & mut buf) . unwrap ( ) ;
261288 let ctx = SessionContext :: new ( ) ;
0 commit comments