Skip to content

Commit bfb00ad

Browse files
committed
Separate tests
1 parent 73af635 commit bfb00ad

File tree

1 file changed

+51
-28
lines changed

1 file changed

+51
-28
lines changed

src/plan/codec.rs

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -157,12 +157,6 @@ mod tests {
157157
physical_plan::{displayable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan},
158158
};
159159

160-
type TestCase = (
161-
&'static str,
162-
Arc<dyn ExecutionPlan>,
163-
Vec<Arc<dyn ExecutionPlan>>,
164-
);
165-
166160
fn schema_i32(name: &str) -> Arc<Schema> {
167161
Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)]))
168162
}
@@ -172,29 +166,51 @@ mod tests {
172166
}
173167

174168
#[test]
175-
fn distributed_codec_roundtrips() -> datafusion::common::Result<()> {
169+
fn test_roundtrip_single_flight() -> datafusion::common::Result<()> {
176170
let codec = DistributedCodec;
177171
let registry = MemoryFunctionRegistry::new();
178172

179-
let mut cases: Vec<TestCase> = Vec::new();
180-
181-
// ArrowFlightReadExec
182173
let schema = schema_i32("a");
183174
let part = Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4);
184175
let plan: Arc<dyn ExecutionPlan> = Arc::new(ArrowFlightReadExec::new(part, schema, 0));
185-
cases.push(("single_flight", plan, vec![]));
186176

187-
// PartitionIsolatorExec -> ArrowFlightReadExec
177+
let mut buf = Vec::new();
178+
codec.try_encode(plan.clone(), &mut buf)?;
179+
180+
let decoded = codec.try_decode(&buf, &[], &registry)?;
181+
assert_eq!(repr(&plan), repr(&decoded));
182+
183+
Ok(())
184+
}
185+
186+
#[test]
187+
fn test_roundtrip_isolator_flight() -> datafusion::common::Result<()> {
188+
let codec = DistributedCodec;
189+
let registry = MemoryFunctionRegistry::new();
190+
188191
let schema = schema_i32("b");
189192
let flight = Arc::new(ArrowFlightReadExec::new(
190193
Partitioning::UnknownPartitioning(1),
191194
schema,
192195
0,
193196
));
197+
194198
let plan: Arc<dyn ExecutionPlan> = Arc::new(PartitionIsolatorExec::new(flight.clone(), 3));
195-
cases.push(("isolator_flight", plan, vec![flight]));
196199

197-
// PartitionIsolatorExec -> UnionExec(ArrowFlightReadExec)
200+
let mut buf = Vec::new();
201+
codec.try_encode(plan.clone(), &mut buf)?;
202+
203+
let decoded = codec.try_decode(&buf, &[flight], &registry)?;
204+
assert_eq!(repr(&plan), repr(&decoded));
205+
206+
Ok(())
207+
}
208+
209+
#[test]
210+
fn test_roundtrip_isolator_union() -> datafusion::common::Result<()> {
211+
let codec = DistributedCodec;
212+
let registry = MemoryFunctionRegistry::new();
213+
198214
let schema = schema_i32("c");
199215
let left = Arc::new(ArrowFlightReadExec::new(
200216
Partitioning::RoundRobinBatch(2),
@@ -206,38 +222,45 @@ mod tests {
206222
schema.clone(),
207223
1,
208224
));
225+
209226
let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
210227
let plan: Arc<dyn ExecutionPlan> = Arc::new(PartitionIsolatorExec::new(union.clone(), 5));
211-
cases.push(("isolator_union", plan, vec![union]));
212228

213-
// PartitionIsolatorExec -> SortExec -> ArrowFlightReadExec
229+
let mut buf = Vec::new();
230+
codec.try_encode(plan.clone(), &mut buf)?;
231+
232+
let decoded = codec.try_decode(&buf, &[union], &registry)?;
233+
assert_eq!(repr(&plan), repr(&decoded));
234+
235+
Ok(())
236+
}
237+
238+
#[test]
239+
fn test_roundtrip_isolator_sort_flight() -> datafusion::common::Result<()> {
240+
let codec = DistributedCodec;
241+
let registry = MemoryFunctionRegistry::new();
242+
214243
let schema = schema_i32("d");
215244
let flight = Arc::new(ArrowFlightReadExec::new(
216245
Partitioning::UnknownPartitioning(1),
217246
schema.clone(),
218247
0,
219248
));
249+
220250
let sort_expr = PhysicalSortExpr {
221251
expr: col("d", &schema)?,
222252
options: Default::default(),
223253
};
224254
let sort = Arc::new(SortExec::new(vec![sort_expr].into(), flight.clone()));
255+
225256
let plan: Arc<dyn ExecutionPlan> = Arc::new(PartitionIsolatorExec::new(sort.clone(), 2));
226-
cases.push(("isolator_sort_flight", plan, vec![sort]));
227257

228-
// Test each case
229-
for (name, original, inputs) in cases {
230-
let mut buf = Vec::new();
231-
codec.try_encode(original.clone(), &mut buf)?;
258+
let mut buf = Vec::new();
259+
codec.try_encode(plan.clone(), &mut buf)?;
232260

233-
let decoded = codec.try_decode(&buf, &inputs, &registry)?;
261+
let decoded = codec.try_decode(&buf, &[sort], &registry)?;
262+
assert_eq!(repr(&plan), repr(&decoded));
234263

235-
assert_eq!(
236-
repr(&original),
237-
repr(&decoded),
238-
"mismatch after round-trip for {name}"
239-
);
240-
}
241264
Ok(())
242265
}
243266
}

0 commit comments

Comments
 (0)