Skip to content

Commit a1bad86

Browse files
committed
Add tests for NetworkCoalesceExec.
1 parent 1a3fe1c commit a1bad86

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed

src/protobuf/distributed_codec.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,4 +534,58 @@ mod tests {
534534

535535
Ok(())
536536
}
537+
538+
#[test]
539+
fn test_roundtrip_isolator_flight_coalesce() -> datafusion::common::Result<()> {
540+
let codec = DistributedCodec;
541+
let registry = MemoryFunctionRegistry::new();
542+
543+
let schema = schema_i32("e");
544+
let flight = Arc::new(new_network_coalesce_tasks_exec(
545+
Partitioning::UnknownPartitioning(1),
546+
schema,
547+
dummy_stage(),
548+
));
549+
550+
let plan: Arc<dyn ExecutionPlan> =
551+
Arc::new(PartitionIsolatorExec::new_ready(flight.clone(), 1)?);
552+
553+
let mut buf = Vec::new();
554+
codec.try_encode(plan.clone(), &mut buf)?;
555+
556+
let decoded = codec.try_decode(&buf, &[flight], &registry)?;
557+
assert_eq!(repr(&plan), repr(&decoded));
558+
559+
Ok(())
560+
}
561+
562+
#[test]
563+
fn test_roundtrip_isolator_union_coalesce() -> datafusion::common::Result<()> {
564+
let codec = DistributedCodec;
565+
let registry = MemoryFunctionRegistry::new();
566+
567+
let schema = schema_i32("c");
568+
let left = Arc::new(new_network_coalesce_tasks_exec(
569+
Partitioning::RoundRobinBatch(2),
570+
schema.clone(),
571+
dummy_stage(),
572+
));
573+
let right = Arc::new(new_network_coalesce_tasks_exec(
574+
Partitioning::RoundRobinBatch(2),
575+
schema.clone(),
576+
dummy_stage(),
577+
));
578+
579+
let union = Arc::new(UnionExec::new(vec![left.clone(), right.clone()]));
580+
let plan: Arc<dyn ExecutionPlan> =
581+
Arc::new(PartitionIsolatorExec::new_ready(union.clone(), 1)?);
582+
583+
let mut buf = Vec::new();
584+
codec.try_encode(plan.clone(), &mut buf)?;
585+
586+
let decoded = codec.try_decode(&buf, &[union], &registry)?;
587+
assert_eq!(repr(&plan), repr(&decoded));
588+
589+
Ok(())
590+
}
537591
}

0 commit comments

Comments
 (0)