@@ -157,12 +157,6 @@ mod tests {
157157 physical_plan:: { displayable, sorts:: sort:: SortExec , union:: UnionExec , ExecutionPlan } ,
158158 } ;
159159
160- type TestCase = (
161- & ' static str ,
162- Arc < dyn ExecutionPlan > ,
163- Vec < Arc < dyn ExecutionPlan > > ,
164- ) ;
165-
166160 fn schema_i32 ( name : & str ) -> Arc < Schema > {
167161 Arc :: new ( Schema :: new ( vec ! [ Field :: new( name, DataType :: Int32 , false ) ] ) )
168162 }
@@ -172,29 +166,51 @@ mod tests {
172166 }
173167
174168 #[ test]
175- fn distributed_codec_roundtrips ( ) -> datafusion:: common:: Result < ( ) > {
169+ fn roundtrip_single_flight ( ) -> datafusion:: common:: Result < ( ) > {
176170 let codec = DistributedCodec ;
177171 let registry = MemoryFunctionRegistry :: new ( ) ;
178172
179- let mut cases: Vec < TestCase > = Vec :: new ( ) ;
180-
181- // ArrowFlightReadExec
182173 let schema = schema_i32 ( "a" ) ;
183174 let part = Partitioning :: Hash ( vec ! [ Arc :: new( Column :: new( "a" , 0 ) ) ] , 4 ) ;
184175 let plan: Arc < dyn ExecutionPlan > = Arc :: new ( ArrowFlightReadExec :: new ( part, schema, 0 ) ) ;
185- cases. push ( ( "single_flight" , plan, vec ! [ ] ) ) ;
186176
187- // PartitionIsolatorExec -> ArrowFlightReadExec
177+ let mut buf = Vec :: new ( ) ;
178+ codec. try_encode ( plan. clone ( ) , & mut buf) ?;
179+
180+ let decoded = codec. try_decode ( & buf, & [ ] , & registry) ?;
181+ assert_eq ! ( repr( & plan) , repr( & decoded) ) ;
182+
183+ Ok ( ( ) )
184+ }
185+
186+ #[ test]
187+ fn roundtrip_isolator_flight ( ) -> datafusion:: common:: Result < ( ) > {
188+ let codec = DistributedCodec ;
189+ let registry = MemoryFunctionRegistry :: new ( ) ;
190+
188191 let schema = schema_i32 ( "b" ) ;
189192 let flight = Arc :: new ( ArrowFlightReadExec :: new (
190193 Partitioning :: UnknownPartitioning ( 1 ) ,
191194 schema,
192195 0 ,
193196 ) ) ;
197+
194198 let plan: Arc < dyn ExecutionPlan > = Arc :: new ( PartitionIsolatorExec :: new ( flight. clone ( ) , 3 ) ) ;
195- cases. push ( ( "isolator_flight" , plan, vec ! [ flight] ) ) ;
196199
197- // PartitionIsolatorExec -> UnionExec(ArrowFlightReadExec)
200+ let mut buf = Vec :: new ( ) ;
201+ codec. try_encode ( plan. clone ( ) , & mut buf) ?;
202+
203+ let decoded = codec. try_decode ( & buf, & [ flight] , & registry) ?;
204+ assert_eq ! ( repr( & plan) , repr( & decoded) ) ;
205+
206+ Ok ( ( ) )
207+ }
208+
209+ #[ test]
210+ fn roundtrip_isolator_union ( ) -> datafusion:: common:: Result < ( ) > {
211+ let codec = DistributedCodec ;
212+ let registry = MemoryFunctionRegistry :: new ( ) ;
213+
198214 let schema = schema_i32 ( "c" ) ;
199215 let left = Arc :: new ( ArrowFlightReadExec :: new (
200216 Partitioning :: RoundRobinBatch ( 2 ) ,
@@ -206,38 +222,45 @@ mod tests {
206222 schema. clone ( ) ,
207223 1 ,
208224 ) ) ;
225+
209226 let union = Arc :: new ( UnionExec :: new ( vec ! [ left. clone( ) , right. clone( ) ] ) ) ;
210227 let plan: Arc < dyn ExecutionPlan > = Arc :: new ( PartitionIsolatorExec :: new ( union. clone ( ) , 5 ) ) ;
211- cases. push ( ( "isolator_union" , plan, vec ! [ union ] ) ) ;
212228
213- // PartitionIsolatorExec -> SortExec -> ArrowFlightReadExec
229+ let mut buf = Vec :: new ( ) ;
230+ codec. try_encode ( plan. clone ( ) , & mut buf) ?;
231+
232+ let decoded = codec. try_decode ( & buf, & [ union] , & registry) ?;
233+ assert_eq ! ( repr( & plan) , repr( & decoded) ) ;
234+
235+ Ok ( ( ) )
236+ }
237+
238+ #[ test]
239+ fn roundtrip_isolator_sort_flight ( ) -> datafusion:: common:: Result < ( ) > {
240+ let codec = DistributedCodec ;
241+ let registry = MemoryFunctionRegistry :: new ( ) ;
242+
214243 let schema = schema_i32 ( "d" ) ;
215244 let flight = Arc :: new ( ArrowFlightReadExec :: new (
216245 Partitioning :: UnknownPartitioning ( 1 ) ,
217246 schema. clone ( ) ,
218247 0 ,
219248 ) ) ;
249+
220250 let sort_expr = PhysicalSortExpr {
221251 expr : col ( "d" , & schema) ?,
222252 options : Default :: default ( ) ,
223253 } ;
224254 let sort = Arc :: new ( SortExec :: new ( vec ! [ sort_expr] . into ( ) , flight. clone ( ) ) ) ;
255+
225256 let plan: Arc < dyn ExecutionPlan > = Arc :: new ( PartitionIsolatorExec :: new ( sort. clone ( ) , 2 ) ) ;
226- cases. push ( ( "isolator_sort_flight" , plan, vec ! [ sort] ) ) ;
227257
228- // Test each case
229- for ( name, original, inputs) in cases {
230- let mut buf = Vec :: new ( ) ;
231- codec. try_encode ( original. clone ( ) , & mut buf) ?;
258+ let mut buf = Vec :: new ( ) ;
259+ codec. try_encode ( plan. clone ( ) , & mut buf) ?;
232260
233- let decoded = codec. try_decode ( & buf, & inputs, & registry) ?;
261+ let decoded = codec. try_decode ( & buf, & [ sort] , & registry) ?;
262+ assert_eq ! ( repr( & plan) , repr( & decoded) ) ;
234263
235- assert_eq ! (
236- repr( & original) ,
237- repr( & decoded) ,
238- "mismatch after round-trip for {name}"
239- ) ;
240- }
241264 Ok ( ( ) )
242265 }
243266}
0 commit comments